xref: /spdk/lib/blob/blobstore.c (revision 38d75b56f4d47a651794bf6772620fbf8ad1bd03)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
38 #include "spdk/env.h"
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 
44 #include "spdk_internal/assert.h"
45 #include "spdk_internal/log.h"
46 
47 #include "blobstore.h"
48 
49 #define BLOB_CRC32C_INITIAL    0xffffffffUL
50 
51 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
52 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
53 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
54 void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
55 		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);
56 
57 static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
58 				uint16_t value_len, bool internal);
59 static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
60 				      const void **value, size_t *value_len, bool internal);
61 static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);
62 
63 static void
64 _spdk_blob_verify_md_op(struct spdk_blob *blob)
65 {
66 	assert(blob != NULL);
67 	assert(spdk_get_thread() == blob->bs->md_thread);
68 	assert(blob->state != SPDK_BLOB_STATE_LOADING);
69 }
70 
71 static inline size_t
72 divide_round_up(size_t num, size_t divisor)
73 {
74 	return (num + divisor - 1) / divisor;
75 }
76 
77 static void
78 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
79 {
80 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
81 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
82 	assert(bs->num_free_clusters > 0);
83 
84 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);
85 
86 	spdk_bit_array_set(bs->used_clusters, cluster_num);
87 	bs->num_free_clusters--;
88 }
89 
90 static int
91 _spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
92 {
93 	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];
94 
95 	_spdk_blob_verify_md_op(blob);
96 
97 	if (*cluster_lba != 0) {
98 		return -EEXIST;
99 	}
100 
101 	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
102 	return 0;
103 }
104 
105 static int
106 _spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
107 			  uint64_t *lowest_free_cluster, bool update_map)
108 {
109 	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
110 	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
111 			       *lowest_free_cluster);
112 	if (*lowest_free_cluster >= blob->bs->total_clusters) {
113 		/* No more free clusters. Cannot satisfy the request */
114 		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
115 		return -ENOSPC;
116 	}
117 
118 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
119 	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
120 	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
121 
122 	if (update_map) {
123 		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
124 	}
125 
126 	return 0;
127 }
128 
129 static void
130 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
131 {
132 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
133 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
134 	assert(bs->num_free_clusters < bs->total_clusters);
135 
136 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);
137 
138 	pthread_mutex_lock(&bs->used_clusters_mutex);
139 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
140 	bs->num_free_clusters++;
141 	pthread_mutex_unlock(&bs->used_clusters_mutex);
142 }
143 
144 static void
145 _spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
146 {
147 	xattrs->count = 0;
148 	xattrs->names = NULL;
149 	xattrs->ctx = NULL;
150 	xattrs->get_value = NULL;
151 }
152 
153 void
154 spdk_blob_opts_init(struct spdk_blob_opts *opts)
155 {
156 	opts->num_clusters = 0;
157 	opts->thin_provision = false;
158 	_spdk_blob_xattrs_init(&opts->xattrs);
159 }
160 
161 static struct spdk_blob *
162 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
163 {
164 	struct spdk_blob *blob;
165 
166 	blob = calloc(1, sizeof(*blob));
167 	if (!blob) {
168 		return NULL;
169 	}
170 
171 	blob->id = id;
172 	blob->bs = bs;
173 
174 	blob->state = SPDK_BLOB_STATE_DIRTY;
175 	blob->active.num_pages = 1;
176 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
177 	if (!blob->active.pages) {
178 		free(blob);
179 		return NULL;
180 	}
181 
182 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
183 
184 	TAILQ_INIT(&blob->xattrs);
185 	TAILQ_INIT(&blob->xattrs_internal);
186 
187 	return blob;
188 }
189 
190 static void
191 _spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
192 {
193 	struct spdk_xattr	*xattr, *xattr_tmp;
194 
195 	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
196 		TAILQ_REMOVE(xattrs, xattr, link);
197 		free(xattr->name);
198 		free(xattr->value);
199 		free(xattr);
200 	}
201 }
202 
203 static void
204 _spdk_blob_free(struct spdk_blob *blob)
205 {
206 	assert(blob != NULL);
207 
208 	free(blob->active.clusters);
209 	free(blob->clean.clusters);
210 	free(blob->active.pages);
211 	free(blob->clean.pages);
212 
213 	_spdk_xattrs_free(&blob->xattrs);
214 	_spdk_xattrs_free(&blob->xattrs_internal);
215 
216 	if (blob->back_bs_dev) {
217 		blob->back_bs_dev->destroy(blob->back_bs_dev);
218 	}
219 
220 	free(blob);
221 }
222 
223 static int
224 _spdk_blob_mark_clean(struct spdk_blob *blob)
225 {
226 	uint64_t *clusters = NULL;
227 	uint32_t *pages = NULL;
228 
229 	assert(blob != NULL);
230 
231 	if (blob->active.num_clusters) {
232 		assert(blob->active.clusters);
233 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
234 		if (!clusters) {
235 			return -1;
236 		}
237 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
238 	}
239 
240 	if (blob->active.num_pages) {
241 		assert(blob->active.pages);
242 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
243 		if (!pages) {
244 			free(clusters);
245 			return -1;
246 		}
247 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
248 	}
249 
250 	free(blob->clean.clusters);
251 	free(blob->clean.pages);
252 
253 	blob->clean.num_clusters = blob->active.num_clusters;
254 	blob->clean.clusters = blob->active.clusters;
255 	blob->clean.num_pages = blob->active.num_pages;
256 	blob->clean.pages = blob->active.pages;
257 
258 	blob->active.clusters = clusters;
259 	blob->active.pages = pages;
260 
261 	/* If the metadata was dirtied again while the metadata was being written to disk,
262 	 *  we do not want to revert the DIRTY state back to CLEAN here.
263 	 */
264 	if (blob->state == SPDK_BLOB_STATE_LOADING) {
265 		blob->state = SPDK_BLOB_STATE_CLEAN;
266 	}
267 
268 	return 0;
269 }
270 
271 static int
272 _spdk_blob_deserialize_xattr(struct spdk_blob *blob,
273 			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
274 {
275 	struct spdk_xattr                       *xattr;
276 
277 	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
278 	    sizeof(desc_xattr->value_length) +
279 	    desc_xattr->name_length + desc_xattr->value_length) {
280 		return -EINVAL;
281 	}
282 
283 	xattr = calloc(1, sizeof(*xattr));
284 	if (xattr == NULL) {
285 		return -ENOMEM;
286 	}
287 
288 	xattr->name = malloc(desc_xattr->name_length + 1);
289 	if (xattr->name == NULL) {
290 		free(xattr);
291 		return -ENOMEM;
292 	}
293 	memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
294 	xattr->name[desc_xattr->name_length] = '\0';
295 
296 	xattr->value = malloc(desc_xattr->value_length);
297 	if (xattr->value == NULL) {
298 		free(xattr->name);
299 		free(xattr);
300 		return -ENOMEM;
301 	}
302 	xattr->value_len = desc_xattr->value_length;
303 	memcpy(xattr->value,
304 	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
305 	       desc_xattr->value_length);
306 
307 	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);
308 
309 	return 0;
310 }
311 
312 
313 static int
314 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
315 {
316 	struct spdk_blob_md_descriptor *desc;
317 	size_t	cur_desc = 0;
318 	void *tmp;
319 
320 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
321 	while (cur_desc < sizeof(page->descriptors)) {
322 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
323 			if (desc->length == 0) {
324 				/* If padding and length are 0, this terminates the page */
325 				break;
326 			}
327 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
328 			struct spdk_blob_md_descriptor_flags	*desc_flags;
329 
330 			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
331 
332 			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
333 				return -EINVAL;
334 			}
335 
336 			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
337 			    SPDK_BLOB_INVALID_FLAGS_MASK) {
338 				return -EINVAL;
339 			}
340 
341 			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
342 			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
343 				blob->data_ro = true;
344 				blob->md_ro = true;
345 			}
346 
347 			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
348 			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
349 				blob->md_ro = true;
350 			}
351 
352 			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
353 				blob->data_ro = true;
354 				blob->md_ro = true;
355 			}
356 
357 			blob->invalid_flags = desc_flags->invalid_flags;
358 			blob->data_ro_flags = desc_flags->data_ro_flags;
359 			blob->md_ro_flags = desc_flags->md_ro_flags;
360 
361 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
362 			struct spdk_blob_md_descriptor_extent	*desc_extent;
363 			unsigned int				i, j;
364 			unsigned int				cluster_count = blob->active.num_clusters;
365 
366 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
367 
368 			if (desc_extent->length == 0 ||
369 			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
370 				return -EINVAL;
371 			}
372 
373 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
374 				for (j = 0; j < desc_extent->extents[i].length; j++) {
375 					if (!spdk_bit_array_get(blob->bs->used_clusters,
376 								desc_extent->extents[i].cluster_idx + j)) {
377 						return -EINVAL;
378 					}
379 					cluster_count++;
380 				}
381 			}
382 
383 			if (cluster_count == 0) {
384 				return -EINVAL;
385 			}
386 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
387 			if (tmp == NULL) {
388 				return -ENOMEM;
389 			}
390 			blob->active.clusters = tmp;
391 			blob->active.cluster_array_size = cluster_count;
392 
393 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
394 				for (j = 0; j < desc_extent->extents[i].length; j++) {
395 					if (desc_extent->extents[i].cluster_idx != 0) {
396 						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
397 								desc_extent->extents[i].cluster_idx + j);
398 					} else if (spdk_blob_is_thin_provisioned(blob)) {
399 						blob->active.clusters[blob->active.num_clusters++] = 0;
400 					} else {
401 						return -EINVAL;
402 					}
403 				}
404 			}
405 
406 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
407 			int rc;
408 
409 			rc = _spdk_blob_deserialize_xattr(blob,
410 							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
411 			if (rc != 0) {
412 				return rc;
413 			}
414 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
415 			int rc;
416 
417 			rc = _spdk_blob_deserialize_xattr(blob,
418 							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
419 			if (rc != 0) {
420 				return rc;
421 			}
422 		} else {
423 			/* Unrecognized descriptor type.  Do not fail - just continue to the
424 			 *  next descriptor.  If this descriptor is associated with some feature
425 			 *  defined in a newer version of blobstore, that version of blobstore
426 			 *  should create and set an associated feature flag to specify if this
427 			 *  blob can be loaded or not.
428 			 */
429 		}
430 
431 		/* Advance to the next descriptor */
432 		cur_desc += sizeof(*desc) + desc->length;
433 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
434 			break;
435 		}
436 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
437 	}
438 
439 	return 0;
440 }
441 
442 static int
443 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
444 		 struct spdk_blob *blob)
445 {
446 	const struct spdk_blob_md_page *page;
447 	uint32_t i;
448 	int rc;
449 
450 	assert(page_count > 0);
451 	assert(pages[0].sequence_num == 0);
452 	assert(blob != NULL);
453 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
454 	assert(blob->active.clusters == NULL);
455 
456 	/* The blobid provided doesn't match what's in the MD, this can
457 	 * happen for example if a bogus blobid is passed in through open.
458 	 */
459 	if (blob->id != pages[0].id) {
460 		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
461 			    blob->id, pages[0].id);
462 		return -ENOENT;
463 	}
464 
465 	for (i = 0; i < page_count; i++) {
466 		page = &pages[i];
467 
468 		assert(page->id == blob->id);
469 		assert(page->sequence_num == i);
470 
471 		rc = _spdk_blob_parse_page(page, blob);
472 		if (rc != 0) {
473 			return rc;
474 		}
475 	}
476 
477 	return 0;
478 }
479 
480 static int
481 _spdk_blob_serialize_add_page(const struct spdk_blob *blob,
482 			      struct spdk_blob_md_page **pages,
483 			      uint32_t *page_count,
484 			      struct spdk_blob_md_page **last_page)
485 {
486 	struct spdk_blob_md_page *page;
487 
488 	assert(pages != NULL);
489 	assert(page_count != NULL);
490 
491 	if (*page_count == 0) {
492 		assert(*pages == NULL);
493 		*page_count = 1;
494 		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
495 					 SPDK_BS_PAGE_SIZE,
496 					 NULL);
497 	} else {
498 		assert(*pages != NULL);
499 		(*page_count)++;
500 		*pages = spdk_dma_realloc(*pages,
501 					  SPDK_BS_PAGE_SIZE * (*page_count),
502 					  SPDK_BS_PAGE_SIZE,
503 					  NULL);
504 	}
505 
506 	if (*pages == NULL) {
507 		*page_count = 0;
508 		*last_page = NULL;
509 		return -ENOMEM;
510 	}
511 
512 	page = &(*pages)[*page_count - 1];
513 	memset(page, 0, sizeof(*page));
514 	page->id = blob->id;
515 	page->sequence_num = *page_count - 1;
516 	page->next = SPDK_INVALID_MD_PAGE;
517 	*last_page = page;
518 
519 	return 0;
520 }
521 
522 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
523  * Update required_sz on both success and failure.
524  *
525  */
526 static int
527 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
528 			   uint8_t *buf, size_t buf_sz,
529 			   size_t *required_sz, bool internal)
530 {
531 	struct spdk_blob_md_descriptor_xattr	*desc;
532 
533 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
534 		       strlen(xattr->name) +
535 		       xattr->value_len;
536 
537 	if (buf_sz < *required_sz) {
538 		return -1;
539 	}
540 
541 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
542 
543 	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
544 	desc->length = sizeof(desc->name_length) +
545 		       sizeof(desc->value_length) +
546 		       strlen(xattr->name) +
547 		       xattr->value_len;
548 	desc->name_length = strlen(xattr->name);
549 	desc->value_length = xattr->value_len;
550 
551 	memcpy(desc->name, xattr->name, desc->name_length);
552 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
553 	       xattr->value,
554 	       desc->value_length);
555 
556 	return 0;
557 }
558 
559 static void
560 _spdk_blob_serialize_extent(const struct spdk_blob *blob,
561 			    uint64_t start_cluster, uint64_t *next_cluster,
562 			    uint8_t *buf, size_t buf_sz)
563 {
564 	struct spdk_blob_md_descriptor_extent *desc;
565 	size_t cur_sz;
566 	uint64_t i, extent_idx;
567 	uint32_t lba, lba_per_cluster, lba_count;
568 
569 	/* The buffer must have room for at least one extent */
570 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
571 	if (buf_sz < cur_sz) {
572 		*next_cluster = start_cluster;
573 		return;
574 	}
575 
576 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
577 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
578 
579 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
580 
581 	lba = blob->active.clusters[start_cluster];
582 	lba_count = lba_per_cluster;
583 	extent_idx = 0;
584 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
585 		if ((lba + lba_count) == blob->active.clusters[i]) {
586 			lba_count += lba_per_cluster;
587 			continue;
588 		}
589 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
590 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
591 		extent_idx++;
592 
593 		cur_sz += sizeof(desc->extents[extent_idx]);
594 
595 		if (buf_sz < cur_sz) {
596 			/* If we ran out of buffer space, return */
597 			desc->length = sizeof(desc->extents[0]) * extent_idx;
598 			*next_cluster = i;
599 			return;
600 		}
601 
602 		lba = blob->active.clusters[i];
603 		lba_count = lba_per_cluster;
604 	}
605 
606 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
607 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
608 	extent_idx++;
609 
610 	desc->length = sizeof(desc->extents[0]) * extent_idx;
611 	*next_cluster = blob->active.num_clusters;
612 
613 	return;
614 }
615 
616 static void
617 _spdk_blob_serialize_flags(const struct spdk_blob *blob,
618 			   uint8_t *buf, size_t *buf_sz)
619 {
620 	struct spdk_blob_md_descriptor_flags *desc;
621 
622 	/*
623 	 * Flags get serialized first, so we should always have room for the flags
624 	 *  descriptor.
625 	 */
626 	assert(*buf_sz >= sizeof(*desc));
627 
628 	desc = (struct spdk_blob_md_descriptor_flags *)buf;
629 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
630 	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
631 	desc->invalid_flags = blob->invalid_flags;
632 	desc->data_ro_flags = blob->data_ro_flags;
633 	desc->md_ro_flags = blob->md_ro_flags;
634 
635 	*buf_sz -= sizeof(*desc);
636 }
637 
638 static int
639 _spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
640 			    const struct spdk_xattr_tailq *xattrs, bool internal,
641 			    struct spdk_blob_md_page **pages,
642 			    struct spdk_blob_md_page *cur_page,
643 			    uint32_t *page_count, uint8_t **buf,
644 			    size_t *remaining_sz)
645 {
646 	const struct spdk_xattr	*xattr;
647 	int	rc;
648 
649 	TAILQ_FOREACH(xattr, xattrs, link) {
650 		size_t required_sz = 0;
651 
652 		rc = _spdk_blob_serialize_xattr(xattr,
653 						*buf, *remaining_sz,
654 						&required_sz, internal);
655 		if (rc < 0) {
656 			/* Need to add a new page to the chain */
657 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
658 							   &cur_page);
659 			if (rc < 0) {
660 				spdk_dma_free(*pages);
661 				*pages = NULL;
662 				*page_count = 0;
663 				return rc;
664 			}
665 
666 			*buf = (uint8_t *)cur_page->descriptors;
667 			*remaining_sz = sizeof(cur_page->descriptors);
668 
669 			/* Try again */
670 			required_sz = 0;
671 			rc = _spdk_blob_serialize_xattr(xattr,
672 							*buf, *remaining_sz,
673 							&required_sz, internal);
674 
675 			if (rc < 0) {
676 				spdk_dma_free(*pages);
677 				*pages = NULL;
678 				*page_count = 0;
679 				return -1;
680 			}
681 		}
682 
683 		*remaining_sz -= required_sz;
684 		*buf += required_sz;
685 	}
686 
687 	return 0;
688 }
689 
690 static int
691 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
692 		     uint32_t *page_count)
693 {
694 	struct spdk_blob_md_page		*cur_page;
695 	int					rc;
696 	uint8_t					*buf;
697 	size_t					remaining_sz;
698 	uint64_t				last_cluster;
699 
700 	assert(pages != NULL);
701 	assert(page_count != NULL);
702 	assert(blob != NULL);
703 	assert(blob->state == SPDK_BLOB_STATE_DIRTY);
704 
705 	*pages = NULL;
706 	*page_count = 0;
707 
708 	/* A blob always has at least 1 page, even if it has no descriptors */
709 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
710 	if (rc < 0) {
711 		return rc;
712 	}
713 
714 	buf = (uint8_t *)cur_page->descriptors;
715 	remaining_sz = sizeof(cur_page->descriptors);
716 
717 	/* Serialize flags */
718 	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
719 	buf += sizeof(struct spdk_blob_md_descriptor_flags);
720 
721 	/* Serialize xattrs */
722 	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
723 					 pages, cur_page, page_count, &buf, &remaining_sz);
724 	if (rc < 0) {
725 		return rc;
726 	}
727 
728 	/* Serialize internal xattrs */
729 	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
730 					 pages, cur_page, page_count, &buf, &remaining_sz);
731 	if (rc < 0) {
732 		return rc;
733 	}
734 
735 	/* Serialize extents */
736 	last_cluster = 0;
737 	while (last_cluster < blob->active.num_clusters) {
738 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
739 					    buf, remaining_sz);
740 
741 		if (last_cluster == blob->active.num_clusters) {
742 			break;
743 		}
744 
745 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
746 						   &cur_page);
747 		if (rc < 0) {
748 			return rc;
749 		}
750 
751 		buf = (uint8_t *)cur_page->descriptors;
752 		remaining_sz = sizeof(cur_page->descriptors);
753 	}
754 
755 	return 0;
756 }
757 
758 struct spdk_blob_load_ctx {
759 	struct spdk_blob		*blob;
760 
761 	struct spdk_blob_md_page	*pages;
762 	uint32_t			num_pages;
763 	spdk_bs_sequence_t	        *seq;
764 
765 	spdk_bs_sequence_cpl		cb_fn;
766 	void				*cb_arg;
767 };
768 
769 static uint32_t
770 _spdk_blob_md_page_calc_crc(void *page)
771 {
772 	uint32_t		crc;
773 
774 	crc = BLOB_CRC32C_INITIAL;
775 	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
776 	crc ^= BLOB_CRC32C_INITIAL;
777 
778 	return crc;
779 
780 }
781 
782 static void
783 _spdk_blob_load_final(void *cb_arg, int bserrno)
784 {
785 	struct spdk_blob_load_ctx	*ctx = cb_arg;
786 	struct spdk_blob		*blob = ctx->blob;
787 
788 	_spdk_blob_mark_clean(blob);
789 
790 	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);
791 
792 	/* Free the memory */
793 	spdk_dma_free(ctx->pages);
794 	free(ctx);
795 }
796 
797 static void
798 _spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
799 {
800 	struct spdk_blob_load_ctx	*ctx = cb_arg;
801 	struct spdk_blob		*blob = ctx->blob;
802 
803 	if (bserrno != 0) {
804 		goto error;
805 	}
806 
807 	blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);
808 
809 	if (blob->back_bs_dev == NULL) {
810 		bserrno = -ENOMEM;
811 		goto error;
812 	}
813 
814 	_spdk_blob_load_final(ctx, bserrno);
815 	return;
816 
817 error:
818 	SPDK_ERRLOG("Snapshot fail\n");
819 	_spdk_blob_free(blob);
820 	ctx->cb_fn(ctx->seq, NULL, bserrno);
821 	spdk_dma_free(ctx->pages);
822 	free(ctx);
823 }
824 
825 static void
826 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
827 {
828 	struct spdk_blob_load_ctx	*ctx = cb_arg;
829 	struct spdk_blob		*blob = ctx->blob;
830 	struct spdk_blob_md_page	*page;
831 	const void			*value;
832 	size_t				len;
833 	int				rc;
834 	uint32_t			crc;
835 
836 	page = &ctx->pages[ctx->num_pages - 1];
837 	crc = _spdk_blob_md_page_calc_crc(page);
838 	if (crc != page->crc) {
839 		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
840 		_spdk_blob_free(blob);
841 		ctx->cb_fn(seq, NULL, -EINVAL);
842 		spdk_dma_free(ctx->pages);
843 		free(ctx);
844 		return;
845 	}
846 
847 	if (page->next != SPDK_INVALID_MD_PAGE) {
848 		uint32_t next_page = page->next;
849 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
850 
851 
852 		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));
853 
854 		/* Read the next page */
855 		ctx->num_pages++;
856 		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
857 					      sizeof(*page), NULL);
858 		if (ctx->pages == NULL) {
859 			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
860 			free(ctx);
861 			return;
862 		}
863 
864 		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
865 					  next_lba,
866 					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
867 					  _spdk_blob_load_cpl, ctx);
868 		return;
869 	}
870 
871 	/* Parse the pages */
872 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
873 	if (rc) {
874 		_spdk_blob_free(blob);
875 		ctx->cb_fn(seq, NULL, rc);
876 		spdk_dma_free(ctx->pages);
877 		free(ctx);
878 		return;
879 	}
880 	ctx->seq = seq;
881 
882 
883 	if (spdk_blob_is_thin_provisioned(blob)) {
884 		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
885 		if (rc == 0) {
886 			if (len != sizeof(spdk_blob_id)) {
887 				_spdk_blob_free(blob);
888 				ctx->cb_fn(seq, NULL, -EINVAL);
889 				spdk_dma_free(ctx->pages);
890 				free(ctx);
891 				return;
892 			}
893 			/* open snapshot blob and continue in the callback function */
894 			spdk_bs_open_blob(blob->bs, *(spdk_blob_id *)value,
895 					  _spdk_blob_load_snapshot_cpl, ctx);
896 			return;
897 		} else {
898 			/* add zeroes_dev for thin provisioned blob */
899 			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
900 		}
901 	} else {
902 		/* standard blob */
903 		blob->back_bs_dev = NULL;
904 	}
905 	_spdk_blob_load_final(ctx, bserrno);
906 }
907 
908 /* Load a blob from disk given a blobid */
909 static void
910 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
911 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
912 {
913 	struct spdk_blob_load_ctx *ctx;
914 	struct spdk_blob_store *bs;
915 	uint32_t page_num;
916 	uint64_t lba;
917 
918 	_spdk_blob_verify_md_op(blob);
919 
920 	bs = blob->bs;
921 
922 	ctx = calloc(1, sizeof(*ctx));
923 	if (!ctx) {
924 		cb_fn(seq, cb_arg, -ENOMEM);
925 		return;
926 	}
927 
928 	ctx->blob = blob;
929 	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
930 				      SPDK_BS_PAGE_SIZE, NULL);
931 	if (!ctx->pages) {
932 		free(ctx);
933 		cb_fn(seq, cb_arg, -ENOMEM);
934 		return;
935 	}
936 	ctx->num_pages = 1;
937 	ctx->cb_fn = cb_fn;
938 	ctx->cb_arg = cb_arg;
939 
940 	page_num = _spdk_bs_blobid_to_page(blob->id);
941 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
942 
943 	blob->state = SPDK_BLOB_STATE_LOADING;
944 
945 	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
946 				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
947 				  _spdk_blob_load_cpl, ctx);
948 }
949 
950 struct spdk_blob_persist_ctx {
951 	struct spdk_blob		*blob;
952 
953 	struct spdk_blob_md_page	*pages;
954 
955 	uint64_t			idx;
956 
957 	spdk_bs_sequence_t		*seq;
958 	spdk_bs_sequence_cpl		cb_fn;
959 	void				*cb_arg;
960 };
961 
962 static void
963 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
964 {
965 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
966 	struct spdk_blob		*blob = ctx->blob;
967 
968 	if (bserrno == 0) {
969 		_spdk_blob_mark_clean(blob);
970 	}
971 
972 	/* Call user callback */
973 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
974 
975 	/* Free the memory */
976 	spdk_dma_free(ctx->pages);
977 	free(ctx);
978 }
979 
980 static void
981 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
982 {
983 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
984 	struct spdk_blob		*blob = ctx->blob;
985 	struct spdk_blob_store		*bs = blob->bs;
986 	void				*tmp;
987 	size_t				i;
988 
989 	/* Release all clusters that were truncated */
990 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
991 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
992 
993 		/* Nothing to release if it was not allocated */
994 		if (blob->active.clusters[i] != 0) {
995 			_spdk_bs_release_cluster(bs, cluster_num);
996 		}
997 	}
998 
999 	if (blob->active.num_clusters == 0) {
1000 		free(blob->active.clusters);
1001 		blob->active.clusters = NULL;
1002 		blob->active.cluster_array_size = 0;
1003 	} else {
1004 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
1005 		assert(tmp != NULL);
1006 		blob->active.clusters = tmp;
1007 		blob->active.cluster_array_size = blob->active.num_clusters;
1008 	}
1009 
1010 	_spdk_blob_persist_complete(seq, ctx, bserrno);
1011 }
1012 
1013 static void
1014 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1015 {
1016 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1017 	struct spdk_blob		*blob = ctx->blob;
1018 	struct spdk_blob_store		*bs = blob->bs;
1019 	spdk_bs_batch_t			*batch;
1020 	size_t				i;
1021 	uint64_t			lba;
1022 	uint32_t			lba_count;
1023 
1024 	/* Clusters don't move around in blobs. The list shrinks or grows
1025 	 * at the end, but no changes ever occur in the middle of the list.
1026 	 */
1027 
1028 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
1029 
1030 	/* Unmap all clusters that were truncated */
1031 	lba = 0;
1032 	lba_count = 0;
1033 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
1034 		uint64_t next_lba = blob->active.clusters[i];
1035 		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
1036 
1037 		if (next_lba > 0 && (lba + lba_count) == next_lba) {
1038 			/* This cluster is contiguous with the previous one. */
1039 			lba_count += next_lba_count;
1040 			continue;
1041 		}
1042 
1043 		/* This cluster is not contiguous with the previous one. */
1044 
1045 		/* If a run of LBAs previously existing, send them
1046 		 * as an unmap.
1047 		 */
1048 		if (lba_count > 0) {
1049 			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1050 		}
1051 
1052 		/* Start building the next batch */
1053 		lba = next_lba;
1054 		if (next_lba > 0) {
1055 			lba_count = next_lba_count;
1056 		} else {
1057 			lba_count = 0;
1058 		}
1059 	}
1060 
1061 	/* If we ended with a contiguous set of LBAs, send the unmap now */
1062 	if (lba_count > 0) {
1063 		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1064 	}
1065 
1066 	spdk_bs_batch_close(batch);
1067 }
1068 
1069 static void
1070 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1071 {
1072 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1073 	struct spdk_blob		*blob = ctx->blob;
1074 	struct spdk_blob_store		*bs = blob->bs;
1075 	size_t				i;
1076 
1077 	/* This loop starts at 1 because the first page is special and handled
1078 	 * below. The pages (except the first) are never written in place,
1079 	 * so any pages in the clean list must be zeroed.
1080 	 */
1081 	for (i = 1; i < blob->clean.num_pages; i++) {
1082 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
1083 	}
1084 
1085 	if (blob->active.num_pages == 0) {
1086 		uint32_t page_num;
1087 
1088 		page_num = _spdk_bs_blobid_to_page(blob->id);
1089 		spdk_bit_array_clear(bs->used_md_pages, page_num);
1090 	}
1091 
1092 	/* Move on to unmapping clusters */
1093 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
1094 }
1095 
1096 static void
1097 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1098 {
1099 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1100 	struct spdk_blob		*blob = ctx->blob;
1101 	struct spdk_blob_store		*bs = blob->bs;
1102 	uint64_t			lba;
1103 	uint32_t			lba_count;
1104 	spdk_bs_batch_t			*batch;
1105 	size_t				i;
1106 
1107 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);
1108 
1109 	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);
1110 
1111 	/* This loop starts at 1 because the first page is special and handled
1112 	 * below. The pages (except the first) are never written in place,
1113 	 * so any pages in the clean list must be zeroed.
1114 	 */
1115 	for (i = 1; i < blob->clean.num_pages; i++) {
1116 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
1117 
1118 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1119 	}
1120 
1121 	/* The first page will only be zeroed if this is a delete. */
1122 	if (blob->active.num_pages == 0) {
1123 		uint32_t page_num;
1124 
1125 		/* The first page in the metadata goes where the blobid indicates */
1126 		page_num = _spdk_bs_blobid_to_page(blob->id);
1127 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
1128 
1129 		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1130 	}
1131 
1132 	spdk_bs_batch_close(batch);
1133 }
1134 
1135 static void
1136 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1137 {
1138 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1139 	struct spdk_blob		*blob = ctx->blob;
1140 	struct spdk_blob_store		*bs = blob->bs;
1141 	uint64_t			lba;
1142 	uint32_t			lba_count;
1143 	struct spdk_blob_md_page	*page;
1144 
1145 	if (blob->active.num_pages == 0) {
1146 		/* Move on to the next step */
1147 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1148 		return;
1149 	}
1150 
1151 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1152 
1153 	page = &ctx->pages[0];
1154 	/* The first page in the metadata goes where the blobid indicates */
1155 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
1156 
1157 	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
1158 				   _spdk_blob_persist_zero_pages, ctx);
1159 }
1160 
1161 static void
1162 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1163 {
1164 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
1165 	struct spdk_blob		*blob = ctx->blob;
1166 	struct spdk_blob_store		*bs = blob->bs;
1167 	uint64_t			lba;
1168 	uint32_t			lba_count;
1169 	struct spdk_blob_md_page	*page;
1170 	spdk_bs_batch_t			*batch;
1171 	size_t				i;
1172 
1173 	/* Clusters don't move around in blobs. The list shrinks or grows
1174 	 * at the end, but no changes ever occur in the middle of the list.
1175 	 */
1176 
1177 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
1178 
1179 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
1180 
1181 	/* This starts at 1. The root page is not written until
1182 	 * all of the others are finished
1183 	 */
1184 	for (i = 1; i < blob->active.num_pages; i++) {
1185 		page = &ctx->pages[i];
1186 		assert(page->sequence_num == i);
1187 
1188 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
1189 
1190 		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
1191 	}
1192 
1193 	spdk_bs_batch_close(batch);
1194 }
1195 
1196 static int
1197 _spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
1198 {
1199 	uint64_t	i;
1200 	uint64_t	*tmp;
1201 	uint64_t	lfc; /* lowest free cluster */
1202 	uint64_t	num_clusters;
1203 	struct spdk_blob_store *bs;
1204 
1205 	bs = blob->bs;
1206 
1207 	_spdk_blob_verify_md_op(blob);
1208 
1209 	if (blob->active.num_clusters == sz) {
1210 		return 0;
1211 	}
1212 
1213 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
1214 		/* If this blob was resized to be larger, then smaller, then
1215 		 * larger without syncing, then the cluster array already
1216 		 * contains spare assigned clusters we can use.
1217 		 */
1218 		num_clusters = spdk_min(blob->active.cluster_array_size,
1219 					sz);
1220 	} else {
1221 		num_clusters = blob->active.num_clusters;
1222 	}
1223 
1224 	/* Do two passes - one to verify that we can obtain enough clusters
1225 	 * and another to actually claim them.
1226 	 */
1227 
1228 	if (spdk_blob_is_thin_provisioned(blob) == false) {
1229 		lfc = 0;
1230 		for (i = num_clusters; i < sz; i++) {
1231 			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1232 			if (lfc >= bs->total_clusters) {
1233 				/* No more free clusters. Cannot satisfy the request */
1234 				return -ENOSPC;
1235 			}
1236 			lfc++;
1237 		}
1238 	}
1239 
1240 	if (sz > num_clusters) {
1241 		/* Expand the cluster array if necessary.
1242 		 * We only shrink the array when persisting.
1243 		 */
1244 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
1245 		if (sz > 0 && tmp == NULL) {
1246 			return -ENOMEM;
1247 		}
1248 		memset(tmp + blob->active.cluster_array_size, 0,
1249 		       sizeof(uint64_t) * (sz - blob->active.cluster_array_size));
1250 		blob->active.clusters = tmp;
1251 		blob->active.cluster_array_size = sz;
1252 	}
1253 
1254 	blob->state = SPDK_BLOB_STATE_DIRTY;
1255 
1256 	if (spdk_blob_is_thin_provisioned(blob) == false) {
1257 		lfc = 0;
1258 		for (i = num_clusters; i < sz; i++) {
1259 			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
1260 			lfc++;
1261 		}
1262 	}
1263 
1264 	blob->active.num_clusters = sz;
1265 
1266 	return 0;
1267 }
1268 
1269 static void
1270 _spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
1271 {
1272 	spdk_bs_sequence_t *seq = ctx->seq;
1273 	struct spdk_blob *blob = ctx->blob;
1274 	struct spdk_blob_store *bs = blob->bs;
1275 	uint64_t i;
1276 	uint32_t page_num;
1277 	int rc;
1278 
1279 	if (blob->active.num_pages == 0) {
1280 		/* This is the signal that the blob should be deleted.
1281 		 * Immediately jump to the clean up routine. */
1282 		assert(blob->clean.num_pages > 0);
1283 		ctx->idx = blob->clean.num_pages - 1;
1284 		blob->state = SPDK_BLOB_STATE_CLEAN;
1285 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1286 		return;
1287 
1288 	}
1289 
1290 	/* Generate the new metadata */
1291 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
1292 	if (rc < 0) {
1293 		_spdk_blob_persist_complete(seq, ctx, rc);
1294 		return;
1295 	}
1296 
1297 	assert(blob->active.num_pages >= 1);
1298 
1299 	/* Resize the cache of page indices */
1300 	blob->active.pages = realloc(blob->active.pages,
1301 				     blob->active.num_pages * sizeof(*blob->active.pages));
1302 	if (!blob->active.pages) {
1303 		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
1304 		return;
1305 	}
1306 
1307 	/* Assign this metadata to pages. This requires two passes -
1308 	 * one to verify that there are enough pages and a second
1309 	 * to actually claim them. */
1310 	page_num = 0;
1311 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
1312 	for (i = 1; i < blob->active.num_pages; i++) {
1313 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1314 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
1315 			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
1316 			return;
1317 		}
1318 		page_num++;
1319 	}
1320 
1321 	page_num = 0;
1322 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
1323 	for (i = 1; i < blob->active.num_pages; i++) {
1324 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1325 		ctx->pages[i - 1].next = page_num;
1326 		/* Now that previous metadata page is complete, calculate the crc for it. */
1327 		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1328 		blob->active.pages[i] = page_num;
1329 		spdk_bit_array_set(bs->used_md_pages, page_num);
1330 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
1331 		page_num++;
1332 	}
1333 	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1334 	/* Start writing the metadata from last page to first */
1335 	ctx->idx = blob->active.num_pages - 1;
1336 	blob->state = SPDK_BLOB_STATE_CLEAN;
1337 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
1338 }
1339 
1340 /* Write a blob to disk */
1341 static void
1342 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
1343 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1344 {
1345 	struct spdk_blob_persist_ctx *ctx;
1346 
1347 	_spdk_blob_verify_md_op(blob);
1348 
1349 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1350 		cb_fn(seq, cb_arg, 0);
1351 		return;
1352 	}
1353 
1354 	ctx = calloc(1, sizeof(*ctx));
1355 	if (!ctx) {
1356 		cb_fn(seq, cb_arg, -ENOMEM);
1357 		return;
1358 	}
1359 	ctx->blob = blob;
1360 	ctx->seq = seq;
1361 	ctx->cb_fn = cb_fn;
1362 	ctx->cb_arg = cb_arg;
1363 
1364 	_spdk_blob_persist_start(ctx);
1365 }
1366 
1367 struct spdk_blob_copy_cluster_ctx {
1368 	struct spdk_blob *blob;
1369 	uint8_t *buf;
1370 	uint64_t page;
1371 	uint64_t new_cluster;
1372 	spdk_bs_sequence_t *seq;
1373 };
1374 
1375 static void
1376 _spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
1377 {
1378 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1379 	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
1380 	TAILQ_HEAD(, spdk_bs_request_set) requests;
1381 	spdk_bs_user_op_t *op;
1382 
1383 	TAILQ_INIT(&requests);
1384 	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);
1385 
1386 	while (!TAILQ_EMPTY(&requests)) {
1387 		op = TAILQ_FIRST(&requests);
1388 		TAILQ_REMOVE(&requests, op, link);
1389 		if (bserrno == 0) {
1390 			spdk_bs_user_op_execute(op);
1391 		} else {
1392 			spdk_bs_user_op_abort(op);
1393 		}
1394 	}
1395 
1396 	spdk_dma_free(ctx->buf);
1397 	free(ctx);
1398 }
1399 
1400 static void
1401 _spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
1402 {
1403 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1404 
1405 	if (bserrno) {
1406 		uint32_t cluster_number;
1407 
1408 		if (bserrno == -EEXIST) {
1409 			/* The metadata insert failed because another thread
1410 			 * allocated the cluster first. Free our cluster
1411 			 * but continue without error. */
1412 			bserrno = 0;
1413 		}
1414 
1415 		cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
1416 		_spdk_bs_release_cluster(ctx->blob->bs, cluster_number);
1417 	}
1418 
1419 	spdk_bs_sequence_finish(ctx->seq, bserrno);
1420 }
1421 
1422 static void
1423 _spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1424 {
1425 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1426 	uint32_t cluster_number;
1427 
1428 	if (bserrno) {
1429 		/* The write failed, so jump to the final completion handler */
1430 		spdk_bs_sequence_finish(seq, bserrno);
1431 		return;
1432 	}
1433 
1434 	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);
1435 
1436 	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
1437 					       _spdk_blob_insert_cluster_cpl, ctx);
1438 }
1439 
1440 static void
1441 _spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1442 {
1443 	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
1444 
1445 	if (bserrno != 0) {
1446 		/* The read failed, so jump to the final completion handler */
1447 		spdk_bs_sequence_finish(seq, bserrno);
1448 		return;
1449 	}
1450 
1451 	/* Write whole cluster */
1452 	spdk_bs_sequence_write_dev(seq, ctx->buf,
1453 				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
1454 				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
1455 				   _spdk_blob_write_copy_cpl, ctx);
1456 }
1457 
1458 static void
1459 _spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
1460 				   struct spdk_io_channel *_ch,
1461 				   uint64_t offset, spdk_bs_user_op_t *op)
1462 {
1463 	struct spdk_bs_cpl cpl;
1464 	struct spdk_bs_channel *ch;
1465 	struct spdk_blob_copy_cluster_ctx *ctx;
1466 	uint32_t cluster_start_page;
1467 	uint32_t cluster_number;
1468 	int rc;
1469 
1470 	ch = spdk_io_channel_get_ctx(_ch);
1471 
1472 	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
1473 		/* There are already operations pending. Queue this user op
1474 		 * and return because it will be re-executed when the outstanding
1475 		 * cluster allocation completes. */
1476 		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
1477 		return;
1478 	}
1479 
1480 	/* Round the page offset down to the first page in the cluster */
1481 	cluster_start_page = _spdk_bs_page_to_cluster_start(blob, offset);
1482 
1483 	/* Calculate which index in the metadata cluster array the corresponding
1484 	 * cluster is supposed to be at. */
1485 	cluster_number = _spdk_bs_page_to_cluster(blob->bs, cluster_start_page);
1486 
1487 	ctx = calloc(1, sizeof(*ctx));
1488 	if (!ctx) {
1489 		spdk_bs_user_op_abort(op);
1490 		return;
1491 	}
1492 
1493 	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);
1494 
1495 	ctx->blob = blob;
1496 	ctx->page = cluster_start_page;
1497 
1498 	ctx->buf = spdk_dma_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen, NULL);
1499 	if (!ctx->buf) {
1500 		SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
1501 			    blob->bs->cluster_sz);
1502 		free(ctx);
1503 		spdk_bs_user_op_abort(op);
1504 		return;
1505 	}
1506 
1507 	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
1508 	if (rc != 0) {
1509 		spdk_dma_free(ctx->buf);
1510 		free(ctx);
1511 		spdk_bs_user_op_abort(op);
1512 		return;
1513 	}
1514 
1515 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1516 	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
1517 	cpl.u.blob_basic.cb_arg = ctx;
1518 
1519 	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
1520 	if (!ctx->seq) {
1521 		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
1522 		spdk_dma_free(ctx->buf);
1523 		free(ctx);
1524 		spdk_bs_user_op_abort(op);
1525 		return;
1526 	}
1527 
1528 	/* Queue the user op to block other incoming operations */
1529 	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
1530 
1531 	/* Read cluster from backing device */
1532 	spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
1533 				     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
1534 				     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
1535 				     _spdk_blob_write_copy, ctx);
1536 }
1537 
1538 static void
1539 _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t page, uint64_t length,
1540 				       uint64_t *lba,	uint32_t *lba_count)
1541 {
1542 	*lba_count = _spdk_bs_page_to_lba(blob->bs, length);
1543 
1544 	if (!_spdk_bs_page_is_allocated(blob, page)) {
1545 		assert(blob->back_bs_dev != NULL);
1546 		*lba = _spdk_bs_dev_page_to_lba(blob->back_bs_dev, page);
1547 		*lba_count = _spdk_bs_blob_lba_to_back_dev_lba(blob, *lba_count);
1548 	} else {
1549 		*lba = _spdk_bs_blob_page_to_lba(blob, page);
1550 	}
1551 }
1552 
1553 struct op_split_ctx {
1554 	struct spdk_blob *blob;
1555 	struct spdk_io_channel *channel;
1556 	uint64_t page_offset;
1557 	uint64_t pages_remaining;
1558 	void *curr_payload;
1559 	enum spdk_blob_op_type op_type;
1560 	spdk_bs_sequence_t *seq;
1561 };
1562 
1563 static void
1564 _spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
1565 {
1566 	struct op_split_ctx	*ctx = cb_arg;
1567 	struct spdk_blob	*blob = ctx->blob;
1568 	struct spdk_io_channel	*ch = ctx->channel;
1569 	enum spdk_blob_op_type	op_type = ctx->op_type;
1570 	uint8_t			*buf = ctx->curr_payload;
1571 	uint64_t		offset = ctx->page_offset;
1572 	uint64_t		length = ctx->pages_remaining;
1573 	uint64_t		op_length;
1574 
1575 	if (bserrno != 0 || ctx->pages_remaining == 0) {
1576 		spdk_bs_sequence_finish(ctx->seq, bserrno);
1577 		free(ctx);
1578 		return;
1579 	}
1580 
1581 	op_length = spdk_min(length, _spdk_bs_num_pages_to_cluster_boundary(blob, offset));
1582 
1583 	/* Update length and payload for next operation */
1584 	ctx->pages_remaining -= op_length;
1585 	ctx->page_offset += op_length;
1586 	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
1587 		ctx->curr_payload += op_length;
1588 	}
1589 
1590 	switch (op_type) {
1591 	case SPDK_BLOB_READ:
1592 		spdk_blob_io_read(blob, ch, buf, offset, op_length,
1593 				  _spdk_blob_request_submit_op_split_next, ctx);
1594 		break;
1595 	case SPDK_BLOB_WRITE:
1596 		spdk_blob_io_write(blob, ch, buf, offset, op_length,
1597 				   _spdk_blob_request_submit_op_split_next, ctx);
1598 		break;
1599 	case SPDK_BLOB_UNMAP:
1600 		spdk_blob_io_unmap(blob, ch, offset, op_length,
1601 				   _spdk_blob_request_submit_op_split_next, ctx);
1602 		break;
1603 	case SPDK_BLOB_WRITE_ZEROES:
1604 		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
1605 					  _spdk_blob_request_submit_op_split_next, ctx);
1606 		break;
1607 	case SPDK_BLOB_READV:
1608 	case SPDK_BLOB_WRITEV:
1609 		SPDK_ERRLOG("readv/write not valid for %s\n", __func__);
1610 		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
1611 		free(ctx);
1612 		break;
1613 	}
1614 }
1615 
1616 static void
1617 _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
1618 				   void *payload, uint64_t offset, uint64_t length,
1619 				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1620 {
1621 	struct op_split_ctx *ctx;
1622 	spdk_bs_sequence_t *seq;
1623 	struct spdk_bs_cpl cpl;
1624 
1625 	assert(blob != NULL);
1626 
1627 	ctx = calloc(1, sizeof(struct op_split_ctx));
1628 	if (ctx == NULL) {
1629 		cb_fn(cb_arg, -ENOMEM);
1630 		return;
1631 	}
1632 
1633 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1634 	cpl.u.blob_basic.cb_fn = cb_fn;
1635 	cpl.u.blob_basic.cb_arg = cb_arg;
1636 
1637 	seq = spdk_bs_sequence_start(ch, &cpl);
1638 	if (!seq) {
1639 		free(ctx);
1640 		cb_fn(cb_arg, -ENOMEM);
1641 		return;
1642 	}
1643 
1644 	ctx->blob = blob;
1645 	ctx->channel = ch;
1646 	ctx->curr_payload = payload;
1647 	ctx->page_offset = offset;
1648 	ctx->pages_remaining = length;
1649 	ctx->op_type = op_type;
1650 	ctx->seq = seq;
1651 
1652 	_spdk_blob_request_submit_op_split_next(ctx, 0);
1653 }
1654 
1655 static void
1656 _spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
1657 				    void *payload, uint64_t offset, uint64_t length,
1658 				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1659 {
1660 	struct spdk_bs_cpl cpl;
1661 	uint64_t lba;
1662 	uint32_t lba_count;
1663 
1664 	assert(blob != NULL);
1665 
1666 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1667 	cpl.u.blob_basic.cb_fn = cb_fn;
1668 	cpl.u.blob_basic.cb_arg = cb_arg;
1669 
1670 	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
1671 
1672 	switch (op_type) {
1673 	case SPDK_BLOB_READ: {
1674 		spdk_bs_batch_t *batch;
1675 
1676 		batch = spdk_bs_batch_open(_ch, &cpl);
1677 		if (!batch) {
1678 			cb_fn(cb_arg, -ENOMEM);
1679 			return;
1680 		}
1681 
1682 		if (_spdk_bs_page_is_allocated(blob, offset)) {
1683 			/* Read from the blob */
1684 			spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
1685 		} else {
1686 			/* Read from the backing block device */
1687 			spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
1688 		}
1689 
1690 		spdk_bs_batch_close(batch);
1691 		break;
1692 	}
1693 	case SPDK_BLOB_WRITE:
1694 	case SPDK_BLOB_WRITE_ZEROES: {
1695 		if (_spdk_bs_page_is_allocated(blob, offset)) {
1696 			/* Write to the blob */
1697 			spdk_bs_batch_t *batch;
1698 
1699 			batch = spdk_bs_batch_open(_ch, &cpl);
1700 			if (!batch) {
1701 				cb_fn(cb_arg, -ENOMEM);
1702 				return;
1703 			}
1704 
1705 			if (op_type == SPDK_BLOB_WRITE) {
1706 				spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
1707 			} else {
1708 				spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
1709 			}
1710 
1711 			spdk_bs_batch_close(batch);
1712 		} else {
1713 			/* Queue this operation and allocate the cluster */
1714 			spdk_bs_user_op_t *op;
1715 
1716 			op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
1717 			if (!op) {
1718 				cb_fn(cb_arg, -ENOMEM);
1719 				return;
1720 			}
1721 
1722 			_spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
1723 		}
1724 		break;
1725 	}
1726 	case SPDK_BLOB_UNMAP: {
1727 		spdk_bs_batch_t *batch;
1728 
1729 		batch = spdk_bs_batch_open(_ch, &cpl);
1730 		if (!batch) {
1731 			cb_fn(cb_arg, -ENOMEM);
1732 			return;
1733 		}
1734 
1735 		if (_spdk_bs_page_is_allocated(blob, offset)) {
1736 			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
1737 		}
1738 
1739 		spdk_bs_batch_close(batch);
1740 		break;
1741 	}
1742 	case SPDK_BLOB_READV:
1743 	case SPDK_BLOB_WRITEV:
1744 		SPDK_ERRLOG("readv/write not valid\n");
1745 		cb_fn(cb_arg, -EINVAL);
1746 		break;
1747 	}
1748 }
1749 
1750 static void
1751 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
1752 			     void *payload, uint64_t offset, uint64_t length,
1753 			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1754 {
1755 	assert(blob != NULL);
1756 
1757 	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
1758 		cb_fn(cb_arg, -EPERM);
1759 		return;
1760 	}
1761 
1762 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1763 		cb_fn(cb_arg, -EINVAL);
1764 		return;
1765 	}
1766 
1767 	if (length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset)) {
1768 		_spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
1769 						    cb_fn, cb_arg, op_type);
1770 	} else {
1771 		_spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
1772 						   cb_fn, cb_arg, op_type);
1773 	}
1774 }
1775 
1776 struct rw_iov_ctx {
1777 	struct spdk_blob *blob;
1778 	struct spdk_io_channel *channel;
1779 	spdk_blob_op_complete cb_fn;
1780 	void *cb_arg;
1781 	bool read;
1782 	int iovcnt;
1783 	struct iovec *orig_iov;
1784 	uint64_t page_offset;
1785 	uint64_t pages_remaining;
1786 	uint64_t pages_done;
1787 	struct iovec iov[0];
1788 };
1789 
1790 static void
1791 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1792 {
1793 	assert(cb_arg == NULL);
1794 	spdk_bs_sequence_finish(seq, bserrno);
1795 }
1796 
1797 static void
1798 _spdk_rw_iov_split_next(void *cb_arg, int bserrno)
1799 {
1800 	struct rw_iov_ctx *ctx = cb_arg;
1801 	struct spdk_blob *blob = ctx->blob;
1802 	struct iovec *iov, *orig_iov;
1803 	int iovcnt;
1804 	size_t orig_iovoff;
1805 	uint64_t page_count, pages_to_boundary, page_offset;
1806 	uint64_t byte_count;
1807 
1808 	if (bserrno != 0 || ctx->pages_remaining == 0) {
1809 		ctx->cb_fn(ctx->cb_arg, bserrno);
1810 		free(ctx);
1811 		return;
1812 	}
1813 
1814 	page_offset = ctx->page_offset;
1815 	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(blob, page_offset);
1816 	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
1817 
1818 	/*
1819 	 * Get index and offset into the original iov array for our current position in the I/O sequence.
1820 	 *  byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will
1821 	 *  point to the current position in the I/O sequence.
1822 	 */
1823 	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
1824 	orig_iov = &ctx->orig_iov[0];
1825 	orig_iovoff = 0;
1826 	while (byte_count > 0) {
1827 		if (byte_count >= orig_iov->iov_len) {
1828 			byte_count -= orig_iov->iov_len;
1829 			orig_iov++;
1830 		} else {
1831 			orig_iovoff = byte_count;
1832 			byte_count = 0;
1833 		}
1834 	}
1835 
1836 	/*
1837 	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
1838 	 *  bytes of this next I/O remain to be accounted for in the new iov array.
1839 	 */
1840 	byte_count = page_count * sizeof(struct spdk_blob_md_page);
1841 	iov = &ctx->iov[0];
1842 	iovcnt = 0;
1843 	while (byte_count > 0) {
1844 		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
1845 		iov->iov_base = orig_iov->iov_base + orig_iovoff;
1846 		byte_count -= iov->iov_len;
1847 		orig_iovoff = 0;
1848 		orig_iov++;
1849 		iov++;
1850 		iovcnt++;
1851 	}
1852 
1853 	ctx->page_offset += page_count;
1854 	ctx->pages_done += page_count;
1855 	ctx->pages_remaining -= page_count;
1856 	iov = &ctx->iov[0];
1857 
1858 	if (ctx->read) {
1859 		spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
1860 				   page_count, _spdk_rw_iov_split_next, ctx);
1861 	} else {
1862 		spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, page_offset,
1863 				    page_count, _spdk_rw_iov_split_next, ctx);
1864 	}
1865 }
1866 
1867 static void
1868 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
1869 				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1870 				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
1871 {
1872 	struct spdk_bs_cpl	cpl;
1873 
1874 	assert(blob != NULL);
1875 
1876 	if (!read && blob->data_ro) {
1877 		cb_fn(cb_arg, -EPERM);
1878 		return;
1879 	}
1880 
1881 	if (length == 0) {
1882 		cb_fn(cb_arg, 0);
1883 		return;
1884 	}
1885 
1886 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1887 		cb_fn(cb_arg, -EINVAL);
1888 		return;
1889 	}
1890 
1891 	/*
1892 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
1893 	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
1894 	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
1895 	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
1896 	 *  to allocate a separate iov array and split the I/O such that none of the resulting
1897 	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
1898 	 *  but since this case happens very infrequently, any performance impact will be negligible.
1899 	 *
1900 	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
1901 	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
1902 	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
1903 	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
1904 	 */
1905 	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
1906 		uint32_t lba_count;
1907 		uint64_t lba;
1908 
1909 		_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);
1910 
1911 		cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1912 		cpl.u.blob_basic.cb_fn = cb_fn;
1913 		cpl.u.blob_basic.cb_arg = cb_arg;
1914 
1915 		if (read) {
1916 			spdk_bs_sequence_t *seq;
1917 
1918 			seq = spdk_bs_sequence_start(_channel, &cpl);
1919 			if (!seq) {
1920 				cb_fn(cb_arg, -ENOMEM);
1921 				return;
1922 			}
1923 
1924 			if (_spdk_bs_page_is_allocated(blob, offset)) {
1925 				spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1926 			} else {
1927 				spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count,
1928 							      _spdk_rw_iov_done, NULL);
1929 			}
1930 		} else {
1931 			if (_spdk_bs_page_is_allocated(blob, offset)) {
1932 				spdk_bs_sequence_t *seq;
1933 
1934 				seq = spdk_bs_sequence_start(_channel, &cpl);
1935 				if (!seq) {
1936 					cb_fn(cb_arg, -ENOMEM);
1937 					return;
1938 				}
1939 
1940 				spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1941 			} else {
1942 				/* Queue this operation and allocate the cluster */
1943 				spdk_bs_user_op_t *op;
1944 
1945 				op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, length);
1946 				if (!op) {
1947 					cb_fn(cb_arg, -ENOMEM);
1948 					return;
1949 				}
1950 
1951 				_spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op);
1952 			}
1953 		}
1954 	} else {
1955 		struct rw_iov_ctx *ctx;
1956 
1957 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
1958 		if (ctx == NULL) {
1959 			cb_fn(cb_arg, -ENOMEM);
1960 			return;
1961 		}
1962 
1963 		ctx->blob = blob;
1964 		ctx->channel = _channel;
1965 		ctx->cb_fn = cb_fn;
1966 		ctx->cb_arg = cb_arg;
1967 		ctx->read = read;
1968 		ctx->orig_iov = iov;
1969 		ctx->iovcnt = iovcnt;
1970 		ctx->page_offset = offset;
1971 		ctx->pages_remaining = length;
1972 		ctx->pages_done = 0;
1973 
1974 		_spdk_rw_iov_split_next(ctx, 0);
1975 	}
1976 }
1977 
1978 static struct spdk_blob *
1979 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1980 {
1981 	struct spdk_blob *blob;
1982 
1983 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1984 		if (blob->id == blobid) {
1985 			return blob;
1986 		}
1987 	}
1988 
1989 	return NULL;
1990 }
1991 
1992 static int
1993 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
1994 {
1995 	struct spdk_blob_store		*bs = io_device;
1996 	struct spdk_bs_channel		*channel = ctx_buf;
1997 	struct spdk_bs_dev		*dev;
1998 	uint32_t			max_ops = bs->max_channel_ops;
1999 	uint32_t			i;
2000 
2001 	dev = bs->dev;
2002 
2003 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
2004 	if (!channel->req_mem) {
2005 		return -1;
2006 	}
2007 
2008 	TAILQ_INIT(&channel->reqs);
2009 
2010 	for (i = 0; i < max_ops; i++) {
2011 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
2012 	}
2013 
2014 	channel->bs = bs;
2015 	channel->dev = dev;
2016 	channel->dev_channel = dev->create_channel(dev);
2017 
2018 	if (!channel->dev_channel) {
2019 		SPDK_ERRLOG("Failed to create device channel.\n");
2020 		free(channel->req_mem);
2021 		return -1;
2022 	}
2023 
2024 	TAILQ_INIT(&channel->need_cluster_alloc);
2025 
2026 	return 0;
2027 }
2028 
2029 static void
2030 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
2031 {
2032 	struct spdk_bs_channel *channel = ctx_buf;
2033 	spdk_bs_user_op_t *op;
2034 
2035 	while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
2036 		op = TAILQ_FIRST(&channel->need_cluster_alloc);
2037 		TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
2038 		spdk_bs_user_op_abort(op);
2039 	}
2040 
2041 	free(channel->req_mem);
2042 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
2043 }
2044 
2045 static void
2046 _spdk_bs_dev_destroy(void *io_device)
2047 {
2048 	struct spdk_blob_store *bs = io_device;
2049 	struct spdk_blob	*blob, *blob_tmp;
2050 
2051 	bs->dev->destroy(bs->dev);
2052 
2053 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
2054 		TAILQ_REMOVE(&bs->blobs, blob, link);
2055 		_spdk_blob_free(blob);
2056 	}
2057 
2058 	pthread_mutex_destroy(&bs->used_clusters_mutex);
2059 
2060 	spdk_bit_array_free(&bs->used_blobids);
2061 	spdk_bit_array_free(&bs->used_md_pages);
2062 	spdk_bit_array_free(&bs->used_clusters);
2063 	/*
2064 	 * If this function is called for any reason except a successful unload,
2065 	 * the unload_cpl type will be NONE and this will be a nop.
2066 	 */
2067 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
2068 
2069 	free(bs);
2070 }
2071 
2072 static void
2073 _spdk_bs_free(struct spdk_blob_store *bs)
2074 {
2075 	spdk_bs_unregister_md_thread(bs);
2076 	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
2077 }
2078 
2079 void
2080 spdk_bs_opts_init(struct spdk_bs_opts *opts)
2081 {
2082 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
2083 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
2084 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
2085 	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
2086 	memset(&opts->bstype, 0, sizeof(opts->bstype));
2087 	opts->iter_cb_fn = NULL;
2088 	opts->iter_cb_arg = NULL;
2089 }
2090 
2091 static int
2092 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
2093 {
2094 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
2095 	    opts->max_channel_ops == 0) {
2096 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
2097 		return -1;
2098 	}
2099 
2100 	return 0;
2101 }
2102 
2103 static struct spdk_blob_store *
2104 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
2105 {
2106 	struct spdk_blob_store	*bs;
2107 	uint64_t dev_size;
2108 	int rc;
2109 
2110 	dev_size = dev->blocklen * dev->blockcnt;
2111 	if (dev_size < opts->cluster_sz) {
2112 		/* Device size cannot be smaller than cluster size of blobstore */
2113 		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
2114 			    dev_size, opts->cluster_sz);
2115 		return NULL;
2116 	}
2117 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
2118 		/* Cluster size cannot be smaller than page size */
2119 		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
2120 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
2121 		return NULL;
2122 	}
2123 	bs = calloc(1, sizeof(struct spdk_blob_store));
2124 	if (!bs) {
2125 		return NULL;
2126 	}
2127 
2128 	TAILQ_INIT(&bs->blobs);
2129 	bs->dev = dev;
2130 	bs->md_thread = spdk_get_thread();
2131 	assert(bs->md_thread != NULL);
2132 
2133 	/*
2134 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
2135 	 *  even multiple of the cluster size.
2136 	 */
2137 	bs->cluster_sz = opts->cluster_sz;
2138 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
2139 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2140 	bs->num_free_clusters = bs->total_clusters;
2141 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
2142 	if (bs->used_clusters == NULL) {
2143 		free(bs);
2144 		return NULL;
2145 	}
2146 
2147 	bs->max_channel_ops = opts->max_channel_ops;
2148 	bs->super_blob = SPDK_BLOBID_INVALID;
2149 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
2150 
2151 	/* The metadata is assumed to be at least 1 page */
2152 	bs->used_md_pages = spdk_bit_array_create(1);
2153 	bs->used_blobids = spdk_bit_array_create(0);
2154 
2155 	pthread_mutex_init(&bs->used_clusters_mutex, NULL);
2156 
2157 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
2158 				sizeof(struct spdk_bs_channel));
2159 	rc = spdk_bs_register_md_thread(bs);
2160 	if (rc == -1) {
2161 		spdk_io_device_unregister(bs, NULL);
2162 		pthread_mutex_destroy(&bs->used_clusters_mutex);
2163 		spdk_bit_array_free(&bs->used_blobids);
2164 		spdk_bit_array_free(&bs->used_md_pages);
2165 		spdk_bit_array_free(&bs->used_clusters);
2166 		free(bs);
2167 		return NULL;
2168 	}
2169 
2170 	return bs;
2171 }
2172 
2173 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */
2174 
2175 struct spdk_bs_load_ctx {
2176 	struct spdk_blob_store		*bs;
2177 	struct spdk_bs_super_block	*super;
2178 
2179 	struct spdk_bs_md_mask		*mask;
2180 	bool				in_page_chain;
2181 	uint32_t			page_index;
2182 	uint32_t			cur_page;
2183 	struct spdk_blob_md_page	*page;
2184 	bool				is_load;
2185 
2186 	spdk_bs_sequence_t			*seq;
2187 	spdk_blob_op_with_handle_complete	iter_cb_fn;
2188 	void					*iter_cb_arg;
2189 };
2190 
2191 static void
2192 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2193 {
2194 	assert(bserrno != 0);
2195 
2196 	spdk_dma_free(ctx->super);
2197 	spdk_bs_sequence_finish(seq, bserrno);
2198 	/*
2199 	 * Only free the blobstore when a load fails.  If an unload fails (for some reason)
2200 	 *  we want to keep the blobstore in case the caller wants to try again.
2201 	 */
2202 	if (ctx->is_load) {
2203 		_spdk_bs_free(ctx->bs);
2204 	}
2205 	free(ctx);
2206 }
2207 
2208 static void
2209 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
2210 {
2211 	uint32_t i = 0;
2212 
2213 	while (true) {
2214 		i = spdk_bit_array_find_first_set(array, i);
2215 		if (i >= mask->length) {
2216 			break;
2217 		}
2218 		mask->mask[i / 8] |= 1U << (i % 8);
2219 		i++;
2220 	}
2221 }
2222 
2223 static void
2224 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
2225 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
2226 {
2227 	/* Update the values in the super block */
2228 	super->super_blob = bs->super_blob;
2229 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
2230 	super->crc = _spdk_blob_md_page_calc_crc(super);
2231 	spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
2232 				   _spdk_bs_byte_to_lba(bs, sizeof(*super)),
2233 				   cb_fn, cb_arg);
2234 }
2235 
2236 static void
2237 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2238 {
2239 	struct spdk_bs_load_ctx	*ctx = arg;
2240 	uint64_t	mask_size, lba, lba_count;
2241 
2242 	/* Write out the used clusters mask */
2243 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2244 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2245 	if (!ctx->mask) {
2246 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2247 		return;
2248 	}
2249 
2250 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
2251 	ctx->mask->length = ctx->bs->total_clusters;
2252 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
2253 
2254 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
2255 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2256 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2257 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2258 }
2259 
2260 static void
2261 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2262 {
2263 	struct spdk_bs_load_ctx	*ctx = arg;
2264 	uint64_t	mask_size, lba, lba_count;
2265 
2266 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2267 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2268 	if (!ctx->mask) {
2269 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2270 		return;
2271 	}
2272 
2273 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
2274 	ctx->mask->length = ctx->super->md_len;
2275 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
2276 
2277 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
2278 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2279 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2280 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2281 }
2282 
2283 static void
2284 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
2285 {
2286 	struct spdk_bs_load_ctx	*ctx = arg;
2287 	uint64_t	mask_size, lba, lba_count;
2288 
2289 	if (ctx->super->used_blobid_mask_len == 0) {
2290 		/*
2291 		 * This is a pre-v3 on-disk format where the blobid mask does not get
2292 		 *  written to disk.
2293 		 */
2294 		cb_fn(seq, arg, 0);
2295 		return;
2296 	}
2297 
2298 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2299 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2300 	if (!ctx->mask) {
2301 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2302 		return;
2303 	}
2304 
2305 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
2306 	ctx->mask->length = ctx->super->md_len;
2307 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
2308 
2309 	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
2310 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2311 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2312 	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
2313 }
2314 
2315 static void _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx,
2316 				   int bserrno);
2317 
2318 static void
2319 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
2320 {
2321 	struct spdk_bs_load_ctx *ctx = arg;
2322 
2323 	if (bserrno == 0) {
2324 		ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0);
2325 		spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx);
2326 		return;
2327 	}
2328 
2329 	if (bserrno == -ENOENT) {
2330 		bserrno = 0;
2331 	} else {
2332 		/*
2333 		 * This case needs to be looked at further.  Same problem
2334 		 *  exists with applications that rely on explicit blob
2335 		 *  iteration.  We should just skip the blob that failed
2336 		 *  to load and coontinue on to the next one.
2337 		 */
2338 		SPDK_ERRLOG("Error in iterating blobs\n");
2339 	}
2340 
2341 	ctx->iter_cb_fn = NULL;
2342 	_spdk_bs_load_complete(ctx->seq, ctx, bserrno);
2343 }
2344 
2345 static void
2346 _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
2347 {
2348 	if (ctx->iter_cb_fn) {
2349 		ctx->seq = seq;
2350 		spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx);
2351 		return;
2352 	}
2353 
2354 	spdk_dma_free(ctx->super);
2355 	spdk_dma_free(ctx->mask);
2356 	free(ctx);
2357 	spdk_bs_sequence_finish(seq, bserrno);
2358 }
2359 
2360 static void
2361 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2362 {
2363 	struct spdk_bs_load_ctx *ctx = cb_arg;
2364 	uint32_t i, j;
2365 	int rc;
2366 
2367 	/* The type must be correct */
2368 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
2369 
2370 	/* The length of the mask (in bits) must not be greater than
2371 	 * the length of the buffer (converted to bits) */
2372 	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
2373 
2374 	/* The length of the mask must be exactly equal to the size
2375 	 * (in pages) of the metadata region */
2376 	assert(ctx->mask->length == ctx->super->md_len);
2377 
2378 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->mask->length);
2379 	if (rc < 0) {
2380 		spdk_dma_free(ctx->mask);
2381 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2382 		return;
2383 	}
2384 
2385 	for (i = 0; i < ctx->mask->length / 8; i++) {
2386 		uint8_t segment = ctx->mask->mask[i];
2387 		for (j = 0; segment; j++) {
2388 			if (segment & 1U) {
2389 				spdk_bit_array_set(ctx->bs->used_blobids, (i * 8) + j);
2390 			}
2391 			segment >>= 1U;
2392 		}
2393 	}
2394 
2395 	_spdk_bs_load_complete(seq, ctx, bserrno);
2396 }
2397 
2398 static void
2399 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2400 {
2401 	struct spdk_bs_load_ctx *ctx = cb_arg;
2402 	uint64_t		lba, lba_count, mask_size;
2403 	uint32_t		i, j;
2404 	int			rc;
2405 
2406 	/* The type must be correct */
2407 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
2408 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2409 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
2410 					     struct spdk_blob_md_page) * 8));
2411 	/* The length of the mask must be exactly equal to the total number of clusters */
2412 	assert(ctx->mask->length == ctx->bs->total_clusters);
2413 
2414 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2415 	if (rc < 0) {
2416 		spdk_dma_free(ctx->mask);
2417 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2418 		return;
2419 	}
2420 
2421 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2422 	for (i = 0; i < ctx->mask->length / 8; i++) {
2423 		uint8_t segment = ctx->mask->mask[i];
2424 		for (j = 0; segment && (j < 8); j++) {
2425 			if (segment & 1U) {
2426 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
2427 				assert(ctx->bs->num_free_clusters > 0);
2428 				ctx->bs->num_free_clusters--;
2429 			}
2430 			segment >>= 1U;
2431 		}
2432 	}
2433 
2434 	spdk_dma_free(ctx->mask);
2435 
2436 	/* Read the used blobids mask */
2437 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
2438 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2439 	if (!ctx->mask) {
2440 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2441 		return;
2442 	}
2443 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
2444 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
2445 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2446 				  _spdk_bs_load_used_blobids_cpl, ctx);
2447 }
2448 
2449 static void
2450 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2451 {
2452 	struct spdk_bs_load_ctx *ctx = cb_arg;
2453 	uint64_t		lba, lba_count, mask_size;
2454 	uint32_t		i, j;
2455 	int			rc;
2456 
2457 	/* The type must be correct */
2458 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
2459 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
2460 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
2461 				     8));
2462 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
2463 	assert(ctx->mask->length == ctx->super->md_len);
2464 
2465 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
2466 	if (rc < 0) {
2467 		spdk_dma_free(ctx->mask);
2468 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2469 		return;
2470 	}
2471 
2472 	for (i = 0; i < ctx->mask->length / 8; i++) {
2473 		uint8_t segment = ctx->mask->mask[i];
2474 		for (j = 0; segment && (j < 8); j++) {
2475 			if (segment & 1U) {
2476 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
2477 			}
2478 			segment >>= 1U;
2479 		}
2480 	}
2481 	spdk_dma_free(ctx->mask);
2482 
2483 	/* Read the used clusters mask */
2484 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
2485 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2486 	if (!ctx->mask) {
2487 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2488 		return;
2489 	}
2490 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
2491 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
2492 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2493 				  _spdk_bs_load_used_clusters_cpl, ctx);
2494 }
2495 
2496 static void
2497 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2498 {
2499 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2500 	uint64_t lba, lba_count, mask_size;
2501 
2502 	/* Read the used pages mask */
2503 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
2504 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
2505 	if (!ctx->mask) {
2506 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2507 		return;
2508 	}
2509 
2510 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
2511 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
2512 	spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count,
2513 				  _spdk_bs_load_used_pages_cpl, ctx);
2514 }
2515 
2516 static int
2517 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
2518 {
2519 	struct spdk_blob_md_descriptor *desc;
2520 	size_t	cur_desc = 0;
2521 
2522 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
2523 	while (cur_desc < sizeof(page->descriptors)) {
2524 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
2525 			if (desc->length == 0) {
2526 				/* If padding and length are 0, this terminates the page */
2527 				break;
2528 			}
2529 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
2530 			struct spdk_blob_md_descriptor_extent	*desc_extent;
2531 			unsigned int				i, j;
2532 			unsigned int				cluster_count = 0;
2533 
2534 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
2535 
2536 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
2537 				for (j = 0; j < desc_extent->extents[i].length; j++) {
2538 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
2539 					if (bs->num_free_clusters == 0) {
2540 						return -1;
2541 					}
2542 					bs->num_free_clusters--;
2543 					cluster_count++;
2544 				}
2545 			}
2546 			if (cluster_count == 0) {
2547 				return -1;
2548 			}
2549 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
2550 			/* Skip this item */
2551 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
2552 			/* Skip this item */
2553 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
2554 			/* Skip this item */
2555 		} else {
2556 			/* Error */
2557 			return -1;
2558 		}
2559 		/* Advance to the next descriptor */
2560 		cur_desc += sizeof(*desc) + desc->length;
2561 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
2562 			break;
2563 		}
2564 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
2565 	}
2566 	return 0;
2567 }
2568 
2569 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
2570 {
2571 	uint32_t crc;
2572 
2573 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
2574 	if (crc != ctx->page->crc) {
2575 		return false;
2576 	}
2577 
2578 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
2579 		return false;
2580 	}
2581 	return true;
2582 }
2583 
2584 static void
2585 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
2586 
2587 static void
2588 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2589 {
2590 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2591 
2592 	_spdk_bs_load_complete(seq, ctx, bserrno);
2593 }
2594 
2595 static void
2596 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2597 {
2598 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2599 
2600 	spdk_dma_free(ctx->mask);
2601 	ctx->mask = NULL;
2602 
2603 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
2604 }
2605 
2606 static void
2607 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2608 {
2609 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2610 
2611 	spdk_dma_free(ctx->mask);
2612 	ctx->mask = NULL;
2613 
2614 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
2615 }
2616 
2617 static void
2618 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2619 {
2620 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
2621 }
2622 
2623 static void
2624 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2625 {
2626 	struct spdk_bs_load_ctx *ctx = cb_arg;
2627 	uint64_t num_md_clusters;
2628 	uint64_t i;
2629 	uint32_t page_num;
2630 
2631 	if (bserrno != 0) {
2632 		_spdk_bs_load_ctx_fail(seq, ctx, bserrno);
2633 		return;
2634 	}
2635 
2636 	page_num = ctx->cur_page;
2637 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
2638 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
2639 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
2640 			if (ctx->page->sequence_num == 0) {
2641 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
2642 			}
2643 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
2644 				_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2645 				return;
2646 			}
2647 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
2648 				ctx->in_page_chain = true;
2649 				ctx->cur_page = ctx->page->next;
2650 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2651 				return;
2652 			}
2653 		}
2654 	}
2655 
2656 	ctx->in_page_chain = false;
2657 
2658 	do {
2659 		ctx->page_index++;
2660 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
2661 
2662 	if (ctx->page_index < ctx->super->md_len) {
2663 		ctx->cur_page = ctx->page_index;
2664 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2665 	} else {
2666 		/* Claim all of the clusters used by the metadata */
2667 		num_md_clusters = divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
2668 		for (i = 0; i < num_md_clusters; i++) {
2669 			_spdk_bs_claim_cluster(ctx->bs, i);
2670 		}
2671 		spdk_dma_free(ctx->page);
2672 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
2673 	}
2674 }
2675 
2676 static void
2677 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
2678 {
2679 	struct spdk_bs_load_ctx *ctx = cb_arg;
2680 	uint64_t lba;
2681 
2682 	assert(ctx->cur_page < ctx->super->md_len);
2683 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
2684 	spdk_bs_sequence_read_dev(seq, ctx->page, lba,
2685 				  _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
2686 				  _spdk_bs_load_replay_md_cpl, ctx);
2687 }
2688 
2689 static void
2690 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
2691 {
2692 	struct spdk_bs_load_ctx *ctx = cb_arg;
2693 
2694 	ctx->page_index = 0;
2695 	ctx->cur_page = 0;
2696 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
2697 				     SPDK_BS_PAGE_SIZE,
2698 				     NULL);
2699 	if (!ctx->page) {
2700 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2701 		return;
2702 	}
2703 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2704 }
2705 
2706 static void
2707 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2708 {
2709 	struct spdk_bs_load_ctx *ctx = cb_arg;
2710 	int		rc;
2711 
2712 	if (bserrno != 0) {
2713 		_spdk_bs_load_ctx_fail(seq, ctx, -EIO);
2714 		return;
2715 	}
2716 
2717 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
2718 	if (rc < 0) {
2719 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2720 		return;
2721 	}
2722 
2723 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
2724 	if (rc < 0) {
2725 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2726 		return;
2727 	}
2728 
2729 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2730 	if (rc < 0) {
2731 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2732 		return;
2733 	}
2734 
2735 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2736 	_spdk_bs_load_replay_md(seq, cb_arg);
2737 }
2738 
2739 static void
2740 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2741 {
2742 	struct spdk_bs_load_ctx *ctx = cb_arg;
2743 	uint32_t	crc;
2744 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
2745 
2746 	if (ctx->super->version > SPDK_BS_VERSION ||
2747 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
2748 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2749 		return;
2750 	}
2751 
2752 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2753 		   sizeof(ctx->super->signature)) != 0) {
2754 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2755 		return;
2756 	}
2757 
2758 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2759 	if (crc != ctx->super->crc) {
2760 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2761 		return;
2762 	}
2763 
2764 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2765 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2766 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2767 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n");
2768 	} else {
2769 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2770 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2771 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2772 		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
2773 		return;
2774 	}
2775 
2776 	/* Parse the super block */
2777 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2778 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2779 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2780 	ctx->bs->md_start = ctx->super->md_start;
2781 	ctx->bs->md_len = ctx->super->md_len;
2782 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2783 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2784 	ctx->bs->super_blob = ctx->super->super_blob;
2785 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2786 
2787 	if (ctx->super->clean == 0) {
2788 		_spdk_bs_recover(seq, ctx, 0);
2789 	} else if (ctx->super->used_blobid_mask_len == 0) {
2790 		/*
2791 		 * Metadata is clean, but this is an old metadata format without
2792 		 *  a blobid mask.  Clear the clean bit and then build the masks
2793 		 *  using _spdk_bs_recover.
2794 		 */
2795 		ctx->super->clean = 0;
2796 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_recover, ctx);
2797 	} else {
2798 		ctx->super->clean = 0;
2799 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2800 	}
2801 }
2802 
2803 void
2804 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2805 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2806 {
2807 	struct spdk_blob_store	*bs;
2808 	struct spdk_bs_cpl	cpl;
2809 	spdk_bs_sequence_t	*seq;
2810 	struct spdk_bs_load_ctx *ctx;
2811 	struct spdk_bs_opts	opts = {};
2812 
2813 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2814 
2815 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2816 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen);
2817 		dev->destroy(dev);
2818 		cb_fn(cb_arg, NULL, -EINVAL);
2819 		return;
2820 	}
2821 
2822 	if (o) {
2823 		opts = *o;
2824 	} else {
2825 		spdk_bs_opts_init(&opts);
2826 	}
2827 
2828 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2829 		dev->destroy(dev);
2830 		cb_fn(cb_arg, NULL, -EINVAL);
2831 		return;
2832 	}
2833 
2834 	bs = _spdk_bs_alloc(dev, &opts);
2835 	if (!bs) {
2836 		dev->destroy(dev);
2837 		cb_fn(cb_arg, NULL, -ENOMEM);
2838 		return;
2839 	}
2840 
2841 	ctx = calloc(1, sizeof(*ctx));
2842 	if (!ctx) {
2843 		_spdk_bs_free(bs);
2844 		cb_fn(cb_arg, NULL, -ENOMEM);
2845 		return;
2846 	}
2847 
2848 	ctx->bs = bs;
2849 	ctx->is_load = true;
2850 	ctx->iter_cb_fn = opts.iter_cb_fn;
2851 	ctx->iter_cb_arg = opts.iter_cb_arg;
2852 
2853 	/* Allocate memory for the super block */
2854 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2855 	if (!ctx->super) {
2856 		free(ctx);
2857 		_spdk_bs_free(bs);
2858 		return;
2859 	}
2860 
2861 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2862 	cpl.u.bs_handle.cb_fn = cb_fn;
2863 	cpl.u.bs_handle.cb_arg = cb_arg;
2864 	cpl.u.bs_handle.bs = bs;
2865 
2866 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2867 	if (!seq) {
2868 		spdk_dma_free(ctx->super);
2869 		free(ctx);
2870 		_spdk_bs_free(bs);
2871 		cb_fn(cb_arg, NULL, -ENOMEM);
2872 		return;
2873 	}
2874 
2875 	/* Read the super block */
2876 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2877 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2878 				  _spdk_bs_load_super_cpl, ctx);
2879 }
2880 
2881 /* END spdk_bs_load */
2882 
2883 /* START spdk_bs_init */
2884 
2885 struct spdk_bs_init_ctx {
2886 	struct spdk_blob_store		*bs;
2887 	struct spdk_bs_super_block	*super;
2888 };
2889 
2890 static void
2891 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2892 {
2893 	struct spdk_bs_init_ctx *ctx = cb_arg;
2894 
2895 	spdk_dma_free(ctx->super);
2896 	free(ctx);
2897 
2898 	spdk_bs_sequence_finish(seq, bserrno);
2899 }
2900 
2901 static void
2902 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2903 {
2904 	struct spdk_bs_init_ctx *ctx = cb_arg;
2905 
2906 	/* Write super block */
2907 	spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2908 				   _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2909 				   _spdk_bs_init_persist_super_cpl, ctx);
2910 }
2911 
2912 void
2913 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2914 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2915 {
2916 	struct spdk_bs_init_ctx *ctx;
2917 	struct spdk_blob_store	*bs;
2918 	struct spdk_bs_cpl	cpl;
2919 	spdk_bs_sequence_t	*seq;
2920 	spdk_bs_batch_t		*batch;
2921 	uint64_t		num_md_lba;
2922 	uint64_t		num_md_pages;
2923 	uint64_t		num_md_clusters;
2924 	uint32_t		i;
2925 	struct spdk_bs_opts	opts = {};
2926 	int			rc;
2927 
2928 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2929 
2930 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2931 		SPDK_ERRLOG("unsupported dev block length of %d\n",
2932 			    dev->blocklen);
2933 		dev->destroy(dev);
2934 		cb_fn(cb_arg, NULL, -EINVAL);
2935 		return;
2936 	}
2937 
2938 	if (o) {
2939 		opts = *o;
2940 	} else {
2941 		spdk_bs_opts_init(&opts);
2942 	}
2943 
2944 	if (_spdk_bs_opts_verify(&opts) != 0) {
2945 		dev->destroy(dev);
2946 		cb_fn(cb_arg, NULL, -EINVAL);
2947 		return;
2948 	}
2949 
2950 	bs = _spdk_bs_alloc(dev, &opts);
2951 	if (!bs) {
2952 		dev->destroy(dev);
2953 		cb_fn(cb_arg, NULL, -ENOMEM);
2954 		return;
2955 	}
2956 
2957 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2958 		/* By default, allocate 1 page per cluster.
2959 		 * Technically, this over-allocates metadata
2960 		 * because more metadata will reduce the number
2961 		 * of usable clusters. This can be addressed with
2962 		 * more complex math in the future.
2963 		 */
2964 		bs->md_len = bs->total_clusters;
2965 	} else {
2966 		bs->md_len = opts.num_md_pages;
2967 	}
2968 
2969 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2970 	if (rc < 0) {
2971 		_spdk_bs_free(bs);
2972 		cb_fn(cb_arg, NULL, -ENOMEM);
2973 		return;
2974 	}
2975 
2976 	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
2977 	if (rc < 0) {
2978 		_spdk_bs_free(bs);
2979 		cb_fn(cb_arg, NULL, -ENOMEM);
2980 		return;
2981 	}
2982 
2983 	ctx = calloc(1, sizeof(*ctx));
2984 	if (!ctx) {
2985 		_spdk_bs_free(bs);
2986 		cb_fn(cb_arg, NULL, -ENOMEM);
2987 		return;
2988 	}
2989 
2990 	ctx->bs = bs;
2991 
2992 	/* Allocate memory for the super block */
2993 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2994 	if (!ctx->super) {
2995 		free(ctx);
2996 		_spdk_bs_free(bs);
2997 		return;
2998 	}
2999 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
3000 	       sizeof(ctx->super->signature));
3001 	ctx->super->version = SPDK_BS_VERSION;
3002 	ctx->super->length = sizeof(*ctx->super);
3003 	ctx->super->super_blob = bs->super_blob;
3004 	ctx->super->clean = 0;
3005 	ctx->super->cluster_size = bs->cluster_sz;
3006 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
3007 
3008 	/* Calculate how many pages the metadata consumes at the front
3009 	 * of the disk.
3010 	 */
3011 
3012 	/* The super block uses 1 page */
3013 	num_md_pages = 1;
3014 
3015 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
3016 	 * up to the nearest page, plus a header.
3017 	 */
3018 	ctx->super->used_page_mask_start = num_md_pages;
3019 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3020 					 divide_round_up(bs->md_len, 8),
3021 					 SPDK_BS_PAGE_SIZE);
3022 	num_md_pages += ctx->super->used_page_mask_len;
3023 
3024 	/* The used_clusters mask requires 1 bit per cluster, rounded
3025 	 * up to the nearest page, plus a header.
3026 	 */
3027 	ctx->super->used_cluster_mask_start = num_md_pages;
3028 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3029 					    divide_round_up(bs->total_clusters, 8),
3030 					    SPDK_BS_PAGE_SIZE);
3031 	num_md_pages += ctx->super->used_cluster_mask_len;
3032 
3033 	/* The used_blobids mask requires 1 bit per metadata page, rounded
3034 	 * up to the nearest page, plus a header.
3035 	 */
3036 	ctx->super->used_blobid_mask_start = num_md_pages;
3037 	ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
3038 					   divide_round_up(bs->md_len, 8),
3039 					   SPDK_BS_PAGE_SIZE);
3040 	num_md_pages += ctx->super->used_blobid_mask_len;
3041 
3042 	/* The metadata region size was chosen above */
3043 	ctx->super->md_start = bs->md_start = num_md_pages;
3044 	ctx->super->md_len = bs->md_len;
3045 	num_md_pages += bs->md_len;
3046 
3047 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
3048 
3049 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
3050 
3051 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
3052 	if (num_md_clusters > bs->total_clusters) {
3053 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
3054 			    "please decrease number of pages reserved for metadata "
3055 			    "or increase cluster size.\n");
3056 		spdk_dma_free(ctx->super);
3057 		free(ctx);
3058 		_spdk_bs_free(bs);
3059 		cb_fn(cb_arg, NULL, -ENOMEM);
3060 		return;
3061 	}
3062 	/* Claim all of the clusters used by the metadata */
3063 	for (i = 0; i < num_md_clusters; i++) {
3064 		_spdk_bs_claim_cluster(bs, i);
3065 	}
3066 
3067 	bs->total_data_clusters = bs->num_free_clusters;
3068 
3069 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
3070 	cpl.u.bs_handle.cb_fn = cb_fn;
3071 	cpl.u.bs_handle.cb_arg = cb_arg;
3072 	cpl.u.bs_handle.bs = bs;
3073 
3074 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3075 	if (!seq) {
3076 		spdk_dma_free(ctx->super);
3077 		free(ctx);
3078 		_spdk_bs_free(bs);
3079 		cb_fn(cb_arg, NULL, -ENOMEM);
3080 		return;
3081 	}
3082 
3083 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
3084 
3085 	/* Clear metadata space */
3086 	spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);
3087 	/* Trim data clusters */
3088 	spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
3089 
3090 	spdk_bs_batch_close(batch);
3091 }
3092 
3093 /* END spdk_bs_init */
3094 
3095 /* START spdk_bs_destroy */
3096 
3097 static void
3098 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3099 {
3100 	struct spdk_bs_init_ctx *ctx = cb_arg;
3101 	struct spdk_blob_store *bs = ctx->bs;
3102 
3103 	/*
3104 	 * We need to defer calling spdk_bs_call_cpl() until after
3105 	 * dev destruction, so tuck these away for later use.
3106 	 */
3107 	bs->unload_err = bserrno;
3108 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3109 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3110 
3111 	spdk_bs_sequence_finish(seq, bserrno);
3112 
3113 	_spdk_bs_free(bs);
3114 	free(ctx);
3115 }
3116 
3117 void
3118 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
3119 		void *cb_arg)
3120 {
3121 	struct spdk_bs_cpl	cpl;
3122 	spdk_bs_sequence_t	*seq;
3123 	struct spdk_bs_init_ctx *ctx;
3124 
3125 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
3126 
3127 	if (!TAILQ_EMPTY(&bs->blobs)) {
3128 		SPDK_ERRLOG("Blobstore still has open blobs\n");
3129 		cb_fn(cb_arg, -EBUSY);
3130 		return;
3131 	}
3132 
3133 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3134 	cpl.u.bs_basic.cb_fn = cb_fn;
3135 	cpl.u.bs_basic.cb_arg = cb_arg;
3136 
3137 	ctx = calloc(1, sizeof(*ctx));
3138 	if (!ctx) {
3139 		cb_fn(cb_arg, -ENOMEM);
3140 		return;
3141 	}
3142 
3143 	ctx->bs = bs;
3144 
3145 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3146 	if (!seq) {
3147 		free(ctx);
3148 		cb_fn(cb_arg, -ENOMEM);
3149 		return;
3150 	}
3151 
3152 	/* Write zeroes to the super block */
3153 	spdk_bs_sequence_write_zeroes_dev(seq,
3154 					  _spdk_bs_page_to_lba(bs, 0),
3155 					  _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
3156 					  _spdk_bs_destroy_trim_cpl, ctx);
3157 }
3158 
3159 /* END spdk_bs_destroy */
3160 
3161 /* START spdk_bs_unload */
3162 
3163 static void
3164 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3165 {
3166 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3167 
3168 	spdk_dma_free(ctx->super);
3169 
3170 	/*
3171 	 * We need to defer calling spdk_bs_call_cpl() until after
3172 	 * dev destuction, so tuck these away for later use.
3173 	 */
3174 	ctx->bs->unload_err = bserrno;
3175 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
3176 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
3177 
3178 	spdk_bs_sequence_finish(seq, bserrno);
3179 
3180 	_spdk_bs_free(ctx->bs);
3181 	free(ctx);
3182 }
3183 
3184 static void
3185 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3186 {
3187 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3188 
3189 	spdk_dma_free(ctx->mask);
3190 	ctx->super->clean = 1;
3191 
3192 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
3193 }
3194 
3195 static void
3196 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3197 {
3198 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3199 
3200 	spdk_dma_free(ctx->mask);
3201 	ctx->mask = NULL;
3202 
3203 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
3204 }
3205 
3206 static void
3207 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3208 {
3209 	struct spdk_bs_load_ctx	*ctx = cb_arg;
3210 
3211 	spdk_dma_free(ctx->mask);
3212 	ctx->mask = NULL;
3213 
3214 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
3215 }
3216 
3217 static void
3218 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3219 {
3220 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
3221 }
3222 
3223 void
3224 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
3225 {
3226 	struct spdk_bs_cpl	cpl;
3227 	spdk_bs_sequence_t	*seq;
3228 	struct spdk_bs_load_ctx *ctx;
3229 
3230 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
3231 
3232 	if (!TAILQ_EMPTY(&bs->blobs)) {
3233 		SPDK_ERRLOG("Blobstore still has open blobs\n");
3234 		cb_fn(cb_arg, -EBUSY);
3235 		return;
3236 	}
3237 
3238 	ctx = calloc(1, sizeof(*ctx));
3239 	if (!ctx) {
3240 		cb_fn(cb_arg, -ENOMEM);
3241 		return;
3242 	}
3243 
3244 	ctx->bs = bs;
3245 	ctx->is_load = false;
3246 
3247 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3248 	if (!ctx->super) {
3249 		free(ctx);
3250 		cb_fn(cb_arg, -ENOMEM);
3251 		return;
3252 	}
3253 
3254 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3255 	cpl.u.bs_basic.cb_fn = cb_fn;
3256 	cpl.u.bs_basic.cb_arg = cb_arg;
3257 
3258 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3259 	if (!seq) {
3260 		spdk_dma_free(ctx->super);
3261 		free(ctx);
3262 		cb_fn(cb_arg, -ENOMEM);
3263 		return;
3264 	}
3265 
3266 	/* Read super block */
3267 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3268 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3269 				  _spdk_bs_unload_read_super_cpl, ctx);
3270 }
3271 
3272 /* END spdk_bs_unload */
3273 
3274 /* START spdk_bs_set_super */
3275 
3276 struct spdk_bs_set_super_ctx {
3277 	struct spdk_blob_store		*bs;
3278 	struct spdk_bs_super_block	*super;
3279 };
3280 
3281 static void
3282 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3283 {
3284 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
3285 
3286 	if (bserrno != 0) {
3287 		SPDK_ERRLOG("Unable to write to super block of blobstore\n");
3288 	}
3289 
3290 	spdk_dma_free(ctx->super);
3291 
3292 	spdk_bs_sequence_finish(seq, bserrno);
3293 
3294 	free(ctx);
3295 }
3296 
3297 static void
3298 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3299 {
3300 	struct spdk_bs_set_super_ctx	*ctx = cb_arg;
3301 
3302 	if (bserrno != 0) {
3303 		SPDK_ERRLOG("Unable to read super block of blobstore\n");
3304 		spdk_dma_free(ctx->super);
3305 		spdk_bs_sequence_finish(seq, bserrno);
3306 		free(ctx);
3307 		return;
3308 	}
3309 
3310 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx);
3311 }
3312 
3313 void
3314 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
3315 		  spdk_bs_op_complete cb_fn, void *cb_arg)
3316 {
3317 	struct spdk_bs_cpl		cpl;
3318 	spdk_bs_sequence_t		*seq;
3319 	struct spdk_bs_set_super_ctx	*ctx;
3320 
3321 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n");
3322 
3323 	ctx = calloc(1, sizeof(*ctx));
3324 	if (!ctx) {
3325 		cb_fn(cb_arg, -ENOMEM);
3326 		return;
3327 	}
3328 
3329 	ctx->bs = bs;
3330 
3331 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
3332 	if (!ctx->super) {
3333 		free(ctx);
3334 		cb_fn(cb_arg, -ENOMEM);
3335 		return;
3336 	}
3337 
3338 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
3339 	cpl.u.bs_basic.cb_fn = cb_fn;
3340 	cpl.u.bs_basic.cb_arg = cb_arg;
3341 
3342 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3343 	if (!seq) {
3344 		spdk_dma_free(ctx->super);
3345 		free(ctx);
3346 		cb_fn(cb_arg, -ENOMEM);
3347 		return;
3348 	}
3349 
3350 	bs->super_blob = blobid;
3351 
3352 	/* Read super block */
3353 	spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
3354 				  _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
3355 				  _spdk_bs_set_super_read_cpl, ctx);
3356 }
3357 
3358 /* END spdk_bs_set_super */
3359 
3360 void
3361 spdk_bs_get_super(struct spdk_blob_store *bs,
3362 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3363 {
3364 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
3365 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
3366 	} else {
3367 		cb_fn(cb_arg, bs->super_blob, 0);
3368 	}
3369 }
3370 
3371 uint64_t
3372 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
3373 {
3374 	return bs->cluster_sz;
3375 }
3376 
3377 uint64_t
3378 spdk_bs_get_page_size(struct spdk_blob_store *bs)
3379 {
3380 	return SPDK_BS_PAGE_SIZE;
3381 }
3382 
3383 uint64_t
3384 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
3385 {
3386 	return bs->num_free_clusters;
3387 }
3388 
3389 uint64_t
3390 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
3391 {
3392 	return bs->total_data_clusters;
3393 }
3394 
3395 static int
3396 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
3397 {
3398 	bs->md_channel = spdk_get_io_channel(bs);
3399 	if (!bs->md_channel) {
3400 		SPDK_ERRLOG("Failed to get IO channel.\n");
3401 		return -1;
3402 	}
3403 
3404 	return 0;
3405 }
3406 
3407 static int
3408 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
3409 {
3410 	spdk_put_io_channel(bs->md_channel);
3411 
3412 	return 0;
3413 }
3414 
3415 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
3416 {
3417 	assert(blob != NULL);
3418 
3419 	return blob->id;
3420 }
3421 
3422 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
3423 {
3424 	assert(blob != NULL);
3425 
3426 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
3427 }
3428 
3429 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
3430 {
3431 	assert(blob != NULL);
3432 
3433 	return blob->active.num_clusters;
3434 }
3435 
3436 /* START spdk_bs_create_blob */
3437 
3438 static void
3439 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3440 {
3441 	struct spdk_blob *blob = cb_arg;
3442 
3443 	_spdk_blob_free(blob);
3444 
3445 	spdk_bs_sequence_finish(seq, bserrno);
3446 }
3447 
3448 static int
3449 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs,
3450 		      bool internal)
3451 {
3452 	uint64_t i;
3453 	size_t value_len = 0;
3454 	int rc;
3455 	const void *value = NULL;
3456 	if (xattrs->count > 0 && xattrs->get_value == NULL) {
3457 		return -EINVAL;
3458 	}
3459 	for (i = 0; i < xattrs->count; i++) {
3460 		xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len);
3461 		if (value == NULL || value_len == 0) {
3462 			return -EINVAL;
3463 		}
3464 		rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal);
3465 		if (rc < 0) {
3466 			return rc;
3467 		}
3468 	}
3469 	return 0;
3470 }
3471 
3472 static void
3473 _spdk_blob_set_thin_provision(struct spdk_blob *blob)
3474 {
3475 	_spdk_blob_verify_md_op(blob);
3476 	blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
3477 	blob->state = SPDK_BLOB_STATE_DIRTY;
3478 }
3479 
3480 static void
3481 _spdk_bs_create_blob(struct spdk_blob_store *bs,
3482 		     const struct spdk_blob_opts *opts,
3483 		     const struct spdk_blob_xattr_opts *internal_xattrs,
3484 		     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3485 {
3486 	struct spdk_blob	*blob;
3487 	uint32_t		page_idx;
3488 	struct spdk_bs_cpl	cpl;
3489 	struct spdk_blob_opts	opts_default;
3490 	struct spdk_blob_xattr_opts internal_xattrs_default;
3491 	spdk_bs_sequence_t	*seq;
3492 	spdk_blob_id		id;
3493 	int rc;
3494 
3495 	assert(spdk_get_thread() == bs->md_thread);
3496 
3497 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
3498 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
3499 		cb_fn(cb_arg, 0, -ENOMEM);
3500 		return;
3501 	}
3502 	spdk_bit_array_set(bs->used_blobids, page_idx);
3503 	spdk_bit_array_set(bs->used_md_pages, page_idx);
3504 
3505 	id = _spdk_bs_page_to_blobid(page_idx);
3506 
3507 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
3508 
3509 	blob = _spdk_blob_alloc(bs, id);
3510 	if (!blob) {
3511 		cb_fn(cb_arg, 0, -ENOMEM);
3512 		return;
3513 	}
3514 
3515 	if (!opts) {
3516 		spdk_blob_opts_init(&opts_default);
3517 		opts = &opts_default;
3518 	}
3519 	if (!internal_xattrs) {
3520 		_spdk_blob_xattrs_init(&internal_xattrs_default);
3521 		internal_xattrs = &internal_xattrs_default;
3522 	}
3523 
3524 	rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
3525 	if (rc < 0) {
3526 		_spdk_blob_free(blob);
3527 		cb_fn(cb_arg, 0, rc);
3528 		return;
3529 	}
3530 
3531 	rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
3532 	if (rc < 0) {
3533 		_spdk_blob_free(blob);
3534 		cb_fn(cb_arg, 0, rc);
3535 		return;
3536 	}
3537 
3538 	if (opts->thin_provision) {
3539 		_spdk_blob_set_thin_provision(blob);
3540 	}
3541 
3542 	rc = _spdk_blob_resize(blob, opts->num_clusters);
3543 	if (rc < 0) {
3544 		_spdk_blob_free(blob);
3545 		cb_fn(cb_arg, 0, rc);
3546 		return;
3547 	}
3548 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
3549 	cpl.u.blobid.cb_fn = cb_fn;
3550 	cpl.u.blobid.cb_arg = cb_arg;
3551 	cpl.u.blobid.blobid = blob->id;
3552 
3553 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
3554 	if (!seq) {
3555 		_spdk_blob_free(blob);
3556 		cb_fn(cb_arg, 0, -ENOMEM);
3557 		return;
3558 	}
3559 
3560 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
3561 }
3562 
3563 void spdk_bs_create_blob(struct spdk_blob_store *bs,
3564 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3565 {
3566 	_spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
3567 }
3568 
3569 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
3570 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3571 {
3572 	_spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
3573 }
3574 
3575 /* END spdk_bs_create_blob */
3576 
3577 /* START blob_cleanup */
3578 
3579 struct spdk_clone_snapshot_ctx {
3580 	struct spdk_bs_cpl      cpl;
3581 	int bserrno;
3582 
3583 	struct {
3584 		spdk_blob_id id;
3585 		struct spdk_blob *blob;
3586 	} original;
3587 	struct {
3588 		spdk_blob_id id;
3589 		struct spdk_blob *blob;
3590 	} new;
3591 
3592 	/* xattrs specified for snapshot/clones only. They have no impact on
3593 	 * the original blobs xattrs. */
3594 	const struct spdk_blob_xattr_opts *xattrs;
3595 };
3596 
3597 static void
3598 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno)
3599 {
3600 	struct spdk_clone_snapshot_ctx *ctx = cb_arg;
3601 	struct spdk_bs_cpl *cpl = &ctx->cpl;
3602 
3603 	if (bserrno != 0) {
3604 		if (ctx->bserrno != 0) {
3605 			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
3606 		} else {
3607 			ctx->bserrno = bserrno;
3608 		}
3609 	}
3610 
3611 	switch (cpl->type) {
3612 	case SPDK_BS_CPL_TYPE_BLOBID:
3613 		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno);
3614 		break;
3615 	default:
3616 		SPDK_UNREACHABLE();
3617 		break;
3618 	}
3619 
3620 	free(ctx);
3621 }
3622 
3623 static void
3624 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno)
3625 {
3626 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3627 	struct spdk_blob *origblob = ctx->original.blob;
3628 
3629 	if (bserrno != 0) {
3630 		if (ctx->bserrno != 0) {
3631 			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
3632 		} else {
3633 			ctx->bserrno = bserrno;
3634 		}
3635 	}
3636 
3637 	ctx->original.id = origblob->id;
3638 	spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
3639 }
3640 
3641 static void
3642 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno)
3643 {
3644 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3645 	struct spdk_blob *newblob = ctx->new.blob;
3646 
3647 	if (bserrno != 0) {
3648 		if (ctx->bserrno != 0) {
3649 			SPDK_ERRLOG("Cleanup error %d\n", bserrno);
3650 		} else {
3651 			ctx->bserrno = bserrno;
3652 		}
3653 	}
3654 
3655 	ctx->new.id = newblob->id;
3656 	spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
3657 }
3658 
3659 /* END blob_cleanup */
3660 
3661 /* START spdk_bs_create_snapshot */
3662 
3663 static void
3664 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno)
3665 {
3666 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3667 	struct spdk_blob *newblob = ctx->new.blob;
3668 
3669 	if (bserrno != 0) {
3670 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
3671 		return;
3672 	}
3673 
3674 	/* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */
3675 	bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true);
3676 	if (bserrno != 0) {
3677 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
3678 		return;
3679 	}
3680 
3681 	spdk_blob_set_read_only(newblob);
3682 
3683 	/* sync snapshot metadata */
3684 	spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, cb_arg);
3685 }
3686 
3687 static void
3688 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
3689 {
3690 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3691 	struct spdk_blob *origblob = ctx->original.blob;
3692 	struct spdk_blob *newblob = ctx->new.blob;
3693 
3694 	if (bserrno != 0) {
3695 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
3696 		return;
3697 	}
3698 
3699 	/* Set internal xattr for snapshot id */
3700 	bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true);
3701 	if (bserrno != 0) {
3702 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
3703 		return;
3704 	}
3705 
3706 	/* Create new back_bs_dev for snapshot */
3707 	origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob);
3708 	if (origblob->back_bs_dev == NULL) {
3709 		_spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL);
3710 		return;
3711 	}
3712 
3713 	/* set clone blob as thin provisioned */
3714 	_spdk_blob_set_thin_provision(origblob);
3715 
3716 	/* Zero out origblob cluster map */
3717 	memset(origblob->active.clusters, 0,
3718 	       origblob->active.num_clusters * sizeof(origblob->active.clusters));
3719 
3720 	/* sync clone metadata */
3721 	spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx);
3722 }
3723 
3724 static void
3725 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
3726 {
3727 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3728 	struct spdk_blob *origblob = ctx->original.blob;
3729 	struct spdk_blob *newblob = _blob;
3730 
3731 	if (bserrno != 0) {
3732 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
3733 		return;
3734 	}
3735 
3736 	ctx->new.blob = newblob;
3737 
3738 	/* set new back_bs_dev for snapshot */
3739 	newblob->back_bs_dev = origblob->back_bs_dev;
3740 	/* Set invalid flags from origblob */
3741 	newblob->invalid_flags = origblob->invalid_flags;
3742 
3743 	/* Copy cluster map to snapshot */
3744 	memcpy(newblob->active.clusters, origblob->active.clusters,
3745 	       origblob->active.num_clusters * sizeof(origblob->active.clusters));
3746 
3747 	/* sync snapshot metadata */
3748 	spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx);
3749 }
3750 
3751 static void
3752 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
3753 {
3754 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3755 	struct spdk_blob *origblob = ctx->original.blob;
3756 
3757 	if (bserrno != 0) {
3758 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
3759 		return;
3760 	}
3761 
3762 	ctx->new.id = blobid;
3763 	ctx->cpl.u.blobid.blobid = blobid;
3764 
3765 	spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx);
3766 }
3767 
3768 
3769 static void
3770 _spdk_bs_xattr_snapshot(void *arg, const char *name,
3771 			const void **value, size_t *value_len)
3772 {
3773 	assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0);
3774 
3775 	struct spdk_blob *blob = (struct spdk_blob *)arg;
3776 	*value = &blob->id;
3777 	*value_len = sizeof(blob->id);
3778 }
3779 
3780 static void
3781 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
3782 {
3783 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3784 	struct spdk_blob_opts opts;
3785 	struct spdk_blob_xattr_opts internal_xattrs;
3786 	char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS };
3787 
3788 	if (bserrno != 0) {
3789 		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
3790 		return;
3791 	}
3792 
3793 	ctx->original.blob = _blob;
3794 
3795 	if (_blob->data_ro || _blob->md_ro) {
3796 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n",
3797 			      _blob->id);
3798 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
3799 		return;
3800 	}
3801 
3802 	spdk_blob_opts_init(&opts);
3803 	_spdk_blob_xattrs_init(&internal_xattrs);
3804 
3805 	/* Change the size of new blob to the same as in original blob,
3806 	 * but do not allocate clusters */
3807 	opts.thin_provision = true;
3808 	opts.num_clusters = spdk_blob_get_num_clusters(_blob);
3809 
3810 	/* If there are any xattrs specified for snapshot, set them now */
3811 	if (ctx->xattrs) {
3812 		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
3813 	}
3814 	/* Set internal xattr SNAPSHOT_IN_PROGRESS */
3815 	internal_xattrs.count = 1;
3816 	internal_xattrs.ctx = _blob;
3817 	internal_xattrs.names = xattrs_names;
3818 	internal_xattrs.get_value = _spdk_bs_xattr_snapshot;
3819 
3820 	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
3821 			     _spdk_bs_snapshot_newblob_create_cpl, ctx);
3822 }
3823 
3824 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
3825 			     const struct spdk_blob_xattr_opts *snapshot_xattrs,
3826 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3827 {
3828 	struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));
3829 
3830 	if (!ctx) {
3831 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
3832 		return;
3833 	}
3834 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
3835 	ctx->cpl.u.blobid.cb_fn = cb_fn;
3836 	ctx->cpl.u.blobid.cb_arg = cb_arg;
3837 	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
3838 	ctx->bserrno = 0;
3839 	ctx->original.id = blobid;
3840 	ctx->xattrs = snapshot_xattrs;
3841 
3842 	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx);
3843 }
3844 /* END spdk_bs_create_snapshot */
3845 
3846 /* START spdk_bs_create_clone */
3847 
3848 static void
3849 _spdk_bs_xattr_clone(void *arg, const char *name,
3850 		     const void **value, size_t *value_len)
3851 {
3852 	assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0);
3853 
3854 	struct spdk_blob *blob = (struct spdk_blob *)arg;
3855 	*value = &blob->id;
3856 	*value_len = sizeof(blob->id);
3857 }
3858 
3859 static void
3860 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
3861 {
3862 	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3863 
3864 	ctx->cpl.u.blobid.blobid = blobid;
3865 	_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
3866 }
3867 
3868 static void
3869 _spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
3870 {
3871 	struct spdk_clone_snapshot_ctx	*ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
3872 	struct spdk_blob_opts		opts;
3873 	struct spdk_blob_xattr_opts internal_xattrs;
3874 	char *xattr_names[] = { BLOB_SNAPSHOT };
3875 
3876 	if (bserrno != 0) {
3877 		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
3878 		return;
3879 	}
3880 
3881 	ctx->original.blob = _blob;
3882 
3883 	if (!_blob->data_ro || !_blob->md_ro) {
3884 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone not from read-only blob\n");
3885 		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
3886 		return;
3887 	}
3888 
3889 	spdk_blob_opts_init(&opts);
3890 	_spdk_blob_xattrs_init(&internal_xattrs);
3891 
3892 	opts.thin_provision = true;
3893 	opts.num_clusters = spdk_blob_get_num_clusters(_blob);
3894 	if (ctx->xattrs) {
3895 		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
3896 	}
3897 
3898 	/* Set internal xattr BLOB_SNAPSHOT */
3899 	internal_xattrs.count = 1;
3900 	internal_xattrs.ctx = _blob;
3901 	internal_xattrs.names = xattr_names;
3902 	internal_xattrs.get_value = _spdk_bs_xattr_clone;
3903 
3904 	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
3905 			     _spdk_bs_clone_newblob_create_cpl, ctx);
3906 }
3907 
3908 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid,
3909 			  const struct spdk_blob_xattr_opts *clone_xattrs,
3910 			  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
3911 {
3912 	struct spdk_clone_snapshot_ctx	*ctx = calloc(1, sizeof(*ctx));
3913 
3914 	if (!ctx) {
3915 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
3916 		return;
3917 	}
3918 
3919 	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
3920 	ctx->cpl.u.blobid.cb_fn = cb_fn;
3921 	ctx->cpl.u.blobid.cb_arg = cb_arg;
3922 	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
3923 	ctx->bserrno = 0;
3924 	ctx->xattrs = clone_xattrs;
3925 	ctx->original.id = blobid;
3926 
3927 	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx);
3928 }
3929 
3930 /* END spdk_bs_create_clone */
3931 
3932 /* START spdk_blob_resize */
3933 void
3934 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg)
3935 {
3936 	int			rc;
3937 
3938 	_spdk_blob_verify_md_op(blob);
3939 
3940 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
3941 
3942 	if (blob->md_ro) {
3943 		cb_fn(cb_arg, -EPERM);
3944 		return;
3945 	}
3946 
3947 	if (sz == blob->active.num_clusters) {
3948 		cb_fn(cb_arg, 0);
3949 		return;
3950 	}
3951 
3952 	rc = _spdk_blob_resize(blob, sz);
3953 	cb_fn(cb_arg, rc);
3954 }
3955 
3956 /* END spdk_blob_resize */
3957 
3958 
3959 /* START spdk_bs_delete_blob */
3960 
3961 static void
3962 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
3963 {
3964 	spdk_bs_sequence_t *seq = cb_arg;
3965 
3966 	spdk_bs_sequence_finish(seq, bserrno);
3967 }
3968 
3969 static void
3970 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3971 {
3972 	struct spdk_blob *blob = cb_arg;
3973 
3974 	if (bserrno != 0) {
3975 		/*
3976 		 * We already removed this blob from the blobstore tailq, so
3977 		 *  we need to free it here since this is the last reference
3978 		 *  to it.
3979 		 */
3980 		_spdk_blob_free(blob);
3981 		_spdk_bs_delete_close_cpl(seq, bserrno);
3982 		return;
3983 	}
3984 
3985 	/*
3986 	 * This will immediately decrement the ref_count and call
3987 	 *  the completion routine since the metadata state is clean.
3988 	 *  By calling spdk_blob_close, we reduce the number of call
3989 	 *  points into code that touches the blob->open_ref count
3990 	 *  and the blobstore's blob list.
3991 	 */
3992 	spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
3993 }
3994 
3995 static void
3996 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
3997 {
3998 	spdk_bs_sequence_t *seq = cb_arg;
3999 	uint32_t page_num;
4000 
4001 	if (bserrno != 0) {
4002 		spdk_bs_sequence_finish(seq, bserrno);
4003 		return;
4004 	}
4005 
4006 	_spdk_blob_verify_md_op(blob);
4007 
4008 	if (blob->open_ref > 1) {
4009 		/*
4010 		 * Someone has this blob open (besides this delete context).
4011 		 *  Decrement the ref count directly and return -EBUSY.
4012 		 */
4013 		blob->open_ref--;
4014 		spdk_bs_sequence_finish(seq, -EBUSY);
4015 		return;
4016 	}
4017 
4018 	/*
4019 	 * Remove the blob from the blob_store list now, to ensure it does not
4020 	 *  get returned after this point by _spdk_blob_lookup().
4021 	 */
4022 	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
4023 	page_num = _spdk_bs_blobid_to_page(blob->id);
4024 	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
4025 	blob->state = SPDK_BLOB_STATE_DIRTY;
4026 	blob->active.num_pages = 0;
4027 	_spdk_blob_resize(blob, 0);
4028 
4029 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob);
4030 }
4031 
4032 void
4033 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
4034 		    spdk_blob_op_complete cb_fn, void *cb_arg)
4035 {
4036 	struct spdk_bs_cpl	cpl;
4037 	spdk_bs_sequence_t	*seq;
4038 
4039 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
4040 
4041 	assert(spdk_get_thread() == bs->md_thread);
4042 
4043 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
4044 	cpl.u.blob_basic.cb_fn = cb_fn;
4045 	cpl.u.blob_basic.cb_arg = cb_arg;
4046 
4047 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4048 	if (!seq) {
4049 		cb_fn(cb_arg, -ENOMEM);
4050 		return;
4051 	}
4052 
4053 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
4054 }
4055 
4056 /* END spdk_bs_delete_blob */
4057 
4058 /* START spdk_bs_open_blob */
4059 
4060 static void
4061 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4062 {
4063 	struct spdk_blob *blob = cb_arg;
4064 
4065 	/* If the blob have crc error, we just return NULL. */
4066 	if (blob == NULL) {
4067 		seq->cpl.u.blob_handle.blob = NULL;
4068 		spdk_bs_sequence_finish(seq, bserrno);
4069 		return;
4070 	}
4071 
4072 	blob->open_ref++;
4073 
4074 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
4075 
4076 	spdk_bs_sequence_finish(seq, bserrno);
4077 }
4078 
4079 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
4080 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
4081 {
4082 	struct spdk_blob		*blob;
4083 	struct spdk_bs_cpl		cpl;
4084 	spdk_bs_sequence_t		*seq;
4085 	uint32_t			page_num;
4086 
4087 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
4088 	assert(spdk_get_thread() == bs->md_thread);
4089 
4090 	page_num = _spdk_bs_blobid_to_page(blobid);
4091 	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
4092 		/* Invalid blobid */
4093 		cb_fn(cb_arg, NULL, -ENOENT);
4094 		return;
4095 	}
4096 
4097 	blob = _spdk_blob_lookup(bs, blobid);
4098 	if (blob) {
4099 		blob->open_ref++;
4100 		cb_fn(cb_arg, blob, 0);
4101 		return;
4102 	}
4103 
4104 	blob = _spdk_blob_alloc(bs, blobid);
4105 	if (!blob) {
4106 		cb_fn(cb_arg, NULL, -ENOMEM);
4107 		return;
4108 	}
4109 
4110 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
4111 	cpl.u.blob_handle.cb_fn = cb_fn;
4112 	cpl.u.blob_handle.cb_arg = cb_arg;
4113 	cpl.u.blob_handle.blob = blob;
4114 
4115 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
4116 	if (!seq) {
4117 		_spdk_blob_free(blob);
4118 		cb_fn(cb_arg, NULL, -ENOMEM);
4119 		return;
4120 	}
4121 
4122 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
4123 }
4124 /* END spdk_bs_open_blob */
4125 
4126 /* START spdk_blob_set_read_only */
4127 int spdk_blob_set_read_only(struct spdk_blob *blob)
4128 {
4129 	_spdk_blob_verify_md_op(blob);
4130 
4131 	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;
4132 
4133 	blob->state = SPDK_BLOB_STATE_DIRTY;
4134 	return 0;
4135 }
4136 /* END spdk_blob_set_read_only */
4137 
4138 /* START spdk_blob_sync_md */
4139 
4140 static void
4141 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4142 {
4143 	struct spdk_blob *blob = cb_arg;
4144 
4145 	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
4146 		blob->data_ro = true;
4147 		blob->md_ro = true;
4148 	}
4149 
4150 	spdk_bs_sequence_finish(seq, bserrno);
4151 }
4152 
4153 static void
4154 _spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
4155 {
4156 	struct spdk_bs_cpl	cpl;
4157 	spdk_bs_sequence_t	*seq;
4158 
4159 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
4160 	cpl.u.blob_basic.cb_fn = cb_fn;
4161 	cpl.u.blob_basic.cb_arg = cb_arg;
4162 
4163 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
4164 	if (!seq) {
4165 		cb_fn(cb_arg, -ENOMEM);
4166 		return;
4167 	}
4168 
4169 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
4170 }
4171 
4172 void
4173 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
4174 {
4175 	_spdk_blob_verify_md_op(blob);
4176 
4177 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
4178 
4179 	if (blob->md_ro) {
4180 		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
4181 		cb_fn(cb_arg, 0);
4182 		return;
4183 	}
4184 
4185 	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
4186 }
4187 
4188 /* END spdk_blob_sync_md */
4189 
4190 struct spdk_blob_insert_cluster_ctx {
4191 	struct spdk_thread	*thread;
4192 	struct spdk_blob	*blob;
4193 	uint32_t		cluster_num;	/* cluster index in blob */
4194 	uint32_t		cluster;	/* cluster on disk */
4195 	int			rc;
4196 	spdk_blob_op_complete	cb_fn;
4197 	void			*cb_arg;
4198 };
4199 
4200 static void
4201 _spdk_blob_insert_cluster_msg_cpl(void *arg)
4202 {
4203 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
4204 
4205 	ctx->cb_fn(ctx->cb_arg, ctx->rc);
4206 	free(ctx);
4207 }
4208 
4209 static void
4210 _spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
4211 {
4212 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
4213 
4214 	ctx->rc = bserrno;
4215 	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
4216 }
4217 
4218 static void
4219 _spdk_blob_insert_cluster_msg(void *arg)
4220 {
4221 	struct spdk_blob_insert_cluster_ctx *ctx = arg;
4222 
4223 	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
4224 	if (ctx->rc != 0) {
4225 		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
4226 		return;
4227 	}
4228 
4229 	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
4230 	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
4231 }
4232 
4233 void
4234 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
4235 				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
4236 {
4237 	struct spdk_blob_insert_cluster_ctx *ctx;
4238 
4239 	ctx = calloc(1, sizeof(*ctx));
4240 	if (ctx == NULL) {
4241 		cb_fn(cb_arg, -ENOMEM);
4242 		return;
4243 	}
4244 
4245 	ctx->thread = spdk_get_thread();
4246 	ctx->blob = blob;
4247 	ctx->cluster_num = cluster_num;
4248 	ctx->cluster = cluster;
4249 	ctx->cb_fn = cb_fn;
4250 	ctx->cb_arg = cb_arg;
4251 
4252 	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
4253 }
4254 
4255 /* START spdk_blob_close */
4256 
4257 static void
4258 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
4259 {
4260 	struct spdk_blob *blob = cb_arg;
4261 
4262 	if (bserrno == 0) {
4263 		blob->open_ref--;
4264 		if (blob->open_ref == 0) {
4265 			/*
4266 			 * Blobs with active.num_pages == 0 are deleted blobs.
4267 			 *  these blobs are removed from the blob_store list
4268 			 *  when the deletion process starts - so don't try to
4269 			 *  remove them again.
4270 			 */
4271 			if (blob->active.num_pages > 0) {
4272 				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
4273 			}
4274 			_spdk_blob_free(blob);
4275 		}
4276 	}
4277 
4278 	spdk_bs_sequence_finish(seq, bserrno);
4279 }
4280 
4281 void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
4282 {
4283 	struct spdk_bs_cpl	cpl;
4284 	spdk_bs_sequence_t	*seq;
4285 
4286 	_spdk_blob_verify_md_op(blob);
4287 
4288 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
4289 
4290 	if (blob->open_ref == 0) {
4291 		cb_fn(cb_arg, -EBADF);
4292 		return;
4293 	}
4294 
4295 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
4296 	cpl.u.blob_basic.cb_fn = cb_fn;
4297 	cpl.u.blob_basic.cb_arg = cb_arg;
4298 
4299 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
4300 	if (!seq) {
4301 		cb_fn(cb_arg, -ENOMEM);
4302 		return;
4303 	}
4304 
4305 	/* Sync metadata */
4306 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
4307 }
4308 
4309 /* END spdk_blob_close */
4310 
4311 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
4312 {
4313 	return spdk_get_io_channel(bs);
4314 }
4315 
4316 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
4317 {
4318 	spdk_put_io_channel(channel);
4319 }
4320 
4321 void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
4322 			uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
4323 {
4324 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
4325 				     SPDK_BLOB_UNMAP);
4326 }
4327 
4328 void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
4329 			       uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
4330 {
4331 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
4332 				     SPDK_BLOB_WRITE_ZEROES);
4333 }
4334 
4335 void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
4336 			void *payload, uint64_t offset, uint64_t length,
4337 			spdk_blob_op_complete cb_fn, void *cb_arg)
4338 {
4339 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
4340 				     SPDK_BLOB_WRITE);
4341 }
4342 
4343 void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
4344 		       void *payload, uint64_t offset, uint64_t length,
4345 		       spdk_blob_op_complete cb_fn, void *cb_arg)
4346 {
4347 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
4348 				     SPDK_BLOB_READ);
4349 }
4350 
4351 void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
4352 			 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
4353 			 spdk_blob_op_complete cb_fn, void *cb_arg)
4354 {
4355 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
4356 }
4357 
4358 void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
4359 			struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
4360 			spdk_blob_op_complete cb_fn, void *cb_arg)
4361 {
4362 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
4363 }
4364 
4365 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
4366 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
4367 {
4368 	spdk_blob_io_unmap(blob, channel, offset, length, cb_fn, cb_arg);
4369 }
4370 
4371 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
4372 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
4373 {
4374 	spdk_blob_io_write_zeroes(blob, channel, offset, length, cb_fn, cb_arg);
4375 }
4376 
4377 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
4378 			   void *payload, uint64_t offset, uint64_t length,
4379 			   spdk_blob_op_complete cb_fn, void *cb_arg)
4380 {
4381 	spdk_blob_io_write(blob, channel, payload, offset, length, cb_fn, cb_arg);
4382 }
4383 
4384 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
4385 			  void *payload, uint64_t offset, uint64_t length,
4386 			  spdk_blob_op_complete cb_fn, void *cb_arg)
4387 {
4388 	spdk_blob_io_read(blob, channel, payload, offset, length, cb_fn, cb_arg);
4389 }
4390 
4391 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
4392 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
4393 			    spdk_blob_op_complete cb_fn, void *cb_arg)
4394 {
4395 	spdk_blob_io_writev(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg);
4396 }
4397 
4398 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
4399 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
4400 			   spdk_blob_op_complete cb_fn, void *cb_arg)
4401 {
4402 	spdk_blob_io_readv(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg);
4403 }
4404 
4405 struct spdk_bs_iter_ctx {
4406 	int64_t page_num;
4407 	struct spdk_blob_store *bs;
4408 
4409 	spdk_blob_op_with_handle_complete cb_fn;
4410 	void *cb_arg;
4411 };
4412 
4413 static void
4414 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
4415 {
4416 	struct spdk_bs_iter_ctx *ctx = cb_arg;
4417 	struct spdk_blob_store *bs = ctx->bs;
4418 	spdk_blob_id id;
4419 
4420 	if (bserrno == 0) {
4421 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
4422 		free(ctx);
4423 		return;
4424 	}
4425 
4426 	ctx->page_num++;
4427 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
4428 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
4429 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
4430 		free(ctx);
4431 		return;
4432 	}
4433 
4434 	id = _spdk_bs_page_to_blobid(ctx->page_num);
4435 
4436 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
4437 }
4438 
4439 void
4440 spdk_bs_iter_first(struct spdk_blob_store *bs,
4441 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
4442 {
4443 	struct spdk_bs_iter_ctx *ctx;
4444 
4445 	ctx = calloc(1, sizeof(*ctx));
4446 	if (!ctx) {
4447 		cb_fn(cb_arg, NULL, -ENOMEM);
4448 		return;
4449 	}
4450 
4451 	ctx->page_num = -1;
4452 	ctx->bs = bs;
4453 	ctx->cb_fn = cb_fn;
4454 	ctx->cb_arg = cb_arg;
4455 
4456 	_spdk_bs_iter_cpl(ctx, NULL, -1);
4457 }
4458 
4459 static void
4460 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
4461 {
4462 	struct spdk_bs_iter_ctx *ctx = cb_arg;
4463 
4464 	_spdk_bs_iter_cpl(ctx, NULL, -1);
4465 }
4466 
4467 void
4468 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
4469 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
4470 {
4471 	struct spdk_bs_iter_ctx *ctx;
4472 
4473 	assert(blob != NULL);
4474 
4475 	ctx = calloc(1, sizeof(*ctx));
4476 	if (!ctx) {
4477 		cb_fn(cb_arg, NULL, -ENOMEM);
4478 		return;
4479 	}
4480 
4481 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
4482 	ctx->bs = bs;
4483 	ctx->cb_fn = cb_fn;
4484 	ctx->cb_arg = cb_arg;
4485 
4486 	/* Close the existing blob */
4487 	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
4488 }
4489 
4490 static int
4491 _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
4492 		     uint16_t value_len, bool internal)
4493 {
4494 	struct spdk_xattr_tailq *xattrs;
4495 	struct spdk_xattr	*xattr;
4496 
4497 	_spdk_blob_verify_md_op(blob);
4498 
4499 	if (blob->md_ro) {
4500 		return -EPERM;
4501 	}
4502 
4503 	if (internal) {
4504 		xattrs = &blob->xattrs_internal;
4505 		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
4506 	} else {
4507 		xattrs = &blob->xattrs;
4508 	}
4509 
4510 	TAILQ_FOREACH(xattr, xattrs, link) {
4511 		if (!strcmp(name, xattr->name)) {
4512 			free(xattr->value);
4513 			xattr->value_len = value_len;
4514 			xattr->value = malloc(value_len);
4515 			memcpy(xattr->value, value, value_len);
4516 
4517 			blob->state = SPDK_BLOB_STATE_DIRTY;
4518 
4519 			return 0;
4520 		}
4521 	}
4522 
4523 	xattr = calloc(1, sizeof(*xattr));
4524 	if (!xattr) {
4525 		return -1;
4526 	}
4527 	xattr->name = strdup(name);
4528 	xattr->value_len = value_len;
4529 	xattr->value = malloc(value_len);
4530 	memcpy(xattr->value, value, value_len);
4531 	TAILQ_INSERT_TAIL(xattrs, xattr, link);
4532 
4533 	blob->state = SPDK_BLOB_STATE_DIRTY;
4534 
4535 	return 0;
4536 }
4537 
4538 int
4539 spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
4540 		    uint16_t value_len)
4541 {
4542 	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
4543 }
4544 
4545 static int
4546 _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
4547 {
4548 	struct spdk_xattr_tailq *xattrs;
4549 	struct spdk_xattr	*xattr;
4550 
4551 	_spdk_blob_verify_md_op(blob);
4552 
4553 	if (blob->md_ro) {
4554 		return -EPERM;
4555 	}
4556 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
4557 
4558 	TAILQ_FOREACH(xattr, xattrs, link) {
4559 		if (!strcmp(name, xattr->name)) {
4560 			TAILQ_REMOVE(xattrs, xattr, link);
4561 			free(xattr->value);
4562 			free(xattr->name);
4563 			free(xattr);
4564 
4565 			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
4566 				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
4567 			}
4568 			blob->state = SPDK_BLOB_STATE_DIRTY;
4569 
4570 			return 0;
4571 		}
4572 	}
4573 
4574 	return -ENOENT;
4575 }
4576 
4577 int
4578 spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
4579 {
4580 	return _spdk_blob_remove_xattr(blob, name, false);
4581 }
4582 
4583 static int
4584 _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
4585 			   const void **value, size_t *value_len, bool internal)
4586 {
4587 	struct spdk_xattr	*xattr;
4588 	struct spdk_xattr_tailq *xattrs;
4589 
4590 	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;
4591 
4592 	TAILQ_FOREACH(xattr, xattrs, link) {
4593 		if (!strcmp(name, xattr->name)) {
4594 			*value = xattr->value;
4595 			*value_len = xattr->value_len;
4596 			return 0;
4597 		}
4598 	}
4599 	return -ENOENT;
4600 }
4601 
4602 int
4603 spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
4604 			  const void **value, size_t *value_len)
4605 {
4606 	_spdk_blob_verify_md_op(blob);
4607 
4608 	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
4609 }
4610 
4611 struct spdk_xattr_names {
4612 	uint32_t	count;
4613 	const char	*names[0];
4614 };
4615 
4616 static int
4617 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
4618 {
4619 	struct spdk_xattr	*xattr;
4620 	int			count = 0;
4621 
4622 	TAILQ_FOREACH(xattr, xattrs, link) {
4623 		count++;
4624 	}
4625 
4626 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
4627 	if (*names == NULL) {
4628 		return -ENOMEM;
4629 	}
4630 
4631 	TAILQ_FOREACH(xattr, xattrs, link) {
4632 		(*names)->names[(*names)->count++] = xattr->name;
4633 	}
4634 
4635 	return 0;
4636 }
4637 
4638 int
4639 spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
4640 {
4641 	_spdk_blob_verify_md_op(blob);
4642 
4643 	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
4644 }
4645 
4646 uint32_t
4647 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
4648 {
4649 	assert(names != NULL);
4650 
4651 	return names->count;
4652 }
4653 
4654 const char *
4655 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
4656 {
4657 	if (index >= names->count) {
4658 		return NULL;
4659 	}
4660 
4661 	return names->names[index];
4662 }
4663 
4664 void
4665 spdk_xattr_names_free(struct spdk_xattr_names *names)
4666 {
4667 	free(names);
4668 }
4669 
4670 struct spdk_bs_type
4671 spdk_bs_get_bstype(struct spdk_blob_store *bs)
4672 {
4673 	return bs->bstype;
4674 }
4675 
4676 void
4677 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
4678 {
4679 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
4680 }
4681 
4682 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
4683