xref: /spdk/lib/blob/blobstore.c (revision e734bb9f9fb798be2bbe77b4a3cb9fdf42170456)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
38 #include "spdk/env.h"
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 
44 #include "spdk_internal/log.h"
45 
46 #include "blobstore.h"
47 
48 #define BLOB_CRC32C_INITIAL    0xffffffffUL
49 
50 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
51 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
52 
53 static inline size_t
54 divide_round_up(size_t num, size_t divisor)
55 {
56 	return (num + divisor - 1) / divisor;
57 }
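
/*
 * An illustrative sketch of divide_round_up() in action (the mask sizing
 * below is an example, not a rule this file imposes): one bit per cluster,
 * packed into bytes and then into whole pages.
 *
 *	uint64_t mask_bytes = divide_round_up(bs->total_clusters, 8);
 *	uint64_t mask_pages = divide_round_up(mask_bytes, SPDK_BS_PAGE_SIZE);
 *
 * divide_round_up(9, 8) == 2 while divide_round_up(8, 8) == 1: any partial
 * unit rounds up to one whole unit.
 */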
58 
59 static void
60 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
61 {
62 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
63 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
64 	assert(bs->num_free_clusters > 0);
65 
66 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);
67 
68 	spdk_bit_array_set(bs->used_clusters, cluster_num);
69 	bs->num_free_clusters--;
70 }
71 
72 static void
73 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
74 {
75 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
76 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
77 	assert(bs->num_free_clusters < bs->total_clusters);
78 
79 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);
80 
81 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
82 	bs->num_free_clusters++;
83 }
84 
85 static struct spdk_blob_data *
86 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
87 {
88 	struct spdk_blob_data *blob;
89 
90 	blob = calloc(1, sizeof(*blob));
91 	if (!blob) {
92 		return NULL;
93 	}
94 
95 	blob->id = id;
96 	blob->bs = bs;
97 
98 	blob->state = SPDK_BLOB_STATE_DIRTY;
99 	blob->active.num_pages = 1;
100 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
101 	if (!blob->active.pages) {
102 		free(blob);
103 		return NULL;
104 	}
105 
106 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
107 
108 	TAILQ_INIT(&blob->xattrs);
109 
110 	return blob;
111 }
112 
113 static void
114 _spdk_blob_free(struct spdk_blob_data *blob)
115 {
116 	struct spdk_xattr 	*xattr, *xattr_tmp;
117 
118 	assert(blob != NULL);
119 
120 	free(blob->active.clusters);
121 	free(blob->clean.clusters);
122 	free(blob->active.pages);
123 	free(blob->clean.pages);
124 
125 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
126 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
127 		free(xattr->name);
128 		free(xattr->value);
129 		free(xattr);
130 	}
131 
132 	free(blob);
133 }
134 
135 static int
136 _spdk_blob_mark_clean(struct spdk_blob_data *blob)
137 {
138 	uint64_t *clusters = NULL;
139 	uint32_t *pages = NULL;
140 
141 	assert(blob != NULL);
142 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
143 	       blob->state == SPDK_BLOB_STATE_SYNCING);
144 
145 	if (blob->active.num_clusters) {
146 		assert(blob->active.clusters);
147 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
148 		if (!clusters) {
149 			return -1;
150 		}
151 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
152 	}
153 
154 	if (blob->active.num_pages) {
155 		assert(blob->active.pages);
156 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
157 		if (!pages) {
158 			free(clusters);
159 			return -1;
160 		}
161 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
162 	}
163 
164 	free(blob->clean.clusters);
165 	free(blob->clean.pages);
166 
167 	blob->clean.num_clusters = blob->active.num_clusters;
168 	blob->clean.clusters = blob->active.clusters;
169 	blob->clean.num_pages = blob->active.num_pages;
170 	blob->clean.pages = blob->active.pages;
171 
172 	blob->active.clusters = clusters;
173 	blob->active.pages = pages;
174 
175 	blob->state = SPDK_BLOB_STATE_CLEAN;
176 
177 	return 0;
178 }
179 
180 static int
181 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
182 {
183 	struct spdk_blob_md_descriptor *desc;
184 	size_t	cur_desc = 0;
185 	void *tmp;
186 
187 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
188 	while (cur_desc < sizeof(page->descriptors)) {
189 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
190 			if (desc->length == 0) {
191 				/* A zero-length padding descriptor terminates the page */
192 				break;
193 			}
194 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
195 			struct spdk_blob_md_descriptor_flags	*desc_flags;
196 
197 			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
198 
199 			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
200 				return -EINVAL;
201 			}
202 
203 			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
204 			    SPDK_BLOB_INVALID_FLAGS_MASK) {
205 				return -EINVAL;
206 			}
207 
208 			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
209 			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
210 				blob->data_ro = true;
211 				blob->md_ro = true;
212 			}
213 
214 			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
215 			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
216 				blob->md_ro = true;
217 			}
218 
219 			blob->invalid_flags = desc_flags->invalid_flags;
220 			blob->data_ro_flags = desc_flags->data_ro_flags;
221 			blob->md_ro_flags = desc_flags->md_ro_flags;
222 
223 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
224 			struct spdk_blob_md_descriptor_extent	*desc_extent;
225 			unsigned int				i, j;
226 			unsigned int				cluster_count = blob->active.num_clusters;
227 
228 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
229 
230 			if (desc_extent->length == 0 ||
231 			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
232 				return -EINVAL;
233 			}
234 
235 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
236 				for (j = 0; j < desc_extent->extents[i].length; j++) {
237 					if (!spdk_bit_array_get(blob->bs->used_clusters,
238 								desc_extent->extents[i].cluster_idx + j)) {
239 						return -EINVAL;
240 					}
241 					cluster_count++;
242 				}
243 			}
244 
245 			if (cluster_count == 0) {
246 				return -EINVAL;
247 			}
248 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
249 			if (tmp == NULL) {
250 				return -ENOMEM;
251 			}
252 			blob->active.clusters = tmp;
253 			blob->active.cluster_array_size = cluster_count;
254 
255 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
256 				for (j = 0; j < desc_extent->extents[i].length; j++) {
257 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
258 							desc_extent->extents[i].cluster_idx + j);
259 				}
260 			}
261 
262 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
263 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
264 			struct spdk_xattr 			*xattr;
265 
266 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
267 
268 			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
269 			    sizeof(desc_xattr->value_length) +
270 			    desc_xattr->name_length + desc_xattr->value_length) {
271 				return -EINVAL;
272 			}
273 
274 			xattr = calloc(1, sizeof(*xattr));
275 			if (xattr == NULL) {
276 				return -ENOMEM;
277 			}
278 
279 			xattr->name = malloc(desc_xattr->name_length + 1);
280 			if (xattr->name == NULL) {
281 				free(xattr);
282 				return -ENOMEM;
283 			}
284 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
285 			xattr->name[desc_xattr->name_length] = '\0';
286 
287 			xattr->value = malloc(desc_xattr->value_length);
288 			if (xattr->value == NULL) {
289 				free(xattr->name);
290 				free(xattr);
291 				return -ENOMEM;
292 			}
293 			xattr->value_len = desc_xattr->value_length;
294 			memcpy(xattr->value,
295 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
296 			       desc_xattr->value_length);
297 
298 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
299 		} else {
300 			/* Unrecognized descriptor type.  Do not fail - just continue to the
301 			 *  next descriptor.  If this descriptor is associated with some feature
302 			 *  defined in a newer version of blobstore, that version of blobstore
303 			 *  should create and set an associated feature flag to specify if this
304 			 *  blob can be loaded or not.
305 			 */
306 		}
307 
308 		/* Advance to the next descriptor */
309 		cur_desc += sizeof(*desc) + desc->length;
310 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
311 			break;
312 		}
313 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
314 	}
315 
316 	return 0;
317 }
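
/*
 * A note on the flag tests above: (flags | MASK) != MASK is true exactly
 * when 'flags' contains a bit that is not in MASK. A minimal sketch of the
 * same idiom (KNOWN_FLAGS is an illustrative mask, not one defined by
 * blobstore):
 *
 *	#define KNOWN_FLAGS 0x3ULL
 *
 *	static bool
 *	has_unknown_flags(uint64_t flags)
 *	{
 *		return (flags | KNOWN_FLAGS) != KNOWN_FLAGS;
 *	}
 *
 * Unknown invalid_flags fail the load outright; unknown data_ro_flags or
 * md_ro_flags merely degrade the blob to read-only.
 */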
318 
319 static int
320 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
321 		 struct spdk_blob_data *blob)
322 {
323 	const struct spdk_blob_md_page *page;
324 	uint32_t i;
325 	int rc;
326 
327 	assert(page_count > 0);
328 	assert(pages[0].sequence_num == 0);
329 	assert(blob != NULL);
330 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
331 	assert(blob->active.clusters == NULL);
333 
334 	/* The blobid provided doesn't match what's in the MD; this can
335 	 *  happen, for example, if a bogus blobid is passed in through open.
336 	 */
337 	if (blob->id != pages[0].id) {
338 		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
339 			    blob->id, pages[0].id);
340 		return -ENOENT;
341 	}
342 
343 	for (i = 0; i < page_count; i++) {
344 		page = &pages[i];
345 
346 		assert(page->id == blob->id);
347 		assert(page->sequence_num == i);
348 
349 		rc = _spdk_blob_parse_page(page, blob);
350 		if (rc != 0) {
351 			return rc;
352 		}
353 	}
354 
355 	return 0;
356 }
357 
358 static int
359 _spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
360 			      struct spdk_blob_md_page **pages,
361 			      uint32_t *page_count,
362 			      struct spdk_blob_md_page **last_page)
363 {
364 	struct spdk_blob_md_page *page, *tmp_pages;
365 
366 	assert(pages != NULL);
367 	assert(page_count != NULL);
368 
369 	if (*page_count == 0) {
370 		assert(*pages == NULL);
371 		*page_count = 1;
372 		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
373 					 SPDK_BS_PAGE_SIZE,
374 					 NULL);
375 	} else {
376 		assert(*pages != NULL);
377 		(*page_count)++;
378 		tmp_pages = spdk_dma_realloc(*pages, SPDK_BS_PAGE_SIZE * (*page_count), SPDK_BS_PAGE_SIZE, NULL);
379 		if (tmp_pages == NULL) {
380 			/* realloc failure leaves the old array valid; free it here */
381 			spdk_dma_free(*pages);
382 		}
383 		*pages = tmp_pages;
	}
384 	if (*pages == NULL) {
385 		*page_count = 0;
386 		*last_page = NULL;
387 		return -ENOMEM;
388 	}
389 
390 	page = &(*pages)[*page_count - 1];
391 	memset(page, 0, sizeof(*page));
392 	page->id = blob->id;
393 	page->sequence_num = *page_count - 1;
394 	page->next = SPDK_INVALID_MD_PAGE;
395 	*last_page = page;
396 
397 	return 0;
398 }
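
/*
 * A short usage sketch for _spdk_blob_serialize_add_page() (illustrative;
 * error handling trimmed): the caller owns a (pages, page_count) pair that
 * starts out empty and grows one metadata page per call.
 *
 *	struct spdk_blob_md_page *pages = NULL, *cur_page;
 *	uint32_t page_count = 0;
 *
 *	if (_spdk_blob_serialize_add_page(blob, &pages, &page_count,
 *					  &cur_page) == 0) {
 *		... fill cur_page->descriptors ...
 *	}
 *	spdk_dma_free(pages);
 *
 * On failure the helper releases the array and resets page_count to 0, so
 * the final spdk_dma_free(pages) is always safe.
 */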
399 
400 /* Transform the in-memory representation 'xattr' into an on-disk xattr
401  *  descriptor.
402  *  Update required_sz on both success and failure.
403  */
404 static int
405 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
406 			   uint8_t *buf, size_t buf_sz,
407 			   size_t *required_sz)
408 {
409 	struct spdk_blob_md_descriptor_xattr	*desc;
410 
411 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
412 		       strlen(xattr->name) +
413 		       xattr->value_len;
414 
415 	if (buf_sz < *required_sz) {
416 		return -1;
417 	}
418 
419 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
420 
421 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
422 	desc->length = sizeof(desc->name_length) +
423 		       sizeof(desc->value_length) +
424 		       strlen(xattr->name) +
425 		       xattr->value_len;
426 	desc->name_length = strlen(xattr->name);
427 	desc->value_length = xattr->value_len;
428 
429 	memcpy(desc->name, xattr->name, desc->name_length);
430 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
431 	       xattr->value,
432 	       desc->value_length);
433 
434 	return 0;
435 }
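
/*
 * Sketch of the on-disk layout this produces, for a hypothetical xattr
 * "name" -> "val" (4-byte name, 3-byte value):
 *
 *	desc->type         = SPDK_MD_DESCRIPTOR_TYPE_XATTR
 *	desc->length       = sizeof(name_length) + sizeof(value_length) + 4 + 3
 *	desc->name_length  = 4
 *	desc->value_length = 3
 *	desc->name[]       = 'n' 'a' 'm' 'e' 'v' 'a' 'l'
 *
 * The value is packed immediately after the name with no NUL terminator;
 * _spdk_blob_parse_page() reverses exactly this layout on load.
 */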
436 
437 static void
438 _spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
439 			    uint64_t start_cluster, uint64_t *next_cluster,
440 			    uint8_t *buf, size_t buf_sz)
441 {
442 	struct spdk_blob_md_descriptor_extent *desc;
443 	size_t cur_sz;
444 	uint64_t i, extent_idx;
445 	uint64_t lba, lba_per_cluster, lba_count;
446 
447 	/* The buffer must have room for at least one extent */
448 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
449 	if (buf_sz < cur_sz) {
450 		*next_cluster = start_cluster;
451 		return;
452 	}
453 
454 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
455 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
456 
457 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
458 
459 	lba = blob->active.clusters[start_cluster];
460 	lba_count = lba_per_cluster;
461 	extent_idx = 0;
462 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
463 		if ((lba + lba_count) == blob->active.clusters[i]) {
464 			lba_count += lba_per_cluster;
465 			continue;
466 		}
467 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
468 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
469 		extent_idx++;
470 
471 		cur_sz += sizeof(desc->extents[extent_idx]);
472 
473 		if (buf_sz < cur_sz) {
474 			/* If we ran out of buffer space, return */
475 			desc->length = sizeof(desc->extents[0]) * extent_idx;
476 			*next_cluster = i;
477 			return;
478 		}
479 
480 		lba = blob->active.clusters[i];
481 		lba_count = lba_per_cluster;
482 	}
483 
484 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
485 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
486 	extent_idx++;
487 
488 	desc->length = sizeof(desc->extents[0]) * extent_idx;
489 	*next_cluster = blob->active.num_clusters;
490 
491 	return;
492 }
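
/*
 * Worked example of the run-length encoding above. With lba_per_cluster == 8
 * and a cluster list whose LBAs are { 8, 16, 24, 48 }, the first three
 * clusters are contiguous on disk and collapse into one extent:
 *
 *	extents[0] = { .cluster_idx = 1, .length = 3 }    LBAs 8..31
 *	extents[1] = { .cluster_idx = 6, .length = 1 }    LBAs 48..55
 *
 * If the descriptor buffer fills up mid-list, *next_cluster tells the caller
 * which cluster to resume from on a fresh metadata page.
 */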
493 
494 static void
495 _spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
496 			   uint8_t *buf, size_t *buf_sz)
497 {
498 	struct spdk_blob_md_descriptor_flags *desc;
499 
500 	/*
501 	 * Flags get serialized first, so we should always have room for the flags
502 	 *  descriptor.
503 	 */
504 	assert(*buf_sz >= sizeof(*desc));
505 
506 	desc = (struct spdk_blob_md_descriptor_flags *)buf;
507 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
508 	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
509 	desc->invalid_flags = blob->invalid_flags;
510 	desc->data_ro_flags = blob->data_ro_flags;
511 	desc->md_ro_flags = blob->md_ro_flags;
512 
513 	*buf_sz -= sizeof(*desc);
514 }
515 
516 static int
517 _spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
518 		     uint32_t *page_count)
519 {
520 	struct spdk_blob_md_page		*cur_page;
521 	const struct spdk_xattr			*xattr;
522 	int 					rc;
523 	uint8_t					*buf;
524 	size_t					remaining_sz;
525 	uint64_t				last_cluster;
526 
527 	assert(pages != NULL);
528 	assert(page_count != NULL);
529 	assert(blob != NULL);
530 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
531 
532 	*pages = NULL;
533 	*page_count = 0;
534 
535 	/* A blob always has at least 1 page, even if it has no descriptors */
536 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
537 	if (rc < 0) {
538 		return rc;
539 	}
540 
541 	buf = (uint8_t *)cur_page->descriptors;
542 	remaining_sz = sizeof(cur_page->descriptors);
543 
544 	/* Serialize flags */
545 	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
546 
547 	/* Serialize xattrs */
548 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
549 		size_t required_sz = 0;
550 		rc = _spdk_blob_serialize_xattr(xattr,
551 						buf, remaining_sz,
552 						&required_sz);
553 		if (rc < 0) {
554 			/* Need to add a new page to the chain */
555 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
556 							   &cur_page);
557 			if (rc < 0) {
558 				spdk_dma_free(*pages);
559 				*pages = NULL;
560 				*page_count = 0;
561 				return rc;
562 			}
563 
564 			buf = (uint8_t *)cur_page->descriptors;
565 			remaining_sz = sizeof(cur_page->descriptors);
566 
567 			/* Try again */
568 			required_sz = 0;
569 			rc = _spdk_blob_serialize_xattr(xattr,
570 							buf, remaining_sz,
571 							&required_sz);
572 
573 			if (rc < 0) {
574 				spdk_dma_free(*pages);
575 				*pages = NULL;
576 				*page_count = 0;
577 				return -1;
578 			}
579 		}
580 
581 		remaining_sz -= required_sz;
582 		buf += required_sz;
583 	}
584 
585 	/* Serialize extents */
586 	last_cluster = 0;
587 	while (last_cluster < blob->active.num_clusters) {
588 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
589 					    buf, remaining_sz);
590 
591 		if (last_cluster == blob->active.num_clusters) {
592 			break;
593 		}
594 
595 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
596 						   &cur_page);
597 		if (rc < 0) {
598 			return rc;
599 		}
600 
601 		buf = (uint8_t *)cur_page->descriptors;
602 		remaining_sz = sizeof(cur_page->descriptors);
603 	}
604 
605 	return 0;
606 }
607 
608 struct spdk_blob_load_ctx {
609 	struct spdk_blob_data 		*blob;
610 
611 	struct spdk_blob_md_page	*pages;
612 	uint32_t			num_pages;
613 
614 	spdk_bs_sequence_cpl		cb_fn;
615 	void				*cb_arg;
616 };
617 
618 static uint32_t
619 _spdk_blob_md_page_calc_crc(void *page)
620 {
621 	uint32_t		crc;
622 
623 	crc = BLOB_CRC32C_INITIAL;
624 	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
625 	crc ^= BLOB_CRC32C_INITIAL;
626 
627 	return crc;
629 }
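
/*
 * The CRC covers the whole 4 KiB page except the last 4 bytes, which hold
 * the crc field itself. A sketch of the check the load path performs
 * (assuming 'page' points at one complete metadata page):
 *
 *	if (_spdk_blob_md_page_calc_crc(page) != page->crc) {
 *		... page is corrupt or was never written; fail the load ...
 *	}
 *
 * Seeding with 0xffffffff and xor-ing the result again follows the usual
 * CRC-32C convention, so an all-zero page does not checksum to zero.
 */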
630 
631 static void
632 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
633 {
634 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
635 	struct spdk_blob_data 		*blob = ctx->blob;
636 	struct spdk_blob_md_page	*page, *tmp;
637 	int				rc;
638 	uint32_t			crc;
639 
640 	page = &ctx->pages[ctx->num_pages - 1];
641 	crc = _spdk_blob_md_page_calc_crc(page);
642 	if (crc != page->crc) {
643 		SPDK_ERRLOG("Metadata page %u crc mismatch\n", ctx->num_pages);
644 		_spdk_blob_free(blob);
645 		ctx->cb_fn(seq, NULL, -EINVAL);
646 		spdk_dma_free(ctx->pages);
647 		free(ctx);
648 		return;
649 	}
650 
651 	if (page->next != SPDK_INVALID_MD_PAGE) {
652 		uint32_t next_page = page->next;
653 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
654 
655 		/* page->next is a page index relative to the start of the md region */
656 		assert(next_page < blob->bs->md_len);
657 
658 		/* Read the next page */
659 		ctx->num_pages++;
660 		tmp = spdk_dma_realloc(ctx->pages, sizeof(*page) * ctx->num_pages, sizeof(*page), NULL);
661 		if (tmp == NULL) {
662 			spdk_dma_free(ctx->pages);
663 			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
664 			free(ctx);
665 			return;
666 		}
667 		ctx->pages = tmp;
668 		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
669 				      next_lba,
670 				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
671 				      _spdk_blob_load_cpl, ctx);
672 		return;
673 	}
674 
675 	/* Parse the pages */
676 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
677 	if (rc) {
678 		_spdk_blob_free(blob);
679 		ctx->cb_fn(seq, NULL, rc);
680 		spdk_dma_free(ctx->pages);
681 		free(ctx);
682 		return;
683 	}
684 
685 	_spdk_blob_mark_clean(blob);
686 
687 	ctx->cb_fn(seq, ctx->cb_arg, rc);
688 
689 	/* Free the memory */
690 	spdk_dma_free(ctx->pages);
691 	free(ctx);
692 }
693 
694 /* Load a blob from disk given a blobid */
695 static void
696 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
697 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
698 {
699 	struct spdk_blob_load_ctx *ctx;
700 	struct spdk_blob_store *bs;
701 	uint32_t page_num;
702 	uint64_t lba;
703 
704 	assert(blob != NULL);
705 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
706 	       blob->state == SPDK_BLOB_STATE_DIRTY);
707 
708 	bs = blob->bs;
709 
710 	ctx = calloc(1, sizeof(*ctx));
711 	if (!ctx) {
712 		cb_fn(seq, cb_arg, -ENOMEM);
713 		return;
714 	}
715 
716 	ctx->blob = blob;
717 	ctx->pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
718 				     SPDK_BS_PAGE_SIZE, NULL);
719 	if (!ctx->pages) {
720 		free(ctx);
721 		cb_fn(seq, cb_arg, -ENOMEM);
722 		return;
723 	}
724 	ctx->num_pages = 1;
725 	ctx->cb_fn = cb_fn;
726 	ctx->cb_arg = cb_arg;
727 
728 	page_num = _spdk_bs_blobid_to_page(blob->id);
729 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
730 
731 	blob->state = SPDK_BLOB_STATE_LOADING;
732 
733 	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
734 			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
735 			      _spdk_blob_load_cpl, ctx);
736 }
737 
738 struct spdk_blob_persist_ctx {
739 	struct spdk_blob_data 		*blob;
740 
741 	struct spdk_blob_md_page	*pages;
742 
743 	uint64_t			idx;
744 
745 	spdk_bs_sequence_cpl		cb_fn;
746 	void				*cb_arg;
747 };
748 
749 static void
750 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
751 {
752 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
753 	struct spdk_blob_data 		*blob = ctx->blob;
754 
755 	if (bserrno == 0) {
756 		_spdk_blob_mark_clean(blob);
757 	}
758 
759 	/* Call user callback */
760 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
761 
762 	/* Free the memory */
763 	spdk_dma_free(ctx->pages);
764 	free(ctx);
765 }
766 
767 static void
768 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
769 {
770 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
771 	struct spdk_blob_data 		*blob = ctx->blob;
772 	struct spdk_blob_store		*bs = blob->bs;
773 	void				*tmp;
774 	size_t				i;
775 
776 	/* Release all clusters that were truncated */
777 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
778 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
779 
780 		_spdk_bs_release_cluster(bs, cluster_num);
781 	}
782 
783 	if (blob->active.num_clusters == 0) {
784 		free(blob->active.clusters);
785 		blob->active.clusters = NULL;
786 		blob->active.cluster_array_size = 0;
787 	} else {
788 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
789 		assert(tmp != NULL);
790 		blob->active.clusters = tmp;
791 		blob->active.cluster_array_size = blob->active.num_clusters;
792 	}
793 
794 	_spdk_blob_persist_complete(seq, ctx, bserrno);
795 }
796 
797 static void
798 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
799 {
800 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
801 	struct spdk_blob_data 		*blob = ctx->blob;
802 	struct spdk_blob_store		*bs = blob->bs;
803 	spdk_bs_batch_t			*batch;
804 	size_t				i;
805 	uint64_t			lba;
806 	uint32_t			lba_count;
807 
808 	/* Clusters don't move around in blobs. The list shrinks or grows
809 	 * at the end, but no changes ever occur in the middle of the list.
810 	 */
811 
812 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
813 
814 	/* Unmap all clusters that were truncated */
815 	lba = 0;
816 	lba_count = 0;
817 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
818 		uint64_t next_lba = blob->active.clusters[i];
819 		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
820 
821 		if ((lba + lba_count) == next_lba) {
822 			/* This cluster is contiguous with the previous one. */
823 			lba_count += next_lba_count;
824 			continue;
825 		}
826 
827 		/* This cluster is not contiguous with the previous one. */
828 
829 		/* If a run of LBAs was previously built up, send it
830 		 *  as a single unmap.
831 		 */
832 		if (lba_count > 0) {
833 			spdk_bs_batch_unmap(batch, lba, lba_count);
834 		}
835 
836 		/* Start building the next batch */
837 		lba = next_lba;
838 		lba_count = next_lba_count;
839 	}
840 
841 	/* If we ended with a contiguous set of LBAs, send the unmap now */
842 	if (lba_count > 0) {
843 		spdk_bs_batch_unmap(batch, lba, lba_count);
844 	}
845 
846 	spdk_bs_batch_close(batch);
847 }
848 
849 static void
850 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
851 {
852 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
853 	struct spdk_blob_data 		*blob = ctx->blob;
854 	struct spdk_blob_store		*bs = blob->bs;
855 	size_t				i;
856 
857 	/* This loop starts at 1 because the first page is special and handled
858 	 *  below. The pages (except the first) were just zeroed on disk, so
859 	 *  here they only need to be released from the in-memory used_md_pages
860 	 *  array. */
861 	for (i = 1; i < blob->clean.num_pages; i++) {
862 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
863 	}
864 
865 	if (blob->active.num_pages == 0) {
866 		uint32_t page_num;
867 
868 		page_num = _spdk_bs_blobid_to_page(blob->id);
869 		spdk_bit_array_clear(bs->used_md_pages, page_num);
870 	}
871 
872 	/* Move on to unmapping clusters */
873 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
874 }
875 
876 static void
877 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
878 {
879 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
880 	struct spdk_blob_data 		*blob = ctx->blob;
881 	struct spdk_blob_store		*bs = blob->bs;
882 	uint64_t			lba;
883 	uint32_t			lba_count;
884 	spdk_bs_batch_t			*batch;
885 	size_t				i;
886 
887 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);
888 
889 	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);
890 
891 	/* This loop starts at 1 because the first page is special and handled
892 	 * below. The pages (except the first) are never written in place,
893 	 * so any pages in the clean list must be zeroed.
894 	 */
895 	for (i = 1; i < blob->clean.num_pages; i++) {
896 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
897 
898 		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
899 	}
900 
901 	/* The first page will only be zeroed if this is a delete. */
902 	if (blob->active.num_pages == 0) {
903 		uint32_t page_num;
904 
905 		/* The first page in the metadata goes where the blobid indicates */
906 		page_num = _spdk_bs_blobid_to_page(blob->id);
907 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
908 
909 		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
910 	}
911 
912 	spdk_bs_batch_close(batch);
913 }
914 
915 static void
916 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
917 {
918 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
919 	struct spdk_blob_data		*blob = ctx->blob;
920 	struct spdk_blob_store		*bs = blob->bs;
921 	uint64_t			lba;
922 	uint32_t			lba_count;
923 	struct spdk_blob_md_page	*page;
924 
925 	if (blob->active.num_pages == 0) {
926 		/* Move on to the next step */
927 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
928 		return;
929 	}
930 
931 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
932 
933 	page = &ctx->pages[0];
934 	/* The first page in the metadata goes where the blobid indicates */
935 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
936 
937 	spdk_bs_sequence_write(seq, page, lba, lba_count,
938 			       _spdk_blob_persist_zero_pages, ctx);
939 }
940 
941 static void
942 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
943 {
944 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
945 	struct spdk_blob_data 		*blob = ctx->blob;
946 	struct spdk_blob_store		*bs = blob->bs;
947 	uint64_t 			lba;
948 	uint32_t			lba_count;
949 	struct spdk_blob_md_page	*page;
950 	spdk_bs_batch_t			*batch;
951 	size_t				i;
952 
953 	/* Clusters don't move around in blobs. The list shrinks or grows
954 	 * at the end, but no changes ever occur in the middle of the list.
955 	 */
956 
957 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
958 
959 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
960 
961 	/* This starts at 1. The root page is not written until
962 	 * all of the others are finished
963 	 */
964 	for (i = 1; i < blob->active.num_pages; i++) {
965 		page = &ctx->pages[i];
966 		assert(page->sequence_num == i);
967 
968 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
969 
970 		spdk_bs_batch_write(batch, page, lba, lba_count);
971 	}
972 
973 	spdk_bs_batch_close(batch);
974 }
975 
976 static int
977 _spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
978 {
979 	uint64_t	i;
980 	uint64_t	*tmp;
981 	uint64_t	lfc; /* lowest free cluster */
982 	struct spdk_blob_store *bs;
983 
984 	bs = blob->bs;
985 
986 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
987 	       blob->state != SPDK_BLOB_STATE_SYNCING);
988 
989 	if (blob->active.num_clusters == sz) {
990 		return 0;
991 	}
992 
993 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
994 		/* If this blob was resized to be larger, then smaller, then
995 		 * larger without syncing, then the cluster array already
996 		 * contains spare assigned clusters we can use.
997 		 */
998 		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
999 						     sz);
1000 	}
1001 
1002 	blob->state = SPDK_BLOB_STATE_DIRTY;
1003 
1004 	/* Do two passes - one to verify that we can obtain enough clusters
1005 	 * and another to actually claim them.
1006 	 */
1007 
1008 	lfc = 0;
1009 	for (i = blob->active.num_clusters; i < sz; i++) {
1010 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1011 		if (lfc >= bs->total_clusters) {
1012 			/* No more free clusters. Cannot satisfy the request */
1013 			assert(false);
1014 			return -1;
1015 		}
1016 		lfc++;
1017 	}
1018 
1019 	if (sz > blob->active.num_clusters) {
1020 		/* Expand the cluster array if necessary.
1021 		 * We only shrink the array when persisting.
1022 		 */
1023 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
1024 		if (sz > 0 && tmp == NULL) {
1025 			assert(false);
1026 			return -1;
1027 		}
1028 		blob->active.clusters = tmp;
1029 		blob->active.cluster_array_size = sz;
1030 	}
1031 
1032 	lfc = 0;
1033 	for (i = blob->active.num_clusters; i < sz; i++) {
1034 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1035 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
1036 		_spdk_bs_claim_cluster(bs, lfc);
1037 		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
1038 		lfc++;
1039 	}
1040 
1041 	blob->active.num_clusters = sz;
1042 
1043 	return 0;
1044 }
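
/*
 * A caller-level sketch with hypothetical sizes: growing a blob to hold
 * sz_bytes of data, reusing divide_round_up() from the top of this file.
 *
 *	uint64_t sz_clusters = divide_round_up(sz_bytes, blob->bs->cluster_sz);
 *
 *	if (_spdk_resize_blob(blob, sz_clusters) < 0) {
 *		... out of clusters or memory; no new clusters were claimed ...
 *	}
 *
 * The two passes above make the failure case atomic: either every new
 * cluster is claimed or none is.
 */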
1045 
1046 /* Write a blob to disk */
1047 static void
1048 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
1049 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1050 {
1051 	struct spdk_blob_persist_ctx *ctx;
1052 	int rc;
1053 	uint64_t i;
1054 	uint32_t page_num, *tmp;
1055 	struct spdk_blob_store *bs;
1056 
1057 	assert(blob != NULL);
1058 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
1059 	       blob->state == SPDK_BLOB_STATE_DIRTY);
1060 
1061 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1062 		cb_fn(seq, cb_arg, 0);
1063 		return;
1064 	}
1065 
1066 	bs = blob->bs;
1067 
1068 	ctx = calloc(1, sizeof(*ctx));
1069 	if (!ctx) {
1070 		cb_fn(seq, cb_arg, -ENOMEM);
1071 		return;
1072 	}
1073 	ctx->blob = blob;
1074 	ctx->cb_fn = cb_fn;
1075 	ctx->cb_arg = cb_arg;
1076 
1077 	blob->state = SPDK_BLOB_STATE_SYNCING;
1078 
1079 	if (blob->active.num_pages == 0) {
1080 		/* This is the signal that the blob should be deleted.
1081 		 * Immediately jump to the clean up routine. */
1082 		assert(blob->clean.num_pages > 0);
1083 		ctx->idx = blob->clean.num_pages - 1;
1084 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1085 		return;
1086 
1087 	}
1088 
1089 	/* Generate the new metadata */
1090 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
1091 	if (rc < 0) {
1092 		free(ctx);
1093 		cb_fn(seq, cb_arg, rc);
1094 		return;
1095 	}
1096 
1097 	assert(blob->active.num_pages >= 1);
1098 
1099 	/* Resize the cache of page indices */
1100 	tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
1101 	if (!tmp) {
1102 		spdk_dma_free(ctx->pages);
1103 		free(ctx);
1104 		cb_fn(seq, cb_arg, -ENOMEM);
1105 		return;
1106 	}
1107 	blob->active.pages = tmp;
1108 	/* Assign this metadata to pages. This requires two passes -
1109 	 * one to verify that there are enough pages and a second
1110 	 * to actually claim them. */
1111 	page_num = 0;
1112 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
1113 	for (i = 1; i < blob->active.num_pages; i++) {
1114 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1115 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
1116 			spdk_dma_free(ctx->pages);
1117 			free(ctx);
1118 			blob->state = SPDK_BLOB_STATE_DIRTY;
1119 			cb_fn(seq, cb_arg, -ENOMEM);
1120 			return;
1121 		}
1122 		page_num++;
1123 	}
1124 
1125 	page_num = 0;
1126 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
1127 	for (i = 1; i < blob->active.num_pages; i++) {
1128 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1129 		ctx->pages[i - 1].next = page_num;
1130 		/* Now that previous metadata page is complete, calculate the crc for it. */
1131 		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1132 		blob->active.pages[i] = page_num;
1133 		spdk_bit_array_set(bs->used_md_pages, page_num);
1134 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
1135 		page_num++;
1136 	}
1137 	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1138 	/* Start writing the metadata from last page to first */
1139 	ctx->idx = blob->active.num_pages - 1;
1140 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
1141 }
1142 
1143 static void
1144 _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1145 			     void *payload, uint64_t offset, uint64_t length,
1146 			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1147 {
1148 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1149 	spdk_bs_batch_t			*batch;
1150 	struct spdk_bs_cpl		cpl;
1151 	uint64_t			lba;
1152 	uint32_t			lba_count;
1153 	uint8_t				*buf;
1154 	uint64_t			page;
1155 
1156 	assert(blob != NULL);
1157 
1158 	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
1159 		cb_fn(cb_arg, -EPERM);
1160 		return;
1161 	}
1162 
1163 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1164 		cb_fn(cb_arg, -EINVAL);
1165 		return;
1166 	}
1167 
1168 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1169 	cpl.u.blob_basic.cb_fn = cb_fn;
1170 	cpl.u.blob_basic.cb_arg = cb_arg;
1171 
1172 	batch = spdk_bs_batch_open(_channel, &cpl);
1173 	if (!batch) {
1174 		cb_fn(cb_arg, -ENOMEM);
1175 		return;
1176 	}
1177 
1178 	length = _spdk_bs_page_to_lba(blob->bs, length);
1179 	page = offset;
1180 	buf = payload;
1181 	while (length > 0) {
1182 		lba = _spdk_bs_blob_page_to_lba(blob, page);
1183 		lba_count = spdk_min(length,
1184 				     _spdk_bs_page_to_lba(blob->bs,
1185 						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
1186 
1187 		switch (op_type) {
1188 		case SPDK_BLOB_READ:
1189 			spdk_bs_batch_read(batch, buf, lba, lba_count);
1190 			break;
1191 		case SPDK_BLOB_WRITE:
1192 			spdk_bs_batch_write(batch, buf, lba, lba_count);
1193 			break;
1194 		case SPDK_BLOB_UNMAP:
1195 			spdk_bs_batch_unmap(batch, lba, lba_count);
1196 			break;
1197 		case SPDK_BLOB_WRITE_ZEROES:
1198 			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
1199 			break;
1200 		}
1201 
1202 		length -= lba_count;
1203 		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
1204 		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
1205 			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
1206 		}
1207 	}
1208 
1209 	spdk_bs_batch_close(batch);
1210 }
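
/*
 * The loop above guarantees that no sub-I/O crosses a cluster boundary,
 * because adjacent clusters of a blob need not be adjacent on disk. With
 * hypothetical numbers (pages_per_cluster == 256, offset == 250, length ==
 * 20 pages) the request splits as:
 *
 *	1st piece: pages 250..255,  6 pages, ends at the cluster boundary
 *	2nd piece: pages 256..269, 14 pages, inside the next cluster
 *
 * Each piece is translated through _spdk_bs_blob_page_to_lba() separately,
 * so the two pieces may land at unrelated LBAs.
 */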
1211 
1212 struct rw_iov_ctx {
1213 	struct spdk_blob_data *blob;
1214 	bool read;
1215 	int iovcnt;
1216 	struct iovec *orig_iov;
1217 	uint64_t page_offset;
1218 	uint64_t pages_remaining;
1219 	uint64_t pages_done;
1220 	struct iovec iov[0];
1221 };
1222 
1223 static void
1224 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1225 {
1226 	assert(cb_arg == NULL);
1227 	spdk_bs_sequence_finish(seq, bserrno);
1228 }
1229 
1230 static void
1231 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1232 {
1233 	struct rw_iov_ctx *ctx = cb_arg;
1234 	struct iovec *iov, *orig_iov;
1235 	int iovcnt;
1236 	size_t orig_iovoff;
1237 	uint64_t lba;
1238 	uint64_t page_count, pages_to_boundary;
1239 	uint32_t lba_count;
1240 	uint64_t byte_count;
1241 
1242 	if (bserrno != 0 || ctx->pages_remaining == 0) {
1243 		free(ctx);
1244 		spdk_bs_sequence_finish(seq, bserrno);
1245 		return;
1246 	}
1247 
1248 	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
1249 	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
1250 	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
1251 	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);
1252 
1253 	/*
1254 	 * Get index and offset into the original iov array for our current position in the I/O sequence.
1255 	 *  byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will
1256 	 *  point to the current position in the I/O sequence.
1257 	 */
1258 	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
1259 	orig_iov = &ctx->orig_iov[0];
1260 	orig_iovoff = 0;
1261 	while (byte_count > 0) {
1262 		if (byte_count >= orig_iov->iov_len) {
1263 			byte_count -= orig_iov->iov_len;
1264 			orig_iov++;
1265 		} else {
1266 			orig_iovoff = byte_count;
1267 			byte_count = 0;
1268 		}
1269 	}
1270 
1271 	/*
1272 	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
1273 	 *  bytes of this next I/O remain to be accounted for in the new iov array.
1274 	 */
1275 	byte_count = page_count * sizeof(struct spdk_blob_md_page);
1276 	iov = &ctx->iov[0];
1277 	iovcnt = 0;
1278 	while (byte_count > 0) {
1279 		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
1280 		iov->iov_base = orig_iov->iov_base + orig_iovoff;
1281 		byte_count -= iov->iov_len;
1282 		orig_iovoff = 0;
1283 		orig_iov++;
1284 		iov++;
1285 		iovcnt++;
1286 	}
1287 
1288 	ctx->page_offset += page_count;
1289 	ctx->pages_done += page_count;
1290 	ctx->pages_remaining -= page_count;
1291 	iov = &ctx->iov[0];
1292 
1293 	if (ctx->read) {
1294 		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
1295 	} else {
1296 		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
1297 	}
1298 }
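
/*
 * Worked example of the re-slicing above: a 3-page request starting at page
 * 255 with pages_per_cluster == 256, supplied as one 12 KiB iov element.
 *
 *	1st sub-I/O (1 page, up to the boundary): iov[0] = { base,        4096 }
 *	2nd sub-I/O (2 remaining pages):          iov[0] = { base + 4096, 8192 }
 *
 * byte_count first walks the original iov array to find where the previous
 * sub-I/O left off, so this works for any iovcnt. Note that the code uses
 * sizeof(struct spdk_blob_md_page) as the byte size of a data page; the two
 * sizes are the same (SPDK_BS_PAGE_SIZE).
 */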
1299 
1300 static void
1301 _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1302 				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1303 				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
1304 {
1305 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1306 	spdk_bs_sequence_t		*seq;
1307 	struct spdk_bs_cpl		cpl;
1308 
1309 	assert(blob != NULL);
1310 
1311 	if (!read && blob->data_ro) {
1312 		cb_fn(cb_arg, -EPERM);
1313 		return;
1314 	}
1315 
1316 	if (length == 0) {
1317 		cb_fn(cb_arg, 0);
1318 		return;
1319 	}
1320 
1321 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1322 		cb_fn(cb_arg, -EINVAL);
1323 		return;
1324 	}
1325 
1326 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1327 	cpl.u.blob_basic.cb_fn = cb_fn;
1328 	cpl.u.blob_basic.cb_arg = cb_arg;
1329 
1330 	/*
1331 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
1332 	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
1333 	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
1334 	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
1335 	 *  to allocate a separate iov array and split the I/O such that none of the resulting
1336 	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
1337 	 *  but since this case happens very infrequently, any performance impact will be negligible.
1338 	 *
1339 	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
1340 	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
1341 	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
1342 	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
1343 	 */
1344 	seq = spdk_bs_sequence_start(_channel, &cpl);
1345 	if (!seq) {
1346 		cb_fn(cb_arg, -ENOMEM);
1347 		return;
1348 	}
1349 
1350 	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
1351 		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
1352 		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);
1353 
1354 		if (read) {
1355 			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1356 		} else {
1357 			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1358 		}
1359 	} else {
1360 		struct rw_iov_ctx *ctx;
1361 
1362 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
1363 		if (ctx == NULL) {
1364 			spdk_bs_sequence_finish(seq, -ENOMEM);
1365 			return;
1366 		}
1367 
1368 		ctx->blob = blob;
1369 		ctx->read = read;
1370 		ctx->orig_iov = iov;
1371 		ctx->iovcnt = iovcnt;
1372 		ctx->page_offset = offset;
1373 		ctx->pages_remaining = length;
1374 		ctx->pages_done = 0;
1375 
1376 		_spdk_rw_iov_split_next(seq, ctx, 0);
1377 	}
1378 }
1379 
1380 static struct spdk_blob_data *
1381 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1382 {
1383 	struct spdk_blob_data *blob;
1384 
1385 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1386 		if (blob->id == blobid) {
1387 			return blob;
1388 		}
1389 	}
1390 
1391 	return NULL;
1392 }
1393 
1394 static int
1395 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
1396 {
1397 	struct spdk_blob_store		*bs = io_device;
1398 	struct spdk_bs_channel		*channel = ctx_buf;
1399 	struct spdk_bs_dev		*dev;
1400 	uint32_t			max_ops = bs->max_channel_ops;
1401 	uint32_t			i;
1402 
1403 	dev = bs->dev;
1404 
1405 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1406 	if (!channel->req_mem) {
1407 		return -1;
1408 	}
1409 
1410 	TAILQ_INIT(&channel->reqs);
1411 
1412 	for (i = 0; i < max_ops; i++) {
1413 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1414 	}
1415 
1416 	channel->bs = bs;
1417 	channel->dev = dev;
1418 	channel->dev_channel = dev->create_channel(dev);
1419 
1420 	if (!channel->dev_channel) {
1421 		SPDK_ERRLOG("Failed to create device channel.\n");
1422 		free(channel->req_mem);
1423 		return -1;
1424 	}
1425 
1426 	return 0;
1427 }
1428 
1429 static void
1430 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1431 {
1432 	struct spdk_bs_channel *channel = ctx_buf;
1433 
1434 	free(channel->req_mem);
1435 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1436 }
1437 
1438 static void
1439 _spdk_bs_dev_destroy(void *io_device)
1440 {
1441 	struct spdk_blob_store *bs = io_device;
1442 	struct spdk_blob_data	*blob, *blob_tmp;
1443 
1444 	bs->dev->destroy(bs->dev);
1445 
1446 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1447 		TAILQ_REMOVE(&bs->blobs, blob, link);
1448 		_spdk_blob_free(blob);
1449 	}
1450 
1451 	spdk_bit_array_free(&bs->used_md_pages);
1452 	spdk_bit_array_free(&bs->used_clusters);
1453 	/*
1454 	 * If this function is called for any reason except a successful unload,
1455 	 * the unload_cpl type will be NONE and this will be a nop.
1456 	 */
1457 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
1458 
1459 	free(bs);
1460 }
1461 
1462 static void
1463 _spdk_bs_free(struct spdk_blob_store *bs)
1464 {
1465 	spdk_bs_unregister_md_thread(bs);
1466 	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
1467 }
1468 
1469 void
1470 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1471 {
1472 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1473 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1474 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1475 	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
1476 	memset(&opts->bstype, 0, sizeof(opts->bstype));
1477 }
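
/*
 * Typical (illustrative) usage: take the defaults, then override individual
 * fields before handing the opts to spdk_bs_init().
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 *
 * _spdk_bs_opts_verify() below rejects options that were explicitly zeroed.
 */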
1478 
1479 static int
1480 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
1481 {
1482 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
1483 	    opts->max_channel_ops == 0) {
1484 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
1485 		return -1;
1486 	}
1487 
1488 	return 0;
1489 }
1490 
1491 static struct spdk_blob_store *
1492 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1493 {
1494 	struct spdk_blob_store	*bs;
1495 	uint64_t dev_size;
1496 	int rc;
1497 
1498 	dev_size = dev->blocklen * dev->blockcnt;
1499 	if (dev_size < opts->cluster_sz) {
1500 		/* Device size cannot be smaller than cluster size of blobstore */
1501 		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size,
1502 			    opts->cluster_sz);
1503 		return NULL;
1504 	}
1505 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
1506 		/* Cluster size cannot be smaller than page size */
1507 		SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n",
1508 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
1509 		return NULL;
1510 	}
1511 	bs = calloc(1, sizeof(struct spdk_blob_store));
1512 	if (!bs) {
1513 		return NULL;
1514 	}
1515 
1516 	TAILQ_INIT(&bs->blobs);
1517 	bs->dev = dev;
1518 
1519 	/*
1520 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1521 	 *  even multiple of the cluster size.
1522 	 */
1523 	bs->cluster_sz = opts->cluster_sz;
1524 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1525 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
1526 	bs->num_free_clusters = bs->total_clusters;
1527 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1528 	if (bs->used_clusters == NULL) {
1529 		free(bs);
1530 		return NULL;
1531 	}
1532 
1533 	bs->max_channel_ops = opts->max_channel_ops;
1534 	bs->super_blob = SPDK_BLOBID_INVALID;
1535 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
1536 
1537 	/* The metadata is assumed to be at least 1 page */
1538 	bs->used_md_pages = spdk_bit_array_create(1);
1539 
1540 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
1541 				sizeof(struct spdk_bs_channel));
1542 	rc = spdk_bs_register_md_thread(bs);
1543 	if (rc == -1) {
1544 		spdk_io_device_unregister(bs, NULL);
1545 		spdk_bit_array_free(&bs->used_md_pages);
1546 		spdk_bit_array_free(&bs->used_clusters);
1547 		free(bs);
1548 		return NULL;
1549 	}
1550 
1551 	return bs;
1552 }
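
/*
 * Example of the geometry math above with hypothetical device numbers:
 * blocklen == 512, blockcnt == 8388608 (a 4 GiB device) and a 1 MiB
 * cluster_sz.
 *
 *	total_clusters    = 8388608 / (1048576 / 512) = 4096
 *	pages_per_cluster = 1048576 / 4096            = 256
 *
 * Any partial cluster at the end of the device is ignored, which is why the
 * comment above warns against using _spdk_bs_lba_to_cluster() here.
 */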
1553 
1554 /* START spdk_bs_load. spdk_bs_load_ctx will be used for both load and unload. */
1555 
1556 struct spdk_bs_load_ctx {
1557 	struct spdk_blob_store		*bs;
1558 	struct spdk_bs_super_block	*super;
1559 
1560 	struct spdk_bs_md_mask		*mask;
1561 	bool				in_page_chain;
1562 	uint32_t			page_index;
1563 	uint32_t			cur_page;
1564 	struct spdk_blob_md_page	*page;
1565 };
1566 
1567 static void
1568 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
1569 {
1570 	uint32_t i = 0;
1571 
1572 	while (true) {
1573 		i = spdk_bit_array_find_first_set(array, i);
1574 		if (i >= mask->length) {
1575 			break;
1576 		}
1577 		mask->mask[i / 8] |= 1U << (i % 8);
1578 		i++;
1579 	}
1580 }
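
/*
 * The serialized mask is a plain bitmap: bit i of the region tracks page (or
 * cluster) i. For example, if bits 0, 3 and 9 are set in the bit array:
 *
 *	mask->mask[0] == 0x09    (bits 0 and 3)
 *	mask->mask[1] == 0x02    (bit 9 lands in byte 1, bit 1)
 *
 * The load completions below (_spdk_bs_load_used_pages_cpl and
 * _spdk_bs_load_used_clusters_cpl) walk the same byte/bit layout in reverse.
 */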
1581 
1582 static void
1583 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
1584 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1585 {
1586 	/* Update the values in the super block */
1587 	super->super_blob = bs->super_blob;
1588 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
1589 	super->crc = _spdk_blob_md_page_calc_crc(super);
1590 	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
1591 			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
1592 			       cb_fn, cb_arg);
1593 }
1594 
1595 static void
1596 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1597 {
1598 	struct spdk_bs_load_ctx	*ctx = arg;
1599 	uint64_t	mask_size, lba, lba_count;
1600 
1601 	/* Write out the used clusters mask */
1602 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1603 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1604 	if (!ctx->mask) {
1605 		spdk_dma_free(ctx->super);
1606 		free(ctx);
1607 		spdk_bs_sequence_finish(seq, -ENOMEM);
1608 		return;
1609 	}
1610 
1611 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1612 	ctx->mask->length = ctx->bs->total_clusters;
1613 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1614 
1615 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
1616 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1617 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1618 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1619 }
1620 
1621 static void
1622 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1623 {
1624 	struct spdk_bs_load_ctx	*ctx = arg;
1625 	uint64_t	mask_size, lba, lba_count;
1626 
1627 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1628 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1629 	if (!ctx->mask) {
1630 		spdk_dma_free(ctx->super);
1631 		free(ctx);
1632 		spdk_bs_sequence_finish(seq, -ENOMEM);
1633 		return;
1634 	}
1635 
1636 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1637 	ctx->mask->length = ctx->super->md_len;
1638 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1639 
1640 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
1641 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1642 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1643 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1644 }
1645 
1646 static void
1647 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1648 {
1649 	struct spdk_bs_load_ctx *ctx = cb_arg;
1650 	uint32_t		i, j;
1651 	int			rc;
1652 
1653 	/* The type must be correct */
1654 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1655 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1656 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
1657 					     struct spdk_blob_md_page) * 8));
1658 	/* The length of the mask must be exactly equal to the total number of clusters */
1659 	assert(ctx->mask->length == ctx->bs->total_clusters);
1660 
1661 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1662 	if (rc < 0) {
1663 		spdk_dma_free(ctx->super);
1664 		spdk_dma_free(ctx->mask);
1665 		_spdk_bs_free(ctx->bs);
1666 		free(ctx);
1667 		spdk_bs_sequence_finish(seq, -ENOMEM);
1668 		return;
1669 	}
1670 
1671 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1672 	for (i = 0; i < divide_round_up(ctx->mask->length, 8); i++) {
1673 		uint8_t segment = ctx->mask->mask[i];
1674 		for (j = 0; segment && (j < 8); j++) {
1675 			if (segment & 1U) {
1676 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1677 				assert(ctx->bs->num_free_clusters > 0);
1678 				ctx->bs->num_free_clusters--;
1679 			}
1680 			segment >>= 1U;
1681 		}
1682 	}
1683 
1684 	spdk_dma_free(ctx->super);
1685 	spdk_dma_free(ctx->mask);
1686 	free(ctx);
1687 
1688 	spdk_bs_sequence_finish(seq, bserrno);
1689 }
1690 
1691 static void
1692 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1693 {
1694 	struct spdk_bs_load_ctx *ctx = cb_arg;
1695 	uint64_t		lba, lba_count, mask_size;
1696 	uint32_t		i, j;
1697 	int			rc;
1698 
1699 	/* The type must be correct */
1700 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1701 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1702 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
1703 				     8));
1704 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1705 	assert(ctx->mask->length == ctx->super->md_len);
1706 
1707 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1708 	if (rc < 0) {
1709 		spdk_dma_free(ctx->super);
1710 		spdk_dma_free(ctx->mask);
1711 		_spdk_bs_free(ctx->bs);
1712 		free(ctx);
1713 		spdk_bs_sequence_finish(seq, -ENOMEM);
1714 		return;
1715 	}
1716 
1717 	for (i = 0; i < divide_round_up(ctx->mask->length, 8); i++) {
1718 		uint8_t segment = ctx->mask->mask[i];
1719 		for (j = 0; segment && (j < 8); j++) {
1720 			if (segment & 1U) {
1721 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1722 			}
1723 			segment >>= 1U;
1724 		}
1725 	}
1726 	spdk_dma_free(ctx->mask);
1727 
1728 	/* Read the used clusters mask */
1729 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1730 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1731 	if (!ctx->mask) {
1732 		spdk_dma_free(ctx->super);
1733 		_spdk_bs_free(ctx->bs);
1734 		free(ctx);
1735 		spdk_bs_sequence_finish(seq, -ENOMEM);
1736 		return;
1737 	}
1738 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1739 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1740 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1741 			      _spdk_bs_load_used_clusters_cpl, ctx);
1742 }
1743 
1744 static void
1745 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1746 {
1747 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1748 	uint64_t lba, lba_count, mask_size;
1749 
1750 	/* Read the used pages mask */
1751 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1752 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1753 	if (!ctx->mask) {
1754 		spdk_dma_free(ctx->super);
1755 		_spdk_bs_free(ctx->bs);
1756 		free(ctx);
1757 		spdk_bs_sequence_finish(seq, -ENOMEM);
1758 		return;
1759 	}
1760 
1761 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1762 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1763 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1764 			      _spdk_bs_load_used_pages_cpl, ctx);
1765 }
1766 
1767 static int
1768 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
1769 {
1770 	struct spdk_blob_md_descriptor *desc;
1771 	size_t	cur_desc = 0;
1772 
1773 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
1774 	while (cur_desc < sizeof(page->descriptors)) {
1775 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
1776 			if (desc->length == 0) {
1777 				/* If padding and length are 0, this terminates the page */
1778 				break;
1779 			}
1780 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
1781 			struct spdk_blob_md_descriptor_extent	*desc_extent;
1782 			unsigned int				i, j;
1783 			unsigned int				cluster_count = 0;
1784 
1785 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
1786 
1787 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
1788 				for (j = 0; j < desc_extent->extents[i].length; j++) {
1789 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
1790 					if (bs->num_free_clusters == 0) {
1791 						return -1;
1792 					}
1793 					bs->num_free_clusters--;
1794 					cluster_count++;
1795 				}
1796 			}
1797 			if (cluster_count == 0) {
1798 				return -1;
1799 			}
1800 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
1801 			/* Skip this item */
1802 		} else {
1803 			/* Error */
1804 			return -1;
1805 		}
1806 		/* Advance to the next descriptor */
1807 		cur_desc += sizeof(*desc) + desc->length;
1808 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
1809 			break;
1810 		}
1811 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
1812 	}
1813 	return 0;
1814 }
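
/*
 * Illustrative example of the replay above, with hypothetical values: a
 * valid metadata page whose descriptor area holds a single extent
 * descriptor with one entry { cluster_idx = 10, length = 3 } causes
 * clusters 10, 11 and 12 to be set in bs->used_clusters and
 * num_free_clusters to be decremented three times.
 */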
1815 
static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
1817 {
1818 	uint32_t crc;
1819 
1820 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
1821 	if (crc != ctx->page->crc) {
1822 		return false;
1823 	}
1824 
1825 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
1826 		return false;
1827 	}
1828 	return true;
1829 }
1830 
1831 static void
1832 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
1833 
1834 static void
1835 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1836 {
1837 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1838 
1839 	spdk_dma_free(ctx->mask);
1840 	spdk_dma_free(ctx->super);
1841 	spdk_bs_sequence_finish(seq, bserrno);
1842 	free(ctx);
1843 }
1844 
1845 static void
1846 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1847 {
1848 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1849 
1850 	spdk_dma_free(ctx->mask);
1851 
1852 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
1853 }
1854 
1855 static void
1856 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1857 {
1858 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
1859 }
1860 
1861 static void
1862 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1863 {
1864 	struct spdk_bs_load_ctx *ctx = cb_arg;
1865 	uint32_t page_num;
1866 
1867 	if (bserrno != 0) {
1868 		spdk_dma_free(ctx->super);
1869 		_spdk_bs_free(ctx->bs);
1870 		free(ctx);
1871 		spdk_bs_sequence_finish(seq, bserrno);
1872 		return;
1873 	}
1874 
1875 	page_num = ctx->cur_page;
1876 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
1877 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
1878 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
1879 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
1880 				spdk_dma_free(ctx->super);
1881 				_spdk_bs_free(ctx->bs);
1882 				free(ctx);
1883 				spdk_bs_sequence_finish(seq, -EILSEQ);
1884 				return;
1885 			}
1886 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
1887 				ctx->in_page_chain = true;
1888 				ctx->cur_page = ctx->page->next;
1889 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1890 				return;
1891 			}
1892 		}
1893 	}
1894 
1895 	ctx->in_page_chain = false;
1896 
1897 	do {
1898 		ctx->page_index++;
1899 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
1900 
1901 	if (ctx->page_index < ctx->super->md_len) {
1902 		ctx->cur_page = ctx->page_index;
1903 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1904 	} else {
1905 		spdk_dma_free(ctx->page);
1906 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
1907 	}
1908 }
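
/*
 * Replay visits metadata pages in ascending index order. A page starts a
 * new blob only if its sequence_num is 0; pages reached through page->next
 * are handled as part of the current chain (in_page_chain == true). When a
 * chain ends, the scan resumes at the next page index not already marked in
 * used_md_pages, and once every page has been visited the rebuilt masks are
 * persisted via _spdk_bs_load_write_used_md().
 */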
1909 
1910 static void
1911 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
1912 {
1913 	struct spdk_bs_load_ctx *ctx = cb_arg;
1914 	uint64_t lba;
1915 
1916 	assert(ctx->cur_page < ctx->super->md_len);
1917 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
1918 	spdk_bs_sequence_read(seq, ctx->page, lba,
1919 			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
1920 			      _spdk_bs_load_replay_md_cpl, ctx);
1921 }
1922 
1923 static void
1924 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
1925 {
1926 	struct spdk_bs_load_ctx *ctx = cb_arg;
1927 
1928 	ctx->page_index = 0;
1929 	ctx->cur_page = 0;
1930 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
1931 				     SPDK_BS_PAGE_SIZE,
1932 				     NULL);
1933 	if (!ctx->page) {
1934 		spdk_dma_free(ctx->super);
1935 		_spdk_bs_free(ctx->bs);
1936 		free(ctx);
1937 		spdk_bs_sequence_finish(seq, -ENOMEM);
1938 		return;
1939 	}
1940 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1941 }
1942 
1943 static void
1944 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
1945 {
1946 	struct spdk_bs_load_ctx *ctx = cb_arg;
1947 	int 		rc;
1948 
1949 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
1950 	if (rc < 0) {
1951 		spdk_dma_free(ctx->super);
1952 		_spdk_bs_free(ctx->bs);
1953 		free(ctx);
1954 		spdk_bs_sequence_finish(seq, -ENOMEM);
1955 		return;
1956 	}
1957 
1958 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1959 	if (rc < 0) {
1960 		spdk_dma_free(ctx->super);
1961 		_spdk_bs_free(ctx->bs);
1962 		free(ctx);
1963 		spdk_bs_sequence_finish(seq, -ENOMEM);
1964 		return;
1965 	}
1966 
1967 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1968 	_spdk_bs_load_replay_md(seq, cb_arg);
1969 }
1970 
1971 static void
1972 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1973 {
1974 	struct spdk_bs_load_ctx *ctx = cb_arg;
1975 	uint32_t	crc;
1976 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
1977 
1978 	if (ctx->super->version > SPDK_BS_VERSION ||
1979 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
1980 		spdk_dma_free(ctx->super);
1981 		_spdk_bs_free(ctx->bs);
1982 		free(ctx);
1983 		spdk_bs_sequence_finish(seq, -EILSEQ);
1984 		return;
1985 	}
1986 
1987 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1988 		   sizeof(ctx->super->signature)) != 0) {
1989 		spdk_dma_free(ctx->super);
1990 		_spdk_bs_free(ctx->bs);
1991 		free(ctx);
1992 		spdk_bs_sequence_finish(seq, -EILSEQ);
1993 		return;
1994 	}
1995 
1996 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
1997 	if (crc != ctx->super->crc) {
1998 		spdk_dma_free(ctx->super);
1999 		_spdk_bs_free(ctx->bs);
2000 		free(ctx);
2001 		spdk_bs_sequence_finish(seq, -EILSEQ);
2002 		return;
2003 	}
2004 
2005 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2006 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2007 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2008 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
2009 	} else {
2010 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2011 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2012 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2013 		spdk_dma_free(ctx->super);
2014 		_spdk_bs_free(ctx->bs);
2015 		free(ctx);
2016 		spdk_bs_sequence_finish(seq, -ENXIO);
2017 		return;
2018 	}
2019 
2020 	/* Parse the super block */
2021 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2022 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2023 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2024 	ctx->bs->md_start = ctx->super->md_start;
2025 	ctx->bs->md_len = ctx->super->md_len;
2026 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2027 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2028 	ctx->bs->super_blob = ctx->super->super_blob;
2029 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2030 
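	/*
	 * If the blobstore was shut down cleanly, clear the clean flag and
	 * rewrite the super block before returning the handle; a crash before
	 * the next clean unload then forces the _spdk_bs_recover() replay
	 * path rather than trusting stale masks.
	 */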
2031 	if (ctx->super->clean == 1) {
2032 		ctx->super->clean = 0;
2033 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2034 	} else {
2035 		_spdk_bs_recover(seq, ctx);
2036 	}
2037 }
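
/*
 * Worked example of the super block math above, with hypothetical geometry:
 * dev->blocklen == 512, dev->blockcnt == 2097152 (a 1 GiB device) and
 * cluster_size == 1 MiB give total_clusters == 2097152 / (1048576 / 512) ==
 * 1024, and pages_per_cluster == 1048576 / 4096 == 256.
 */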
2038 
2039 void
2040 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2041 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2042 {
2043 	struct spdk_blob_store	*bs;
2044 	struct spdk_bs_cpl	cpl;
2045 	spdk_bs_sequence_t	*seq;
2046 	struct spdk_bs_load_ctx *ctx;
2047 	struct spdk_bs_opts	opts = {};
2048 
2049 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2050 
2051 	if (o) {
2052 		opts = *o;
2053 	} else {
2054 		spdk_bs_opts_init(&opts);
2055 	}
2056 
2057 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2058 		cb_fn(cb_arg, NULL, -EINVAL);
2059 		return;
2060 	}
2061 
2062 	bs = _spdk_bs_alloc(dev, &opts);
2063 	if (!bs) {
2064 		cb_fn(cb_arg, NULL, -ENOMEM);
2065 		return;
2066 	}
2067 
2068 	ctx = calloc(1, sizeof(*ctx));
2069 	if (!ctx) {
2070 		_spdk_bs_free(bs);
2071 		cb_fn(cb_arg, NULL, -ENOMEM);
2072 		return;
2073 	}
2074 
2075 	ctx->bs = bs;
2076 
2077 	/* Allocate memory for the super block */
2078 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2084 
2085 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2086 	cpl.u.bs_handle.cb_fn = cb_fn;
2087 	cpl.u.bs_handle.cb_arg = cb_arg;
2088 	cpl.u.bs_handle.bs = bs;
2089 
2090 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2091 	if (!seq) {
2092 		spdk_dma_free(ctx->super);
2093 		free(ctx);
2094 		_spdk_bs_free(bs);
2095 		cb_fn(cb_arg, NULL, -ENOMEM);
2096 		return;
2097 	}
2098 
2099 	/* Read the super block */
2100 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2101 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2102 			      _spdk_bs_load_super_cpl, ctx);
2103 }
2104 
2105 /* END spdk_bs_load */
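
/*
 * Minimal usage sketch for spdk_bs_load(). The names load_done and dev are
 * illustrative, not part of this file; error handling is elided.
 *
 *	static void
 *	load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			SPDK_NOTICELOG("cluster size: %" PRIu64 "\n",
 *				       spdk_bs_get_cluster_size(bs));
 *		}
 *	}
 *
 *	spdk_bs_load(dev, NULL, load_done, NULL);
 *
 * Passing NULL for the opts parameter selects the defaults filled in by
 * spdk_bs_opts_init().
 */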
2106 
2107 /* START spdk_bs_init */
2108 
2109 struct spdk_bs_init_ctx {
2110 	struct spdk_blob_store		*bs;
2111 	struct spdk_bs_super_block	*super;
2112 };
2113 
2114 static void
2115 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2116 {
2117 	struct spdk_bs_init_ctx *ctx = cb_arg;
2118 
2119 	spdk_dma_free(ctx->super);
2120 	free(ctx);
2121 
2122 	spdk_bs_sequence_finish(seq, bserrno);
2123 }
2124 
2125 static void
2126 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2127 {
2128 	struct spdk_bs_init_ctx *ctx = cb_arg;
2129 
2130 	/* Write super block */
2131 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2132 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2133 			       _spdk_bs_init_persist_super_cpl, ctx);
2134 }
2135 
2136 void
2137 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2138 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2139 {
2140 	struct spdk_bs_init_ctx *ctx;
2141 	struct spdk_blob_store	*bs;
2142 	struct spdk_bs_cpl	cpl;
2143 	spdk_bs_sequence_t	*seq;
2144 	spdk_bs_batch_t		*batch;
2145 	uint64_t		num_md_lba;
2146 	uint64_t		num_md_pages;
2147 	uint64_t		num_md_clusters;
2148 	uint32_t		i;
2149 	struct spdk_bs_opts	opts = {};
2150 	int			rc;
2151 
2152 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2153 
2154 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2155 		SPDK_ERRLOG("unsupported dev block length of %u\n",
2156 			    dev->blocklen);
2157 		dev->destroy(dev);
2158 		cb_fn(cb_arg, NULL, -EINVAL);
2159 		return;
2160 	}
2161 
2162 	if (o) {
2163 		opts = *o;
2164 	} else {
2165 		spdk_bs_opts_init(&opts);
2166 	}
2167 
2168 	if (_spdk_bs_opts_verify(&opts) != 0) {
2169 		dev->destroy(dev);
2170 		cb_fn(cb_arg, NULL, -EINVAL);
2171 		return;
2172 	}
2173 
2174 	bs = _spdk_bs_alloc(dev, &opts);
2175 	if (!bs) {
2176 		dev->destroy(dev);
2177 		cb_fn(cb_arg, NULL, -ENOMEM);
2178 		return;
2179 	}
2180 
2181 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2182 		/* By default, allocate 1 page per cluster.
2183 		 * Technically, this over-allocates metadata
2184 		 * because more metadata will reduce the number
2185 		 * of usable clusters. This can be addressed with
2186 		 * more complex math in the future.
2187 		 */
2188 		bs->md_len = bs->total_clusters;
2189 	} else {
2190 		bs->md_len = opts.num_md_pages;
2191 	}
2192 
2193 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2194 	if (rc < 0) {
2195 		_spdk_bs_free(bs);
2196 		cb_fn(cb_arg, NULL, -ENOMEM);
2197 		return;
2198 	}
2199 
2200 	ctx = calloc(1, sizeof(*ctx));
2201 	if (!ctx) {
2202 		_spdk_bs_free(bs);
2203 		cb_fn(cb_arg, NULL, -ENOMEM);
2204 		return;
2205 	}
2206 
2207 	ctx->bs = bs;
2208 
2209 	/* Allocate memory for the super block */
2210 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2216 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2217 	       sizeof(ctx->super->signature));
2218 	ctx->super->version = SPDK_BS_VERSION;
2219 	ctx->super->length = sizeof(*ctx->super);
2220 	ctx->super->super_blob = bs->super_blob;
2221 	ctx->super->clean = 0;
2222 	ctx->super->cluster_size = bs->cluster_sz;
2223 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
2224 
2225 	/* Calculate how many pages the metadata consumes at the front
2226 	 * of the disk.
2227 	 */
2228 
2229 	/* The super block uses 1 page */
2230 	num_md_pages = 1;
2231 
2232 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
2233 	 * up to the nearest page, plus a header.
2234 	 */
2235 	ctx->super->used_page_mask_start = num_md_pages;
2236 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2237 					 divide_round_up(bs->md_len, 8),
2238 					 SPDK_BS_PAGE_SIZE);
2239 	num_md_pages += ctx->super->used_page_mask_len;
2240 
2241 	/* The used_clusters mask requires 1 bit per cluster, rounded
2242 	 * up to the nearest page, plus a header.
2243 	 */
2244 	ctx->super->used_cluster_mask_start = num_md_pages;
2245 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2246 					    divide_round_up(bs->total_clusters, 8),
2247 					    SPDK_BS_PAGE_SIZE);
2248 	num_md_pages += ctx->super->used_cluster_mask_len;
2249 
2250 	/* The metadata region size was chosen above */
2251 	ctx->super->md_start = bs->md_start = num_md_pages;
2252 	ctx->super->md_len = bs->md_len;
2253 	num_md_pages += bs->md_len;
2254 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
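
	/*
	 * Resulting layout for a hypothetical example (md_len == 1024,
	 * total_clusters == 1024, 4 KiB pages):
	 *
	 *   page 0          super block
	 *   page 1          used_page mask (header + 128 bytes of bits)
	 *   page 2          used_cluster mask (header + 128 bytes of bits)
	 *   pages 3-1026    metadata region (md_len pages)
	 *
	 * num_md_pages counts all of the above and num_md_lba is the same
	 * span converted to device blocks.
	 */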
2255 
2256 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
2257 
2258 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
2259 	if (num_md_clusters > bs->total_clusters) {
2260 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available; "
2261 			    "please decrease the number of pages reserved for metadata "
2262 			    "or increase cluster size.\n");
2263 		spdk_dma_free(ctx->super);
2264 		free(ctx);
2265 		_spdk_bs_free(bs);
2266 		cb_fn(cb_arg, NULL, -ENOMEM);
2267 		return;
2268 	}
2269 	/* Claim all of the clusters used by the metadata */
2270 	for (i = 0; i < num_md_clusters; i++) {
2271 		_spdk_bs_claim_cluster(bs, i);
2272 	}
2273 
2274 	bs->total_data_clusters = bs->num_free_clusters;
2275 
2276 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2277 	cpl.u.bs_handle.cb_fn = cb_fn;
2278 	cpl.u.bs_handle.cb_arg = cb_arg;
2279 	cpl.u.bs_handle.bs = bs;
2280 
2281 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2282 	if (!seq) {
2283 		spdk_dma_free(ctx->super);
2284 		free(ctx);
2285 		_spdk_bs_free(bs);
2286 		cb_fn(cb_arg, NULL, -ENOMEM);
2287 		return;
2288 	}
2289 
2290 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
2291 
2292 	/* Clear metadata space */
2293 	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
2294 	/* Trim data clusters */
2295 	spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
2296 
2297 	spdk_bs_batch_close(batch);
2298 }
2299 
2300 /* END spdk_bs_init */
2301 
2302 /* START spdk_bs_destroy */
2303 
2304 static void
2305 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2306 {
2307 	struct spdk_bs_init_ctx *ctx = cb_arg;
2308 	struct spdk_blob_store *bs = ctx->bs;
2309 
2310 	/*
2311 	 * We need to defer calling spdk_bs_call_cpl() until after
2312 	 * dev destruction, so tuck these away for later use.
2313 	 */
2314 	bs->unload_err = bserrno;
2315 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2316 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2317 
2318 	spdk_bs_sequence_finish(seq, bserrno);
2319 
2320 	_spdk_bs_free(bs);
2321 	free(ctx);
2322 }
2323 
2324 void
2325 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
2326 		void *cb_arg)
2327 {
2328 	struct spdk_bs_cpl	cpl;
2329 	spdk_bs_sequence_t	*seq;
2330 	struct spdk_bs_init_ctx *ctx;
2331 
2332 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
2333 
2334 	if (!TAILQ_EMPTY(&bs->blobs)) {
2335 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2336 		cb_fn(cb_arg, -EBUSY);
2337 		return;
2338 	}
2339 
2340 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2341 	cpl.u.bs_basic.cb_fn = cb_fn;
2342 	cpl.u.bs_basic.cb_arg = cb_arg;
2343 
2344 	ctx = calloc(1, sizeof(*ctx));
2345 	if (!ctx) {
2346 		cb_fn(cb_arg, -ENOMEM);
2347 		return;
2348 	}
2349 
2350 	ctx->bs = bs;
2351 
2352 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2353 	if (!seq) {
2354 		free(ctx);
2355 		cb_fn(cb_arg, -ENOMEM);
2356 		return;
2357 	}
2358 
2359 	/* Write zeroes to the super block */
2360 	spdk_bs_sequence_write_zeroes(seq,
2361 				      _spdk_bs_page_to_lba(bs, 0),
2362 				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
2363 				      _spdk_bs_destroy_trim_cpl, ctx);
2364 }
2365 
2366 /* END spdk_bs_destroy */
2367 
2368 /* START spdk_bs_unload */
2369 
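/*
 * Unload persists final state through a chain of completions: read the
 * super block, write the used_md_pages mask, write the used_clusters mask,
 * set super->clean = 1, and rewrite the super block so that the next load
 * can skip recovery.
 */
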
2370 static void
2371 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2372 {
2373 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2374 
2375 	spdk_dma_free(ctx->super);
2376 
2377 	/*
2378 	 * We need to defer calling spdk_bs_call_cpl() until after
2379 	 * dev destruction, so tuck these away for later use.
2380 	 */
2381 	ctx->bs->unload_err = bserrno;
2382 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2383 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2384 
2385 	spdk_bs_sequence_finish(seq, bserrno);
2386 
2387 	_spdk_bs_free(ctx->bs);
2388 	free(ctx);
2389 }
2390 
2391 static void
2392 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2393 {
2394 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2395 
2396 	spdk_dma_free(ctx->mask);
2397 	ctx->super->clean = 1;
2398 
2399 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
2400 }
2401 
2402 static void
2403 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2404 {
2405 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2406 
2407 	spdk_dma_free(ctx->mask);
2408 
2409 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
2410 }
2411 
2412 static void
2413 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2414 {
2415 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
2416 }
2417 
2418 void
2419 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
2420 {
2421 	struct spdk_bs_cpl	cpl;
2422 	spdk_bs_sequence_t	*seq;
2423 	struct spdk_bs_load_ctx *ctx;
2424 
2425 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
2426 
2427 	if (!TAILQ_EMPTY(&bs->blobs)) {
2428 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2429 		cb_fn(cb_arg, -EBUSY);
2430 		return;
2431 	}
2432 
2433 	ctx = calloc(1, sizeof(*ctx));
2434 	if (!ctx) {
2435 		cb_fn(cb_arg, -ENOMEM);
2436 		return;
2437 	}
2438 
2439 	ctx->bs = bs;
2440 
2441 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2442 	if (!ctx->super) {
2443 		free(ctx);
2444 		cb_fn(cb_arg, -ENOMEM);
2445 		return;
2446 	}
2447 
2448 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2449 	cpl.u.bs_basic.cb_fn = cb_fn;
2450 	cpl.u.bs_basic.cb_arg = cb_arg;
2451 
2452 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2453 	if (!seq) {
2454 		spdk_dma_free(ctx->super);
2455 		free(ctx);
2456 		cb_fn(cb_arg, -ENOMEM);
2457 		return;
2458 	}
2459 
2460 	/* Read super block */
2461 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2462 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2463 			      _spdk_bs_unload_read_super_cpl, ctx);
2464 }
2465 
2466 /* END spdk_bs_unload */
2467 
2468 void
2469 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
2470 		  spdk_bs_op_complete cb_fn, void *cb_arg)
2471 {
2472 	bs->super_blob = blobid;
2473 	cb_fn(cb_arg, 0);
2474 }
2475 
2476 void
2477 spdk_bs_get_super(struct spdk_blob_store *bs,
2478 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2479 {
2480 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
2481 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
2482 	} else {
2483 		cb_fn(cb_arg, bs->super_blob, 0);
2484 	}
2485 }
2486 
2487 uint64_t
2488 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
2489 {
2490 	return bs->cluster_sz;
2491 }
2492 
2493 uint64_t
2494 spdk_bs_get_page_size(struct spdk_blob_store *bs)
2495 {
2496 	return SPDK_BS_PAGE_SIZE;
2497 }
2498 
2499 uint64_t
2500 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
2501 {
2502 	return bs->num_free_clusters;
2503 }
2504 
2505 uint64_t
2506 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
2507 {
2508 	return bs->total_data_clusters;
2509 }
2510 
2511 static int
2512 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
2513 {
2514 	bs->md_channel = spdk_get_io_channel(bs);
2515 	if (!bs->md_channel) {
2516 		SPDK_ERRLOG("Failed to get IO channel.\n");
2517 		return -1;
2518 	}
2519 
2520 	return 0;
2521 }
2522 
2523 static int
2524 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
2525 {
2526 	spdk_put_io_channel(bs->md_channel);
2527 
2528 	return 0;
2529 }
2530 
2531 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob)
2532 {
2533 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2534 
2535 	assert(blob != NULL);
2536 
2537 	return blob->id;
2538 }
2539 
2540 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob)
2541 {
2542 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2543 
2544 	assert(blob != NULL);
2545 
2546 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
2547 }
2548 
2549 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob)
2550 {
2551 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2552 
2553 	assert(blob != NULL);
2554 
2555 	return blob->active.num_clusters;
2556 }
2557 
2558 /* START spdk_bs_create_blob */
2559 
2560 static void
2561 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2562 {
2563 	struct spdk_blob_data *blob = cb_arg;
2564 
2565 	_spdk_blob_free(blob);
2566 
2567 	spdk_bs_sequence_finish(seq, bserrno);
2568 }
2569 
2570 void spdk_bs_create_blob(struct spdk_blob_store *bs,
2571 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2572 {
2573 	struct spdk_blob_data	*blob;
2574 	uint32_t		page_idx;
2575 	struct spdk_bs_cpl 	cpl;
2576 	spdk_bs_sequence_t	*seq;
2577 	spdk_blob_id		id;
2578 
2579 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
2580 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
2581 		cb_fn(cb_arg, 0, -ENOMEM);
2582 		return;
2583 	}
2584 	spdk_bit_array_set(bs->used_md_pages, page_idx);
2585 
2586 	id = _spdk_bs_page_to_blobid(page_idx);
2587 
2588 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
2589 
2590 	blob = _spdk_blob_alloc(bs, id);
2591 	if (!blob) {
2592 		cb_fn(cb_arg, 0, -ENOMEM);
2593 		return;
2594 	}
2595 
2596 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
2597 	cpl.u.blobid.cb_fn = cb_fn;
2598 	cpl.u.blobid.cb_arg = cb_arg;
2599 	cpl.u.blobid.blobid = blob->id;
2600 
2601 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2602 	if (!seq) {
2603 		_spdk_blob_free(blob);
2604 		cb_fn(cb_arg, 0, -ENOMEM);
2605 		return;
2606 	}
2607 
2608 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
2609 }
2610 
2611 /* END spdk_bs_create_blob */
2612 
2613 /* START spdk_blob_resize */
2614 int
2615 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
2616 {
2617 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2618 	int			rc;
2619 
2620 	assert(blob != NULL);
2621 
2622 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
2623 
2624 	if (blob->md_ro) {
2625 		return -EPERM;
2626 	}
2627 
2628 	if (sz == blob->active.num_clusters) {
2629 		return 0;
2630 	}
2631 
2632 	rc = _spdk_resize_blob(blob, sz);
2633 	if (rc < 0) {
2634 		return rc;
2635 	}
2636 
2637 	return 0;
2638 }
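
/*
 * Note that spdk_blob_resize() only updates the in-memory cluster map; the
 * new size becomes durable once the metadata is persisted. A minimal sketch
 * (resize_done is a caller-supplied spdk_blob_op_complete callback):
 *
 *	if (spdk_blob_resize(blob, 16) == 0) {
 *		spdk_blob_sync_md(blob, resize_done, NULL);
 *	}
 */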
2639 
2640 /* END spdk_blob_resize */
2641 
2642 
2643 /* START spdk_bs_delete_blob */
2644 
2645 static void
2646 _spdk_bs_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2647 {
2648 	struct spdk_blob_data *blob = cb_arg;
2649 
2650 	_spdk_blob_free(blob);
2651 
2652 	spdk_bs_sequence_finish(seq, bserrno);
2653 }
2654 
2655 static void
2656 _spdk_bs_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2657 {
2658 	struct spdk_blob_data *blob = cb_arg;
2659 
2660 	/* If the blob failed to load (e.g. due to a CRC error), blob is NULL; just finish with the error. */
2661 	if (blob == NULL) {
2662 		spdk_bs_sequence_finish(seq, bserrno);
2663 		return;
2664 	}
2665 	blob->state = SPDK_BLOB_STATE_DIRTY;
2666 	blob->active.num_pages = 0;
2667 	_spdk_resize_blob(blob, 0);
2668 
2669 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_blob_cpl, blob);
2670 }
2671 
2672 void
2673 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2674 		    spdk_blob_op_complete cb_fn, void *cb_arg)
2675 {
2676 	struct spdk_blob_data	*blob;
2677 	struct spdk_bs_cpl	cpl;
2678 	spdk_bs_sequence_t 	*seq;
2679 
2680 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
2681 
2682 	blob = _spdk_blob_lookup(bs, blobid);
2683 	if (blob) {
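		/* The blob is still open (it is in the blobstore's open list),
		 * so it cannot be deleted. */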
2684 		assert(blob->open_ref > 0);
2685 		cb_fn(cb_arg, -EINVAL);
2686 		return;
2687 	}
2688 
2689 	blob = _spdk_blob_alloc(bs, blobid);
2690 	if (!blob) {
2691 		cb_fn(cb_arg, -ENOMEM);
2692 		return;
2693 	}
2694 
2695 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2696 	cpl.u.blob_basic.cb_fn = cb_fn;
2697 	cpl.u.blob_basic.cb_arg = cb_arg;
2698 
2699 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2700 	if (!seq) {
2701 		_spdk_blob_free(blob);
2702 		cb_fn(cb_arg, -ENOMEM);
2703 		return;
2704 	}
2705 
2706 	_spdk_blob_load(seq, blob, _spdk_bs_delete_open_cpl, blob);
2707 }
2708 
2709 /* END spdk_bs_delete_blob */
2710 
2711 /* START spdk_bs_open_blob */
2712 
2713 static void
2714 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2715 {
2716 	struct spdk_blob_data *blob = cb_arg;
2717 
2718 	/* If the blob failed to load (e.g. due to a CRC error), return NULL through the completion. */
2719 	if (blob == NULL) {
2720 		seq->cpl.u.blob_handle.blob = NULL;
2721 		spdk_bs_sequence_finish(seq, bserrno);
2722 		return;
2723 	}
2724 
2725 	blob->open_ref++;
2726 
2727 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
2728 
2729 	spdk_bs_sequence_finish(seq, bserrno);
2730 }
2731 
2732 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2733 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2734 {
2735 	struct spdk_blob_data		*blob;
2736 	struct spdk_bs_cpl		cpl;
2737 	spdk_bs_sequence_t		*seq;
2738 	uint32_t			page_num;
2739 
2740 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
2741 
2742 	blob = _spdk_blob_lookup(bs, blobid);
2743 	if (blob) {
2744 		blob->open_ref++;
2745 		cb_fn(cb_arg, __data_to_blob(blob), 0);
2746 		return;
2747 	}
2748 
2749 	page_num = _spdk_bs_blobid_to_page(blobid);
2750 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
2751 		/* Invalid blobid */
2752 		cb_fn(cb_arg, NULL, -ENOENT);
2753 		return;
2754 	}
2755 
2756 	blob = _spdk_blob_alloc(bs, blobid);
2757 	if (!blob) {
2758 		cb_fn(cb_arg, NULL, -ENOMEM);
2759 		return;
2760 	}
2761 
2762 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
2763 	cpl.u.blob_handle.cb_fn = cb_fn;
2764 	cpl.u.blob_handle.cb_arg = cb_arg;
2765 	cpl.u.blob_handle.blob = __data_to_blob(blob);
2766 
2767 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2768 	if (!seq) {
2769 		_spdk_blob_free(blob);
2770 		cb_fn(cb_arg, NULL, -ENOMEM);
2771 		return;
2772 	}
2773 
2774 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
2775 }
2776 
2777 /* END spdk_bs_open_blob */
2778 
2779 /* START spdk_blob_sync_md */
2780 
2781 static void
2782 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2783 {
2784 	spdk_bs_sequence_finish(seq, bserrno);
2785 }
2786 
2787 void
2788 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
2789 {
2790 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2791 	struct spdk_bs_cpl	cpl;
2792 	spdk_bs_sequence_t	*seq;
2793 
2794 	assert(blob != NULL);
2795 
2796 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
2797 
2798 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2799 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2800 
	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		/* Metadata of a read-only blob is always clean; complete
		 * immediately instead of returning without invoking the
		 * callback. */
		cb_fn(cb_arg, 0);
		return;
	}
2805 
2806 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2807 		cb_fn(cb_arg, 0);
2808 		return;
2809 	}
2810 
2811 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2812 	cpl.u.blob_basic.cb_fn = cb_fn;
2813 	cpl.u.blob_basic.cb_arg = cb_arg;
2814 
2815 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2816 	if (!seq) {
2817 		cb_fn(cb_arg, -ENOMEM);
2818 		return;
2819 	}
2820 
2821 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
2822 }
2823 
2824 /* END spdk_blob_sync_md */
2825 
2826 /* START spdk_blob_close */
2827 
2828 static void
2829 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2830 {
2831 	struct spdk_blob_data *blob = cb_arg;
2832 
2833 	if (blob->open_ref == 0) {
2834 		TAILQ_REMOVE(&blob->bs->blobs, blob, link);
2835 		_spdk_blob_free(blob);
2836 	}
2837 
2838 	spdk_bs_sequence_finish(seq, bserrno);
2839 }
2840 
2841 void spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg)
2842 {
2843 	struct spdk_bs_cpl	cpl;
2844 	struct spdk_blob_data	*blob;
2845 	spdk_bs_sequence_t	*seq;
2846 
2847 	assert(b != NULL);
2848 	blob = __blob_to_data(b);
2849 	assert(blob != NULL);
2850 
2851 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
2852 
2853 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2854 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2855 
2856 	if (blob->open_ref == 0) {
2857 		cb_fn(cb_arg, -EBADF);
2858 		return;
2859 	}
2860 
2861 	blob->open_ref--;
2862 
2863 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2864 	cpl.u.blob_basic.cb_fn = cb_fn;
2865 	cpl.u.blob_basic.cb_arg = cb_arg;
2866 
2867 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2868 	if (!seq) {
2869 		cb_fn(cb_arg, -ENOMEM);
2870 		return;
2871 	}
2872 
2873 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2874 		_spdk_blob_close_cpl(seq, blob, 0);
2875 		return;
2876 	}
2877 
2878 	/* Sync metadata */
2879 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
2880 }
2881 
2882 /* END spdk_blob_close */
2883 
2884 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
2885 {
2886 	return spdk_get_io_channel(bs);
2887 }
2888 
2889 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2890 {
2891 	spdk_put_io_channel(channel);
2892 }
2893 
2894 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2895 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
2896 {
2897 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
2898 				     SPDK_BLOB_UNMAP);
2899 }
2900 
2901 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2902 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
2903 {
2904 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
2905 				     SPDK_BLOB_WRITE_ZEROES);
2906 }
2907 
2908 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2909 			   void *payload, uint64_t offset, uint64_t length,
2910 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2911 {
2912 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
2913 				     SPDK_BLOB_WRITE);
2914 }
2915 
2916 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2917 			  void *payload, uint64_t offset, uint64_t length,
2918 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2919 {
2920 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
2921 				     SPDK_BLOB_READ);
2922 }
2923 
2924 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2925 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2926 			    spdk_blob_op_complete cb_fn, void *cb_arg)
2927 {
2928 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
2929 }
2930 
2931 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2932 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2933 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2934 {
2935 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
2936 }
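
/*
 * Typical I/O flow against an open blob, as an illustrative sketch (payload
 * sizing and completion callbacks elided; offset and length are in pages):
 *
 *	struct spdk_io_channel *ch = spdk_bs_alloc_io_channel(bs);
 *
 *	spdk_bs_io_write_blob(blob, ch, payload, 0, 1, write_done, NULL);
 *	spdk_bs_io_read_blob(blob, ch, payload, 0, 1, read_done, NULL);
 *
 *	spdk_bs_free_io_channel(ch);
 *
 * write_done and read_done are spdk_blob_op_complete callbacks; each request
 * is asynchronous, so the read and the channel teardown must be driven from
 * the completions rather than issued back to back as shown.
 */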
2937 
2938 struct spdk_bs_iter_ctx {
2939 	int64_t page_num;
2940 	struct spdk_blob_store *bs;
2941 
2942 	spdk_blob_op_with_handle_complete cb_fn;
2943 	void *cb_arg;
2944 };
2945 
2946 static void
2947 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
2948 {
2949 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2950 	struct spdk_blob_store *bs = ctx->bs;
2951 	spdk_blob_id id;
2952 
2953 	if (bserrno == 0) {
2954 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
2955 		free(ctx);
2956 		return;
2957 	}
2958 
2959 	ctx->page_num++;
2960 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2961 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
2962 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
2963 		free(ctx);
2964 		return;
2965 	}
2966 
2967 	id = _spdk_bs_page_to_blobid(ctx->page_num);
2968 
2969 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
2970 }
2971 
2972 void
2973 spdk_bs_iter_first(struct spdk_blob_store *bs,
2974 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2975 {
2976 	struct spdk_bs_iter_ctx *ctx;
2977 
2978 	ctx = calloc(1, sizeof(*ctx));
2979 	if (!ctx) {
2980 		cb_fn(cb_arg, NULL, -ENOMEM);
2981 		return;
2982 	}
2983 
2984 	ctx->page_num = -1;
2985 	ctx->bs = bs;
2986 	ctx->cb_fn = cb_fn;
2987 	ctx->cb_arg = cb_arg;
2988 
2989 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2990 }
2991 
2992 static void
2993 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
2994 {
2995 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2996 
2997 	_spdk_bs_iter_cpl(ctx, NULL, -1);
2998 }
2999 
3000 void
3001 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
3002 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3003 {
3004 	struct spdk_bs_iter_ctx *ctx;
3005 	struct spdk_blob_data	*blob;
3006 
3007 	assert(b != NULL);
3008 	blob = __blob_to_data(*b);
3009 	assert(blob != NULL);
3010 
3011 	ctx = calloc(1, sizeof(*ctx));
3012 	if (!ctx) {
3013 		cb_fn(cb_arg, NULL, -ENOMEM);
3014 		return;
3015 	}
3016 
3017 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
3018 	ctx->bs = bs;
3019 	ctx->cb_fn = cb_fn;
3020 	ctx->cb_arg = cb_arg;
3021 
3022 	/* Close the existing blob */
3023 	spdk_blob_close(*b, _spdk_bs_iter_close_cpl, ctx);
3024 }
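
/*
 * Iteration sketch (illustrative; the blobstore pointer would normally
 * travel in cb_arg): every blob is opened and handed to the callback in
 * turn, and the callback receives -ENOENT once the scan is complete.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno != 0) {
 *			return;
 *		}
 *		spdk_bs_iter_next(bs, &blob, iter_cb, cb_arg);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_cb, bs);
 *
 * spdk_bs_iter_next() closes the current blob before opening the next one.
 */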
3025 
3026 int
3027 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
3028 		    uint16_t value_len)
3029 {
3030 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3031 	struct spdk_xattr 	*xattr;
3032 
3033 	assert(blob != NULL);
3034 
3035 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3036 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3037 
3038 	if (blob->md_ro) {
3039 		return -EPERM;
3040 	}
3041 
3042 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp = malloc(value_len);

			if (!tmp) {
				return -ENOMEM;
			}
			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;
			memcpy(xattr->value, value, value_len);
3048 
3049 			blob->state = SPDK_BLOB_STATE_DIRTY;
3050 
3051 			return 0;
3052 		}
3053 	}
3054 
	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		/* Roll back the partial allocation rather than inserting
		 * an xattr whose fields would be dereferenced as NULL later. */
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
3063 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
3064 
3065 	blob->state = SPDK_BLOB_STATE_DIRTY;
3066 
3067 	return 0;
3068 }
3069 
3070 int
3071 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
3072 {
3073 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3074 	struct spdk_xattr	*xattr;
3075 
3076 	assert(blob != NULL);
3077 
3078 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3079 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3080 
3081 	if (blob->md_ro) {
3082 		return -EPERM;
3083 	}
3084 
3085 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3086 		if (!strcmp(name, xattr->name)) {
3087 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
3088 			free(xattr->value);
3089 			free(xattr->name);
3090 			free(xattr);
3091 
3092 			blob->state = SPDK_BLOB_STATE_DIRTY;
3093 
3094 			return 0;
3095 		}
3096 	}
3097 
3098 	return -ENOENT;
3099 }
3100 
3101 int
3102 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
3103 			  const void **value, size_t *value_len)
3104 {
3105 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3106 	struct spdk_xattr	*xattr;
3107 
3108 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3109 		if (!strcmp(name, xattr->name)) {
3110 			*value = xattr->value;
3111 			*value_len = xattr->value_len;
3112 			return 0;
3113 		}
3114 	}
3115 
3116 	return -ENOENT;
3117 }
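
/*
 * Extended-attribute usage sketch (illustrative): values are copied on set,
 * while get returns a pointer into the blob's in-memory list that stays
 * valid until the attribute is removed or overwritten.
 *
 *	const void *val;
 *	size_t len;
 *
 *	spdk_blob_set_xattr(blob, "name", "my_blob", sizeof("my_blob"));
 *	spdk_blob_get_xattr_value(blob, "name", &val, &len);
 *	spdk_blob_remove_xattr(blob, "name");
 *
 * Each successful set or remove marks the blob dirty, so the change is only
 * persisted by a later spdk_blob_sync_md() or spdk_blob_close().
 */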
3118 
3119 struct spdk_xattr_names {
3120 	uint32_t	count;
3121 	const char	*names[0];
3122 };
3123 
3124 int
3125 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
3126 {
3127 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3128 	struct spdk_xattr	*xattr;
3129 	int			count = 0;
3130 
3131 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3132 		count++;
3133 	}
3134 
3135 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
3136 	if (*names == NULL) {
3137 		return -ENOMEM;
3138 	}
3139 
3140 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3141 		(*names)->names[(*names)->count++] = xattr->name;
3142 	}
3143 
3144 	return 0;
3145 }
3146 
3147 uint32_t
3148 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
3149 {
3150 	assert(names != NULL);
3151 
3152 	return names->count;
3153 }
3154 
3155 const char *
3156 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
3157 {
3158 	if (index >= names->count) {
3159 		return NULL;
3160 	}
3161 
3162 	return names->names[index];
3163 }
3164 
3165 void
3166 spdk_xattr_names_free(struct spdk_xattr_names *names)
3167 {
3168 	free(names);
3169 }
3170 
3171 struct spdk_bs_type
3172 spdk_bs_get_bstype(struct spdk_blob_store *bs)
3173 {
3174 	return bs->bstype;
3175 }
3176 
3177 void
3178 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
3179 {
3180 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
3181 }
3182 
3183 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
3184