/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL    0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}
static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}
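/*
 * Allocate the in-memory representation of a blob. The blob starts out
 *  DIRTY with a single metadata page whose location is derived from the
 *  blobid, since page 0 of a blob's metadata chain is fixed by its id.
 */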
static struct spdk_blob_data *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob_data *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob_data *blob)
{
	struct spdk_xattr	*xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}
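/*
 * Snapshot the current active cluster and page arrays into the clean set
 *  and give the active set fresh copies. After this returns, blob->clean
 *  reflects what is on disk and blob->active can be modified independently.
 */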
static int
_spdk_blob_mark_clean(struct spdk_blob_data *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}
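/*
 * Parse a single metadata page. A page holds a sequence of variable-length
 *  descriptors, each beginning with a type byte and a length:
 *
 *    +------+--------+-------------+------+--------+-----
 *    | type | length | payload ... | type | length | ...
 *    +------+--------+-------------+------+--------+-----
 *
 *  A PADDING descriptor with length 0 terminates the page early; otherwise
 *  parsing stops when the remaining space cannot hold another header.
 */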
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t	cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags	*desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			/* An unrecognized bit in invalid_flags means the blob cannot be
			 *  loaded safely.
			 */
			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			/* Unrecognized data_ro or md_ro bits degrade the blob to read-only
			 *  rather than failing the load.
			 */
			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent	*desc_extent;
			unsigned int				i, j;
			unsigned int				cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (!spdk_bit_array_get(blob->bs->used_clusters,
								desc_extent->extents[i].cluster_idx + j)) {
						return -EINVAL;
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
			struct spdk_xattr			*xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
			    sizeof(desc_xattr->value_length) +
			    desc_xattr->name_length + desc_xattr->value_length) {
				return -EINVAL;
			}

			xattr = calloc(1, sizeof(*xattr));
			if (xattr == NULL) {
				return -ENOMEM;
			}

			xattr->name = malloc(desc_xattr->name_length + 1);
			if (xattr->name == NULL) {
				free(xattr);
				return -ENOMEM;
			}
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			if (xattr->value == NULL) {
				free(xattr->name);
				free(xattr);
				return -ENOMEM;
			}
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Unrecognized descriptor type.  Do not fail - just continue to the
			 *  next descriptor.  If this descriptor is associated with some feature
			 *  defined in a newer version of blobstore, that version of blobstore
			 *  should create and set an associated feature flag to specify if this
			 *  blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}
static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob_data *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD, this can
	 * happen for example if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page, *tmp_pages;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		tmp_pages = spdk_dma_realloc(*pages,
					     SPDK_BS_PAGE_SIZE * (*page_count),
					     SPDK_BS_PAGE_SIZE,
					     NULL);
		if (tmp_pages == NULL) {
			/* On realloc failure the original buffer is still allocated;
			 *  free it here so it is not leaked.
			 */
			spdk_dma_free(*pages);
		}
		*pages = tmp_pages;
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Updates required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr	*desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

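/*
 * Serialize a run-length encoded extent descriptor starting at 'start_cluster'.
 *  Clusters that are contiguous on disk collapse into a single extent. For
 *  example (assuming 8 LBAs per cluster), active.clusters = { 8, 16, 40 }
 *  encodes as two extents: { cluster_idx = 1, length = 2 } and
 *  { cluster_idx = 5, length = 1 }.
 */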
static void
_spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[0]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;
}

static void
_spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 *  descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

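/*
 * Serialize a blob's metadata into a chain of pages: flags first (always in
 *  the first page), then xattrs, then extents. A new page is appended to the
 *  chain whenever the current page runs out of descriptor space.
 */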
static int
_spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page		*cur_page;
	const struct spdk_xattr			*xattr;
	int					rc;
	uint8_t					*buf;
	size_t					remaining_sz;
	uint64_t				last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob_data		*blob;

	struct spdk_blob_md_page	*pages;
	uint32_t			num_pages;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

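/*
 * Compute the CRC32-C of a metadata page. The crc field occupies the last
 *  4 bytes of the page, so it is excluded from the calculation.
 */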
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t		crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

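/*
 * Completion for each metadata page read during blob load. Validates the
 *  page CRC, follows the page->next link to read the rest of the chain,
 *  and parses the full chain once the last page has arrived.
 */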
static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_md_page	*page;
	struct spdk_blob_md_page	*tmp_pages;
	int				rc;
	uint32_t			crc;

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %u crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		/* Compare LBAs to LBAs - the next page must lie within the metadata region */
		assert(next_lba < _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		tmp_pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					     sizeof(*page), NULL);
		if (tmp_pages == NULL) {
			/* The original buffer is still allocated on realloc failure;
			 *  free it so it is not leaked.
			 */
			spdk_dma_free(ctx->pages);
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}
		ctx->pages = tmp_pages;

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob_data		*blob;

	struct spdk_blob_md_page	*pages;

	uint64_t			idx;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	void				*tmp;
	size_t				i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		/* Shrinking an allocation is not expected to fail */
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	spdk_bs_batch_t			*batch;
	size_t				i;
	uint64_t			lba;
	uint32_t			lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it over as
		 *  an unmap.
		 */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	spdk_bs_batch_t			*batch;
	size_t				i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;
	spdk_bs_batch_t			*batch;
	size_t				i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

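/*
 * Resize a blob to 'sz' clusters. Note that active.clusters[] stores the
 *  starting LBA of each cluster, not the cluster index; newly claimed
 *  clusters are translated with _spdk_bs_cluster_to_lba() as they are added.
 */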
static int
_spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
{
	uint64_t	i;
	uint64_t	*tmp;
	uint64_t	lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

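/*
 * Persist pipeline for a blob: serialize the metadata into pages, write all
 *  chain pages except the root, then write the root page last (so the chain
 *  is complete before the root is updated), then zero any previously used
 *  metadata pages that are no longer referenced, and finally unmap and
 *  release any clusters dropped by a truncation.
 */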
/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	uint32_t *tmp_pages;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		blob->state = SPDK_BLOB_STATE_DIRTY;
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices. Use a temporary pointer so the
	 *  existing array is not leaked if realloc fails.
	 */
	tmp_pages = realloc(blob->active.pages,
			    blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp_pages) {
		spdk_dma_free(ctx->pages);
		free(ctx);
		blob->state = SPDK_BLOB_STATE_DIRTY;
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	blob->active.pages = tmp_pages;

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

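/*
 * Common submission path for read/write/unmap/write_zeroes. The request is
 *  issued as a batch of per-cluster-run operations, since a blob's pages are
 *  only guaranteed to be physically contiguous up to a cluster boundary.
 */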
static void
_spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_blob_data		*blob = __blob_to_data(_blob);
	spdk_bs_batch_t			*batch;
	struct spdk_bs_cpl		cpl;
	uint64_t			lba;
	uint32_t			lba_count;
	uint8_t				*buf;
	uint64_t			page;

	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		switch (op_type) {
		case SPDK_BLOB_READ:
			spdk_bs_batch_read(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE:
			spdk_bs_batch_write(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap(batch, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
			break;
		}

		length -= lba_count;
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		}
	}

	spdk_bs_batch_close(batch);
}

struct rw_iov_ctx {
	struct spdk_blob_data *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 *  byte_count counts down the bytes remaining until orig_iov and orig_iovoff point to the
	 *  current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * SPDK_BS_PAGE_SIZE;
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
	 *  bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * SPDK_BS_PAGE_SIZE;
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	struct spdk_blob_data		*blob = __blob_to_data(_blob);
	spdk_bs_sequence_t		*seq;
	struct spdk_bs_cpl		cpl;

	assert(blob != NULL);

	if (!read && blob->data_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 *  to allocate a separate iov array and split the I/O such that none of the resulting
	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
	 *  but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob_data *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_data *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store		*bs = io_device;
	struct spdk_bs_channel		*channel = ctx_buf;
	struct spdk_bs_dev		*dev;
	uint32_t			max_ops = bs->max_channel_ops;
	uint32_t			i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	if (!channel->dev_channel) {
		SPDK_ERRLOG("Failed to create device channel.\n");
		free(channel->req_mem);
		return -1;
	}

	return 0;
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_dev_destroy(void *io_device)
{
	struct spdk_blob_store *bs = io_device;
	struct spdk_blob_data	*blob, *blob_tmp;

	bs->dev->destroy(bs->dev);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);
	/*
	 * If this function is called for any reason except a successful unload,
	 * the unload_cpl type will be NONE and this will be a nop.
	 */
	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);

	free(bs);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
	memset(&opts->bstype, 0, sizeof(opts->bstype));
}

static int
_spdk_bs_opts_verify(struct spdk_bs_opts *opts)
{
	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
	    opts->max_channel_ops == 0) {
		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
		return -1;
	}

	return 0;
}

static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store	*bs;
	uint64_t dev_size;
	int rc;

	dev_size = dev->blocklen * dev->blockcnt;
	if (dev_size < opts->cluster_sz) {
		/* Device size cannot be smaller than cluster size of blobstore */
		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size,
			    opts->cluster_sz);
		return NULL;
	}
	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
		/* Cluster size cannot be smaller than page size */
		SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n",
			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
		return NULL;
	}
	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 *  even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		free(bs);
		return NULL;
	}

	bs->max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;
	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);
	if (bs->used_md_pages == NULL) {
		spdk_bit_array_free(&bs->used_clusters);
		free(bs);
		return NULL;
	}

	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	rc = spdk_bs_register_md_thread(bs);
	if (rc == -1) {
		spdk_io_device_unregister(bs, NULL);
		spdk_bit_array_free(&bs->used_md_pages);
		spdk_bit_array_free(&bs->used_clusters);
		free(bs);
		return NULL;
	}

	return bs;
}

/* START spdk_bs_load - spdk_bs_load_ctx will be used for both load and unload. */

struct spdk_bs_load_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;

	struct spdk_bs_md_mask		*mask;
	bool				in_page_chain;
	uint32_t			page_index;
	uint32_t			cur_page;
	struct spdk_blob_md_page	*page;
};

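/*
 * Copy the set bits of 'array' into the on-disk mask, where bit i lands in
 *  byte i / 8 at bit position i % 8. For example, bit 10 is recorded as
 *  mask->mask[1] |= 1U << 2.
 */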
static void
_spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
{
	uint32_t i = 0;

	while (true) {
		i = spdk_bit_array_find_first_set(array, i);
		if (i >= mask->length) {
			break;
		}
		mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	/* Update the values in the super block */
	super->super_blob = bs->super_blob;
	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
	super->crc = _spdk_blob_md_page_calc_crc(super);
	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
			       cb_fn, cb_arg);
}

static void
_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t		i, j;
	int			rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t		lba, lba_count, mask_size;
	uint32_t		i, j;
	int			rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
				     8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx	*ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

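/*
 * Metadata replay path: rebuild the used-cluster state directly from a
 *  blob's extent descriptors (rather than trusting the on-disk mask) by
 *  marking every referenced cluster as claimed.
 */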
1768 static int
1769 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
1770 {
1771 	struct spdk_blob_md_descriptor *desc;
1772 	size_t	cur_desc = 0;
1773 
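	/* A metadata page holds a packed sequence of variable-length descriptors,
	 * each a type/length header followed by payload; a PADDING descriptor of
	 * length 0 terminates the page. Only EXTENT descriptors matter for replay,
	 * since they carry the cluster allocations; XATTR and FLAGS describe
	 * per-blob state and are skipped here.
	 */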
1774 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
1775 	while (cur_desc < sizeof(page->descriptors)) {
1776 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
1777 			if (desc->length == 0) {
1778 				/* If padding and length are 0, this terminates the page */
1779 				break;
1780 			}
1781 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
1782 			struct spdk_blob_md_descriptor_extent	*desc_extent;
1783 			unsigned int				i, j;
1784 			unsigned int				cluster_count = 0;
1785 
1786 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
1787 
1788 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
1789 				for (j = 0; j < desc_extent->extents[i].length; j++) {
1790 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
1791 					if (bs->num_free_clusters == 0) {
1792 						return -1;
1793 					}
1794 					bs->num_free_clusters--;
1795 					cluster_count++;
1796 				}
1797 			}
1798 			if (cluster_count == 0) {
1799 				return -1;
1800 			}
1801 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
1802 			/* Skip this item */
1803 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
1804 			/* Skip this item */
1805 		} else {
1806 			/* Error */
1807 			return -1;
1808 		}
1809 		/* Advance to the next descriptor */
1810 		cur_desc += sizeof(*desc) + desc->length;
1811 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
1812 			break;
1813 		}
1814 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
1815 	}
1816 	return 0;
1817 }
1818 
1819 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
1820 {
1821 	uint32_t crc;
1822 
1823 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
1824 	if (crc != ctx->page->crc) {
1825 		return false;
1826 	}
1827 
1828 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
1829 		return false;
1830 	}
1831 	return true;
1832 }
1833 
1834 static void
1835 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
1836 
1837 static void
1838 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1839 {
1840 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1841 
1842 	spdk_dma_free(ctx->mask);
1843 	spdk_dma_free(ctx->super);
1844 	spdk_bs_sequence_finish(seq, bserrno);
1845 	free(ctx);
1846 }
1847 
1848 static void
1849 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1850 {
1851 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1852 
1853 	spdk_dma_free(ctx->mask);
1854 
1855 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
1856 }
1857 
1858 static void
1859 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1860 {
1861 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
1862 }
1863 
1864 static void
1865 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1866 {
1867 	struct spdk_bs_load_ctx *ctx = cb_arg;
1868 	uint32_t page_num;
1869 
1870 	if (bserrno != 0) {
1871 		spdk_dma_free(ctx->super);
1872 		_spdk_bs_free(ctx->bs);
1873 		free(ctx);
1874 		spdk_bs_sequence_finish(seq, bserrno);
1875 		return;
1876 	}
1877 
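	/* Replay a page only if it heads a blob's metadata chain (sequence_num == 0)
	 * or was reached by following a previous page's 'next' link; a valid page
	 * with a nonzero sequence number is claimed later, when the chain that owns
	 * it is walked from its head.
	 */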
1878 	page_num = ctx->cur_page;
1879 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
1880 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
1881 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
1882 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
1883 				spdk_dma_free(ctx->super);
1884 				_spdk_bs_free(ctx->bs);
1885 				free(ctx);
1886 				spdk_bs_sequence_finish(seq, -EILSEQ);
1887 				return;
1888 			}
1889 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
1890 				ctx->in_page_chain = true;
1891 				ctx->cur_page = ctx->page->next;
1892 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1893 				return;
1894 			}
1895 		}
1896 	}
1897 
1898 	ctx->in_page_chain = false;
1899 
1900 	do {
1901 		ctx->page_index++;
1902 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
1903 
1904 	if (ctx->page_index < ctx->super->md_len) {
1905 		ctx->cur_page = ctx->page_index;
1906 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1907 	} else {
1908 		spdk_dma_free(ctx->page);
1909 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
1910 	}
1911 }
1912 
1913 static void
1914 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
1915 {
1916 	struct spdk_bs_load_ctx *ctx = cb_arg;
1917 	uint64_t lba;
1918 
1919 	assert(ctx->cur_page < ctx->super->md_len);
1920 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
1921 	spdk_bs_sequence_read(seq, ctx->page, lba,
1922 			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
1923 			      _spdk_bs_load_replay_md_cpl, ctx);
1924 }
1925 
1926 static void
1927 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
1928 {
1929 	struct spdk_bs_load_ctx *ctx = cb_arg;
1930 
1931 	ctx->page_index = 0;
1932 	ctx->cur_page = 0;
1933 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
1934 				     SPDK_BS_PAGE_SIZE,
1935 				     NULL);
1936 	if (!ctx->page) {
1937 		spdk_dma_free(ctx->super);
1938 		_spdk_bs_free(ctx->bs);
1939 		free(ctx);
1940 		spdk_bs_sequence_finish(seq, -ENOMEM);
1941 		return;
1942 	}
1943 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1944 }
1945 
1946 static void
1947 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
1948 {
1949 	struct spdk_bs_load_ctx *ctx = cb_arg;
1950 	int 		rc;
1951 
1952 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
1953 	if (rc < 0) {
1954 		spdk_dma_free(ctx->super);
1955 		_spdk_bs_free(ctx->bs);
1956 		free(ctx);
1957 		spdk_bs_sequence_finish(seq, -ENOMEM);
1958 		return;
1959 	}
1960 
1961 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1962 	if (rc < 0) {
1963 		spdk_dma_free(ctx->super);
1964 		_spdk_bs_free(ctx->bs);
1965 		free(ctx);
1966 		spdk_bs_sequence_finish(seq, -ENOMEM);
1967 		return;
1968 	}
1969 
1970 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1971 	_spdk_bs_load_replay_md(seq, cb_arg);
1972 }
1973 
1974 static void
1975 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1976 {
1977 	struct spdk_bs_load_ctx *ctx = cb_arg;
1978 	uint32_t	crc;
1979 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
1980 
1981 	if (ctx->super->version > SPDK_BS_VERSION ||
1982 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
1983 		spdk_dma_free(ctx->super);
1984 		_spdk_bs_free(ctx->bs);
1985 		free(ctx);
1986 		spdk_bs_sequence_finish(seq, -EILSEQ);
1987 		return;
1988 	}
1989 
1990 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
1991 		   sizeof(ctx->super->signature)) != 0) {
1992 		spdk_dma_free(ctx->super);
1993 		_spdk_bs_free(ctx->bs);
1994 		free(ctx);
1995 		spdk_bs_sequence_finish(seq, -EILSEQ);
1996 		return;
1997 	}
1998 
1999 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2000 	if (crc != ctx->super->crc) {
2001 		spdk_dma_free(ctx->super);
2002 		_spdk_bs_free(ctx->bs);
2003 		free(ctx);
2004 		spdk_bs_sequence_finish(seq, -EILSEQ);
2005 		return;
2006 	}
2007 
2008 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2009 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2010 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2011 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
2012 	} else {
2013 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2014 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2015 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2016 		spdk_dma_free(ctx->super);
2017 		_spdk_bs_free(ctx->bs);
2018 		free(ctx);
2019 		spdk_bs_sequence_finish(seq, -ENXIO);
2020 		return;
2021 	}
2022 
2023 	/* Parse the super block */
2024 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2025 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2026 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2027 	ctx->bs->md_start = ctx->super->md_start;
2028 	ctx->bs->md_len = ctx->super->md_len;
2029 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2030 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2031 	ctx->bs->super_blob = ctx->super->super_blob;
2032 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2033 
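	/* A clean shutdown left valid on-disk masks: clear the clean flag first (so
	 * a crash from here on forces recovery) and then load the masks directly.
	 * Otherwise, replay the entire metadata region to rebuild them.
	 */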
2034 	if (ctx->super->clean == 1) {
2035 		ctx->super->clean = 0;
2036 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2037 	} else {
2038 		_spdk_bs_recover(seq, ctx);
2039 	}
2040 }
2041 
2042 void
2043 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2044 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2045 {
2046 	struct spdk_blob_store	*bs;
2047 	struct spdk_bs_cpl	cpl;
2048 	spdk_bs_sequence_t	*seq;
2049 	struct spdk_bs_load_ctx *ctx;
2050 	struct spdk_bs_opts	opts = {};
2051 
2052 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2053 
2054 	if (o) {
2055 		opts = *o;
2056 	} else {
2057 		spdk_bs_opts_init(&opts);
2058 	}
2059 
2060 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2061 		cb_fn(cb_arg, NULL, -EINVAL);
2062 		return;
2063 	}
2064 
2065 	bs = _spdk_bs_alloc(dev, &opts);
2066 	if (!bs) {
2067 		cb_fn(cb_arg, NULL, -ENOMEM);
2068 		return;
2069 	}
2070 
2071 	ctx = calloc(1, sizeof(*ctx));
2072 	if (!ctx) {
2073 		_spdk_bs_free(bs);
2074 		cb_fn(cb_arg, NULL, -ENOMEM);
2075 		return;
2076 	}
2077 
2078 	ctx->bs = bs;
2079 
2080 	/* Allocate memory for the super block */
2081 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2082 	if (!ctx->super) {
2083 		free(ctx);
2084 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
2085 		return;
2086 	}
2087 
2088 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2089 	cpl.u.bs_handle.cb_fn = cb_fn;
2090 	cpl.u.bs_handle.cb_arg = cb_arg;
2091 	cpl.u.bs_handle.bs = bs;
2092 
2093 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2094 	if (!seq) {
2095 		spdk_dma_free(ctx->super);
2096 		free(ctx);
2097 		_spdk_bs_free(bs);
2098 		cb_fn(cb_arg, NULL, -ENOMEM);
2099 		return;
2100 	}
2101 
2102 	/* Read the super block */
2103 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2104 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2105 			      _spdk_bs_load_super_cpl, ctx);
2106 }
2107 
2108 /* END spdk_bs_load */
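
/*
 * Minimal load sketch (illustrative only; how the spdk_bs_dev is created
 * depends on the backing module and is assumed here):
 *
 *	static void
 *	load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			// bs is ready: masks loaded (clean case) or replayed
 *		}
 *	}
 *
 *	spdk_bs_load(dev, NULL, load_done, NULL);	// NULL opts -> defaults
 */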
2109 
2110 /* START spdk_bs_init */
2111 
2112 struct spdk_bs_init_ctx {
2113 	struct spdk_blob_store		*bs;
2114 	struct spdk_bs_super_block	*super;
2115 };
2116 
2117 static void
2118 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2119 {
2120 	struct spdk_bs_init_ctx *ctx = cb_arg;
2121 
2122 	spdk_dma_free(ctx->super);
2123 	free(ctx);
2124 
2125 	spdk_bs_sequence_finish(seq, bserrno);
2126 }
2127 
2128 static void
2129 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2130 {
2131 	struct spdk_bs_init_ctx *ctx = cb_arg;
2132 
2133 	/* Write super block */
2134 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2135 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2136 			       _spdk_bs_init_persist_super_cpl, ctx);
2137 }
2138 
2139 void
2140 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2141 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2142 {
2143 	struct spdk_bs_init_ctx *ctx;
2144 	struct spdk_blob_store	*bs;
2145 	struct spdk_bs_cpl	cpl;
2146 	spdk_bs_sequence_t	*seq;
2147 	spdk_bs_batch_t		*batch;
2148 	uint64_t		num_md_lba;
2149 	uint64_t		num_md_pages;
2150 	uint64_t		num_md_clusters;
2151 	uint32_t		i;
2152 	struct spdk_bs_opts	opts = {};
2153 	int			rc;
2154 
2155 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2156 
2157 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2158 		SPDK_ERRLOG("unsupported dev block length of %d\n",
2159 			    dev->blocklen);
2160 		dev->destroy(dev);
2161 		cb_fn(cb_arg, NULL, -EINVAL);
2162 		return;
2163 	}
2164 
2165 	if (o) {
2166 		opts = *o;
2167 	} else {
2168 		spdk_bs_opts_init(&opts);
2169 	}
2170 
2171 	if (_spdk_bs_opts_verify(&opts) != 0) {
2172 		dev->destroy(dev);
2173 		cb_fn(cb_arg, NULL, -EINVAL);
2174 		return;
2175 	}
2176 
2177 	bs = _spdk_bs_alloc(dev, &opts);
2178 	if (!bs) {
2179 		dev->destroy(dev);
2180 		cb_fn(cb_arg, NULL, -ENOMEM);
2181 		return;
2182 	}
2183 
2184 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2185 		/* By default, allocate 1 page per cluster.
2186 		 * Technically, this over-allocates metadata
2187 		 * because more metadata will reduce the number
2188 		 * of usable clusters. This can be addressed with
2189 		 * more complex math in the future.
2190 		 */
2191 		bs->md_len = bs->total_clusters;
2192 	} else {
2193 		bs->md_len = opts.num_md_pages;
2194 	}
2195 
2196 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2197 	if (rc < 0) {
2198 		_spdk_bs_free(bs);
2199 		cb_fn(cb_arg, NULL, -ENOMEM);
2200 		return;
2201 	}
2202 
2203 	ctx = calloc(1, sizeof(*ctx));
2204 	if (!ctx) {
2205 		_spdk_bs_free(bs);
2206 		cb_fn(cb_arg, NULL, -ENOMEM);
2207 		return;
2208 	}
2209 
2210 	ctx->bs = bs;
2211 
2212 	/* Allocate memory for the super block */
2213 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2214 	if (!ctx->super) {
2215 		free(ctx);
2216 		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
2217 		return;
2218 	}
2219 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2220 	       sizeof(ctx->super->signature));
2221 	ctx->super->version = SPDK_BS_VERSION;
2222 	ctx->super->length = sizeof(*ctx->super);
2223 	ctx->super->super_blob = bs->super_blob;
2224 	ctx->super->clean = 0;
2225 	ctx->super->cluster_size = bs->cluster_sz;
2226 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
2227 
2228 	/* Calculate how many pages the metadata consumes at the front
2229 	 * of the disk.
2230 	 */
2231 
2232 	/* The super block uses 1 page */
2233 	num_md_pages = 1;
2234 
2235 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
2236 	 * up to the nearest page, plus a header.
2237 	 */
2238 	ctx->super->used_page_mask_start = num_md_pages;
2239 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2240 					 divide_round_up(bs->md_len, 8),
2241 					 SPDK_BS_PAGE_SIZE);
2242 	num_md_pages += ctx->super->used_page_mask_len;
2243 
2244 	/* The used_clusters mask requires 1 bit per cluster, rounded
2245 	 * up to the nearest page, plus a header.
2246 	 */
2247 	ctx->super->used_cluster_mask_start = num_md_pages;
2248 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2249 					    divide_round_up(bs->total_clusters, 8),
2250 					    SPDK_BS_PAGE_SIZE);
2251 	num_md_pages += ctx->super->used_cluster_mask_len;
2252 
2253 	/* The metadata region size was chosen above */
2254 	ctx->super->md_start = bs->md_start = num_md_pages;
2255 	ctx->super->md_len = bs->md_len;
2256 	num_md_pages += bs->md_len;
2257 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
2258 
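	/* Worked example (hypothetical geometry, not a requirement): a 1 GiB dev
	 * with 1 MiB clusters gives total_clusters = 1024 and a default md_len of
	 * 1024 pages. Each mask needs sizeof(struct spdk_bs_md_mask) + 128 bytes
	 * and so fits in one 4 KiB page: pages 1 and 2. Metadata then occupies
	 * pages 3 through 1026, and with 256 pages per cluster the leading
	 * divide_round_up(1027, 256) = 5 clusters are claimed for metadata below.
	 */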
2259 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
2260 
2261 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
2262 	if (num_md_clusters > bs->total_clusters) {
2263 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available, "
2264 			    "please decrease the number of pages reserved for metadata "
2265 			    "or increase cluster size.\n");
2266 		spdk_dma_free(ctx->super);
2267 		free(ctx);
2268 		_spdk_bs_free(bs);
2269 		cb_fn(cb_arg, NULL, -ENOMEM);
2270 		return;
2271 	}
2272 	/* Claim all of the clusters used by the metadata */
2273 	for (i = 0; i < num_md_clusters; i++) {
2274 		_spdk_bs_claim_cluster(bs, i);
2275 	}
2276 
2277 	bs->total_data_clusters = bs->num_free_clusters;
2278 
2279 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2280 	cpl.u.bs_handle.cb_fn = cb_fn;
2281 	cpl.u.bs_handle.cb_arg = cb_arg;
2282 	cpl.u.bs_handle.bs = bs;
2283 
2284 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2285 	if (!seq) {
2286 		spdk_dma_free(ctx->super);
2287 		free(ctx);
2288 		_spdk_bs_free(bs);
2289 		cb_fn(cb_arg, NULL, -ENOMEM);
2290 		return;
2291 	}
2292 
2293 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
2294 
2295 	/* Clear metadata space */
2296 	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
2297 	/* Trim data clusters */
2298 	spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
2299 
2300 	spdk_bs_batch_close(batch);
2301 }
2302 
2303 /* END spdk_bs_init */
2304 
2305 /* START spdk_bs_destroy */
2306 
2307 static void
2308 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2309 {
2310 	struct spdk_bs_init_ctx *ctx = cb_arg;
2311 	struct spdk_blob_store *bs = ctx->bs;
2312 
2313 	/*
2314 	 * We need to defer calling spdk_bs_call_cpl() until after
2315 	 * dev destruction, so tuck these away for later use.
2316 	 */
2317 	bs->unload_err = bserrno;
2318 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2319 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2320 
2321 	spdk_bs_sequence_finish(seq, bserrno);
2322 
2323 	_spdk_bs_free(bs);
2324 	free(ctx);
2325 }
2326 
2327 void
2328 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
2329 		void *cb_arg)
2330 {
2331 	struct spdk_bs_cpl	cpl;
2332 	spdk_bs_sequence_t	*seq;
2333 	struct spdk_bs_init_ctx *ctx;
2334 
2335 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
2336 
2337 	if (!TAILQ_EMPTY(&bs->blobs)) {
2338 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2339 		cb_fn(cb_arg, -EBUSY);
2340 		return;
2341 	}
2342 
2343 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2344 	cpl.u.bs_basic.cb_fn = cb_fn;
2345 	cpl.u.bs_basic.cb_arg = cb_arg;
2346 
2347 	ctx = calloc(1, sizeof(*ctx));
2348 	if (!ctx) {
2349 		cb_fn(cb_arg, -ENOMEM);
2350 		return;
2351 	}
2352 
2353 	ctx->bs = bs;
2354 
2355 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2356 	if (!seq) {
2357 		free(ctx);
2358 		cb_fn(cb_arg, -ENOMEM);
2359 		return;
2360 	}
2361 
2362 	/* Write zeroes to the super block */
2363 	spdk_bs_sequence_write_zeroes(seq,
2364 				      _spdk_bs_page_to_lba(bs, 0),
2365 				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
2366 				      _spdk_bs_destroy_trim_cpl, ctx);
2367 }
2368 
2369 /* END spdk_bs_destroy */
2370 
2371 /* START spdk_bs_unload */
2372 
2373 static void
2374 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2375 {
2376 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2377 
2378 	spdk_dma_free(ctx->super);
2379 
2380 	/*
2381 	 * We need to defer calling spdk_bs_call_cpl() until after
2382 	 * dev destruction, so tuck these away for later use.
2383 	 */
2384 	ctx->bs->unload_err = bserrno;
2385 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2386 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2387 
2388 	spdk_bs_sequence_finish(seq, bserrno);
2389 
2390 	_spdk_bs_free(ctx->bs);
2391 	free(ctx);
2392 }
2393 
2394 static void
2395 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2396 {
2397 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2398 
2399 	spdk_dma_free(ctx->mask);
2400 	ctx->super->clean = 1;
2401 
2402 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
2403 }
2404 
2405 static void
2406 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2407 {
2408 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2409 
2410 	spdk_dma_free(ctx->mask);
2411 
2412 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
2413 }
2414 
2415 static void
2416 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2417 {
2418 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
2419 }
2420 
2421 void
2422 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
2423 {
2424 	struct spdk_bs_cpl	cpl;
2425 	spdk_bs_sequence_t	*seq;
2426 	struct spdk_bs_load_ctx *ctx;
2427 
2428 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
2429 
2430 	if (!TAILQ_EMPTY(&bs->blobs)) {
2431 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2432 		cb_fn(cb_arg, -EBUSY);
2433 		return;
2434 	}
2435 
2436 	ctx = calloc(1, sizeof(*ctx));
2437 	if (!ctx) {
2438 		cb_fn(cb_arg, -ENOMEM);
2439 		return;
2440 	}
2441 
2442 	ctx->bs = bs;
2443 
2444 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2445 	if (!ctx->super) {
2446 		free(ctx);
2447 		cb_fn(cb_arg, -ENOMEM);
2448 		return;
2449 	}
2450 
2451 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2452 	cpl.u.bs_basic.cb_fn = cb_fn;
2453 	cpl.u.bs_basic.cb_arg = cb_arg;
2454 
2455 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2456 	if (!seq) {
2457 		spdk_dma_free(ctx->super);
2458 		free(ctx);
2459 		cb_fn(cb_arg, -ENOMEM);
2460 		return;
2461 	}
2462 
2463 	/* Read super block */
2464 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2465 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2466 			      _spdk_bs_unload_read_super_cpl, ctx);
2467 }
2468 
2469 /* END spdk_bs_unload */
2470 
2471 void
2472 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
2473 		  spdk_bs_op_complete cb_fn, void *cb_arg)
2474 {
2475 	bs->super_blob = blobid;
2476 	cb_fn(cb_arg, 0);
2477 }
2478 
2479 void
2480 spdk_bs_get_super(struct spdk_blob_store *bs,
2481 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2482 {
2483 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
2484 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
2485 	} else {
2486 		cb_fn(cb_arg, bs->super_blob, 0);
2487 	}
2488 }
2489 
2490 uint64_t
2491 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
2492 {
2493 	return bs->cluster_sz;
2494 }
2495 
2496 uint64_t
2497 spdk_bs_get_page_size(struct spdk_blob_store *bs)
2498 {
2499 	return SPDK_BS_PAGE_SIZE;
2500 }
2501 
2502 uint64_t
2503 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
2504 {
2505 	return bs->num_free_clusters;
2506 }
2507 
2508 uint64_t
2509 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
2510 {
2511 	return bs->total_data_clusters;
2512 }
2513 
2514 static int
2515 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
2516 {
2517 	bs->md_channel = spdk_get_io_channel(bs);
2518 	if (!bs->md_channel) {
2519 		SPDK_ERRLOG("Failed to get IO channel.\n");
2520 		return -1;
2521 	}
2522 
2523 	return 0;
2524 }
2525 
2526 static int
2527 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
2528 {
2529 	spdk_put_io_channel(bs->md_channel);
2530 
2531 	return 0;
2532 }
2533 
2534 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob)
2535 {
2536 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2537 
2538 	assert(blob != NULL);
2539 
2540 	return blob->id;
2541 }
2542 
2543 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob)
2544 {
2545 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2546 
2547 	assert(blob != NULL);
2548 
2549 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
2550 }
2551 
2552 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob)
2553 {
2554 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2555 
2556 	assert(blob != NULL);
2557 
2558 	return blob->active.num_clusters;
2559 }
2560 
2561 /* START spdk_bs_create_blob */
2562 
2563 static void
2564 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2565 {
2566 	struct spdk_blob_data *blob = cb_arg;
2567 
2568 	_spdk_blob_free(blob);
2569 
2570 	spdk_bs_sequence_finish(seq, bserrno);
2571 }
2572 
2573 void spdk_bs_create_blob(struct spdk_blob_store *bs,
2574 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2575 {
2576 	struct spdk_blob_data	*blob;
2577 	uint32_t		page_idx;
2578 	struct spdk_bs_cpl 	cpl;
2579 	spdk_bs_sequence_t	*seq;
2580 	spdk_blob_id		id;
2581 
2582 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
2583 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
2584 		cb_fn(cb_arg, 0, -ENOMEM);
2585 		return;
2586 	}
2587 	spdk_bit_array_set(bs->used_md_pages, page_idx);
2588 
2589 	id = _spdk_bs_page_to_blobid(page_idx);
2590 
2591 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
2592 
2593 	blob = _spdk_blob_alloc(bs, id);
2594 	if (!blob) {
2595 		cb_fn(cb_arg, 0, -ENOMEM);
2596 		return;
2597 	}
2598 
2599 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
2600 	cpl.u.blobid.cb_fn = cb_fn;
2601 	cpl.u.blobid.cb_arg = cb_arg;
2602 	cpl.u.blobid.blobid = blob->id;
2603 
2604 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2605 	if (!seq) {
2606 		_spdk_blob_free(blob);
2607 		cb_fn(cb_arg, 0, -ENOMEM);
2608 		return;
2609 	}
2610 
2611 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
2612 }
2613 
2614 /* END spdk_bs_create_blob */
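
/*
 * Lifecycle sketch (illustrative): create yields an id, open yields a handle,
 * resize is synchronous and counted in clusters, and the new size is only
 * durable after a metadata sync. All callbacks end with a bserrno argument.
 *
 *	spdk_bs_create_blob(bs, create_done, NULL);	// cb gets a blob id
 *	spdk_bs_open_blob(bs, id, open_done, NULL);	// cb gets struct spdk_blob *
 *	spdk_blob_resize(blob, 10);			// grow to 10 clusters
 *	spdk_blob_sync_md(blob, sync_done, NULL);	// persist the resize
 *	spdk_blob_close(blob, close_done, NULL);
 */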
2615 
2616 /* START spdk_blob_resize */
2617 int
2618 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
2619 {
2620 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2621 	int			rc;
2622 
2623 	assert(blob != NULL);
2624 
2625 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
2626 
2627 	if (blob->md_ro) {
2628 		return -EPERM;
2629 	}
2630 
2631 	if (sz == blob->active.num_clusters) {
2632 		return 0;
2633 	}
2634 
2635 	rc = _spdk_resize_blob(blob, sz);
2636 	if (rc < 0) {
2637 		return rc;
2638 	}
2639 
2640 	return 0;
2641 }
2642 
2643 /* END spdk_blob_resize */
2644 
2645 
2646 /* START spdk_bs_delete_blob */
2647 
2648 static void
2649 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
2650 {
2651 	spdk_bs_sequence_t *seq = cb_arg;
2652 
2653 	spdk_bs_sequence_finish(seq, bserrno);
2654 }
2655 
2656 static void
2657 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2658 {
2659 	struct spdk_blob *_blob = cb_arg;
2660 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2661 
2662 	if (bserrno != 0) {
2663 		/*
2664 		 * We already removed this blob from the blobstore tailq, so
2665 		 *  we need to free it here since this is the last reference
2666 		 *  to it.
2667 		 */
2668 		_spdk_blob_free(blob);
2669 		_spdk_bs_delete_close_cpl(seq, bserrno);
2670 		return;
2671 	}
2672 
2673 	/*
2674 	 * This will immediately decrement the ref_count and call
2675 	 *  the completion routine since the metadata state is clean.
2676 	 *  By calling spdk_blob_close, we reduce the number of call
2677 	 *  points into code that touches the blob->open_ref count
2678 	 *  and the blobstore's blob list.
2679 	 */
2680 	spdk_blob_close(_blob, _spdk_bs_delete_close_cpl, seq);
2681 }
2682 
2683 static void
2684 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
2685 {
2686 	spdk_bs_sequence_t *seq = cb_arg;
2687 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2688 
2689 	if (bserrno != 0) {
2690 		spdk_bs_sequence_finish(seq, bserrno);
2691 		return;
2692 	}
2693 
2694 	if (blob->open_ref > 1) {
2695 		/*
2696 		 * Someone has this blob open (besides this delete context).
2697 		 *  Decrement the ref count directly and return -EBUSY.
2698 		 */
2699 		blob->open_ref--;
2700 		spdk_bs_sequence_finish(seq, -EBUSY);
2701 		return;
2702 	}
2703 
2704 	/*
2705 	 * Remove the blob from the blob_store list now, to ensure it does not
2706 	 *  get returned after this point by _spdk_blob_lookup().
2707 	 */
2708 	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
2709 	blob->state = SPDK_BLOB_STATE_DIRTY;
2710 	blob->active.num_pages = 0;
2711 	_spdk_resize_blob(blob, 0);
2712 
2713 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, _blob);
2714 }
2715 
2716 void
2717 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2718 		    spdk_blob_op_complete cb_fn, void *cb_arg)
2719 {
2720 	struct spdk_bs_cpl	cpl;
2721 	spdk_bs_sequence_t 	*seq;
2722 
2723 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
2724 
2725 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2726 	cpl.u.blob_basic.cb_fn = cb_fn;
2727 	cpl.u.blob_basic.cb_arg = cb_arg;
2728 
2729 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2730 	if (!seq) {
2731 		cb_fn(cb_arg, -ENOMEM);
2732 		return;
2733 	}
2734 
2735 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
2736 }
2737 
2738 /* END spdk_bs_delete_blob */
2739 
2740 /* START spdk_bs_open_blob */
2741 
2742 static void
2743 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2744 {
2745 	struct spdk_blob_data *blob = cb_arg;
2746 
2747 	/* If the blob has a crc error, we just return NULL. */
2748 	if (blob == NULL) {
2749 		seq->cpl.u.blob_handle.blob = NULL;
2750 		spdk_bs_sequence_finish(seq, bserrno);
2751 		return;
2752 	}
2753 
2754 	blob->open_ref++;
2755 
2756 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
2757 
2758 	spdk_bs_sequence_finish(seq, bserrno);
2759 }
2760 
2761 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2762 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2763 {
2764 	struct spdk_blob_data		*blob;
2765 	struct spdk_bs_cpl		cpl;
2766 	spdk_bs_sequence_t		*seq;
2767 	uint32_t			page_num;
2768 
2769 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
2770 
2771 	blob = _spdk_blob_lookup(bs, blobid);
2772 	if (blob) {
2773 		blob->open_ref++;
2774 		cb_fn(cb_arg, __data_to_blob(blob), 0);
2775 		return;
2776 	}
2777 
2778 	page_num = _spdk_bs_blobid_to_page(blobid);
2779 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
2780 		/* Invalid blobid */
2781 		cb_fn(cb_arg, NULL, -ENOENT);
2782 		return;
2783 	}
2784 
2785 	blob = _spdk_blob_alloc(bs, blobid);
2786 	if (!blob) {
2787 		cb_fn(cb_arg, NULL, -ENOMEM);
2788 		return;
2789 	}
2790 
2791 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
2792 	cpl.u.blob_handle.cb_fn = cb_fn;
2793 	cpl.u.blob_handle.cb_arg = cb_arg;
2794 	cpl.u.blob_handle.blob = __data_to_blob(blob);
2795 
2796 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2797 	if (!seq) {
2798 		_spdk_blob_free(blob);
2799 		cb_fn(cb_arg, NULL, -ENOMEM);
2800 		return;
2801 	}
2802 
2803 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
2804 }
2805 
2806 /* END spdk_bs_open_blob */
2807 
2808 /* START spdk_blob_sync_md */
2809 
2810 static void
2811 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2812 {
2813 	spdk_bs_sequence_finish(seq, bserrno);
2814 }
2815 
2816 void
2817 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
2818 {
2819 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2820 	struct spdk_bs_cpl	cpl;
2821 	spdk_bs_sequence_t	*seq;
2822 
2823 	assert(blob != NULL);
2824 
2825 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
2826 
2827 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2828 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2829 
2830 	if (blob->md_ro) {
2831 		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		cb_fn(cb_arg, 0);
2832 		return;
2833 	}
2834 
2835 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2836 		cb_fn(cb_arg, 0);
2837 		return;
2838 	}
2839 
2840 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2841 	cpl.u.blob_basic.cb_fn = cb_fn;
2842 	cpl.u.blob_basic.cb_arg = cb_arg;
2843 
2844 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2845 	if (!seq) {
2846 		cb_fn(cb_arg, -ENOMEM);
2847 		return;
2848 	}
2849 
2850 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
2851 }
2852 
2853 /* END spdk_blob_sync_md */
2854 
2855 /* START spdk_blob_close */
2856 
2857 static void
2858 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2859 {
2860 	struct spdk_blob_data *blob = cb_arg;
2861 
2862 	if (bserrno == 0) {
2863 		blob->open_ref--;
2864 		if (blob->open_ref == 0) {
2865 			/*
2866 			 * Blobs with active.num_pages == 0 are deleted blobs.
2867 			 *  These blobs are removed from the blob_store list
2868 			 *  when the deletion process starts - so don't try to
2869 			 *  remove them again.
2870 			 */
2871 			if (blob->active.num_pages > 0) {
2872 				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
2873 			}
2874 			_spdk_blob_free(blob);
2875 		}
2876 	}
2877 
2878 	spdk_bs_sequence_finish(seq, bserrno);
2879 }
2880 
2881 void spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg)
2882 {
2883 	struct spdk_bs_cpl	cpl;
2884 	struct spdk_blob_data	*blob;
2885 	spdk_bs_sequence_t	*seq;
2886 
2887 	assert(b != NULL);
2888 	blob = __blob_to_data(b);
2889 	assert(blob != NULL);
2890 
2891 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
2892 
2893 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2894 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2895 
2896 	if (blob->open_ref == 0) {
2897 		cb_fn(cb_arg, -EBADF);
2898 		return;
2899 	}
2900 
2901 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2902 	cpl.u.blob_basic.cb_fn = cb_fn;
2903 	cpl.u.blob_basic.cb_arg = cb_arg;
2904 
2905 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2906 	if (!seq) {
2907 		cb_fn(cb_arg, -ENOMEM);
2908 		return;
2909 	}
2910 
2911 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2912 		_spdk_blob_close_cpl(seq, blob, 0);
2913 		return;
2914 	}
2915 
2916 	/* Sync metadata */
2917 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
2918 }
2919 
2920 /* END spdk_blob_close */
2921 
2922 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
2923 {
2924 	return spdk_get_io_channel(bs);
2925 }
2926 
2927 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2928 {
2929 	spdk_put_io_channel(channel);
2930 }
2931 
2932 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2933 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
2934 {
2935 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
2936 				     SPDK_BLOB_UNMAP);
2937 }
2938 
2939 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2940 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
2941 {
2942 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
2943 				     SPDK_BLOB_WRITE_ZEROES);
2944 }
2945 
2946 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2947 			   void *payload, uint64_t offset, uint64_t length,
2948 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2949 {
2950 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
2951 				     SPDK_BLOB_WRITE);
2952 }
2953 
2954 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2955 			  void *payload, uint64_t offset, uint64_t length,
2956 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2957 {
2958 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
2959 				     SPDK_BLOB_READ);
2960 }
2961 
2962 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2963 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2964 			    spdk_blob_op_complete cb_fn, void *cb_arg)
2965 {
2966 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
2967 }
2968 
2969 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2970 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2971 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2972 {
2973 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
2974 }
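
/*
 * I/O sketch (illustrative; offset and length are in pages at this layer).
 * Buffers must be DMA-capable, and each thread needs its own channel.
 *
 *	struct spdk_io_channel *ch = spdk_bs_alloc_io_channel(bs);
 *	void *buf = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE, 0x1000, NULL);
 *
 *	spdk_bs_io_write_blob(blob, ch, buf, 0, 1, write_done, NULL);
 *	spdk_bs_io_read_blob(blob, ch, buf, 0, 1, read_done, NULL);
 *
 *	spdk_dma_free(buf);
 *	spdk_bs_free_io_channel(ch);
 */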
2975 
2976 struct spdk_bs_iter_ctx {
2977 	int64_t page_num;
2978 	struct spdk_blob_store *bs;
2979 
2980 	spdk_blob_op_with_handle_complete cb_fn;
2981 	void *cb_arg;
2982 };
2983 
2984 static void
2985 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
2986 {
2987 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2988 	struct spdk_blob_store *bs = ctx->bs;
2989 	spdk_blob_id id;
2990 
2991 	if (bserrno == 0) {
2992 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
2993 		free(ctx);
2994 		return;
2995 	}
2996 
2997 	ctx->page_num++;
2998 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2999 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
3000 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
3001 		free(ctx);
3002 		return;
3003 	}
3004 
3005 	id = _spdk_bs_page_to_blobid(ctx->page_num);
3006 
3007 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
3008 }
3009 
3010 void
3011 spdk_bs_iter_first(struct spdk_blob_store *bs,
3012 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3013 {
3014 	struct spdk_bs_iter_ctx *ctx;
3015 
3016 	ctx = calloc(1, sizeof(*ctx));
3017 	if (!ctx) {
3018 		cb_fn(cb_arg, NULL, -ENOMEM);
3019 		return;
3020 	}
3021 
3022 	ctx->page_num = -1;
3023 	ctx->bs = bs;
3024 	ctx->cb_fn = cb_fn;
3025 	ctx->cb_arg = cb_arg;
3026 
3027 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3028 }
3029 
3030 static void
3031 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
3032 {
3033 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3034 
3035 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3036 }
3037 
3038 void
3039 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *b,
3040 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3041 {
3042 	struct spdk_bs_iter_ctx *ctx;
3043 	struct spdk_blob_data	*blob;
3044 
3045 	assert(b != NULL);
3046 	blob = __blob_to_data(b);
3047 	assert(blob != NULL);
3048 
3049 	ctx = calloc(1, sizeof(*ctx));
3050 	if (!ctx) {
3051 		cb_fn(cb_arg, NULL, -ENOMEM);
3052 		return;
3053 	}
3054 
3055 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
3056 	ctx->bs = bs;
3057 	ctx->cb_fn = cb_fn;
3058 	ctx->cb_arg = cb_arg;
3059 
3060 	/* Close the existing blob */
3061 	spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx);
3062 }
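
/*
 * Iteration sketch (illustrative): spdk_bs_iter_first() opens the first blob
 * and fires the callback; hand that open blob back to spdk_bs_iter_next(),
 * which closes it and opens the next one. The walk terminates with a NULL
 * blob and bserrno == -ENOENT.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno != 0) {
 *			return;	// -ENOENT means no more blobs
 *		}
 *		// ... inspect blob ...
 *		spdk_bs_iter_next(bs, blob, iter_cb, bs);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_cb, bs);
 */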
3063 
3064 int
3065 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
3066 		    uint16_t value_len)
3067 {
3068 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3069 	struct spdk_xattr 	*xattr;
3070 
3071 	assert(blob != NULL);
3072 
3073 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3074 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3075 
3076 	if (blob->md_ro) {
3077 		return -EPERM;
3078 	}
3079 
3080 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3081 		if (!strcmp(name, xattr->name)) {
3082 			free(xattr->value);
3083 			xattr->value = malloc(value_len);
			if (!xattr->value) {
				xattr->value_len = 0;
				return -ENOMEM;
			}
3084 			xattr->value_len = value_len;
3085 			memcpy(xattr->value, value, value_len);
3086 
3087 			blob->state = SPDK_BLOB_STATE_DIRTY;
3088 
3089 			return 0;
3090 		}
3091 	}
3092 
3093 	xattr = calloc(1, sizeof(*xattr));
3094 	if (!xattr) {
3095 		return -ENOMEM;
3096 	}
3097 	xattr->name = strdup(name);
3098 	xattr->value_len = value_len;
3099 	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
3100 	memcpy(xattr->value, value, value_len);
3101 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
3102 
3103 	blob->state = SPDK_BLOB_STATE_DIRTY;
3104 
3105 	return 0;
3106 }
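
/*
 * Xattr sketch (illustrative): values are copied on set, so the caller keeps
 * its buffer; the pointer from spdk_blob_get_xattr_value() aliases blob-owned
 * memory and stays valid only while the blob is open. New values are durable
 * only after spdk_blob_sync_md().
 *
 *	const void *val;
 *	size_t len;
 *
 *	spdk_blob_set_xattr(blob, "name", "data", 5);
 *	spdk_blob_get_xattr_value(blob, "name", &val, &len);
 */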
3107 
3108 int
3109 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
3110 {
3111 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3112 	struct spdk_xattr	*xattr;
3113 
3114 	assert(blob != NULL);
3115 
3116 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3117 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3118 
3119 	if (blob->md_ro) {
3120 		return -EPERM;
3121 	}
3122 
3123 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3124 		if (!strcmp(name, xattr->name)) {
3125 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
3126 			free(xattr->value);
3127 			free(xattr->name);
3128 			free(xattr);
3129 
3130 			blob->state = SPDK_BLOB_STATE_DIRTY;
3131 
3132 			return 0;
3133 		}
3134 	}
3135 
3136 	return -ENOENT;
3137 }
3138 
3139 int
3140 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
3141 			  const void **value, size_t *value_len)
3142 {
3143 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3144 	struct spdk_xattr	*xattr;
3145 
3146 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3147 		if (!strcmp(name, xattr->name)) {
3148 			*value = xattr->value;
3149 			*value_len = xattr->value_len;
3150 			return 0;
3151 		}
3152 	}
3153 
3154 	return -ENOENT;
3155 }
3156 
3157 struct spdk_xattr_names {
3158 	uint32_t	count;
3159 	const char	*names[0];
3160 };
3161 
3162 int
3163 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
3164 {
3165 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3166 	struct spdk_xattr	*xattr;
3167 	int			count = 0;
3168 
3169 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3170 		count++;
3171 	}
3172 
3173 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
3174 	if (*names == NULL) {
3175 		return -ENOMEM;
3176 	}
3177 
3178 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3179 		(*names)->names[(*names)->count++] = xattr->name;
3180 	}
3181 
3182 	return 0;
3183 }
3184 
3185 uint32_t
3186 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
3187 {
3188 	assert(names != NULL);
3189 
3190 	return names->count;
3191 }
3192 
3193 const char *
3194 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
3195 {
3196 	if (index >= names->count) {
3197 		return NULL;
3198 	}
3199 
3200 	return names->names[index];
3201 }
3202 
3203 void
3204 spdk_xattr_names_free(struct spdk_xattr_names *names)
3205 {
3206 	free(names);
3207 }
3208 
3209 struct spdk_bs_type
3210 spdk_bs_get_bstype(struct spdk_blob_store *bs)
3211 {
3212 	return bs->bstype;
3213 }
3214 
3215 void
3216 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
3217 {
3218 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
3219 }
3220 
3221 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
3222