xref: /spdk/lib/blob/blobstore.c (revision d52dbda28860c919ebca444a86502d36850fe22d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
38 #include "spdk/env.h"
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 
44 #include "spdk_internal/log.h"
45 
46 #include "blobstore.h"
47 
48 #define BLOB_CRC32C_INITIAL    0xffffffffUL
49 
50 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
51 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
52 
53 static inline size_t
54 divide_round_up(size_t num, size_t divisor)
55 {
56 	return (num + divisor - 1) / divisor;
57 }
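
/* Worked example: divide_round_up(10, 4) == 3 and divide_round_up(8, 4) == 2;
 * any non-zero remainder bumps the quotient up by one.
 */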
58 
59 static void
60 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
61 {
62 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
63 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
64 	assert(bs->num_free_clusters > 0);
65 
66 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);
67 
68 	spdk_bit_array_set(bs->used_clusters, cluster_num);
69 	bs->num_free_clusters--;
70 }
71 
72 static void
73 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
74 {
75 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
76 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
77 	assert(bs->num_free_clusters < bs->total_clusters);
78 
79 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);
80 
81 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
82 	bs->num_free_clusters++;
83 }
84 
85 static struct spdk_blob_data *
86 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
87 {
88 	struct spdk_blob_data *blob;
89 
90 	blob = calloc(1, sizeof(*blob));
91 	if (!blob) {
92 		return NULL;
93 	}
94 
95 	blob->id = id;
96 	blob->bs = bs;
97 
98 	blob->state = SPDK_BLOB_STATE_DIRTY;
99 	blob->active.num_pages = 1;
100 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
101 	if (!blob->active.pages) {
102 		free(blob);
103 		return NULL;
104 	}
105 
106 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
107 
108 	TAILQ_INIT(&blob->xattrs);
109 
110 	return blob;
111 }
112 
113 static void
114 _spdk_blob_free(struct spdk_blob_data *blob)
115 {
116 	struct spdk_xattr 	*xattr, *xattr_tmp;
117 
118 	assert(blob != NULL);
119 
120 	free(blob->active.clusters);
121 	free(blob->clean.clusters);
122 	free(blob->active.pages);
123 	free(blob->clean.pages);
124 
125 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
126 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
127 		free(xattr->name);
128 		free(xattr->value);
129 		free(xattr);
130 	}
131 
132 	free(blob);
133 }
134 
135 static int
136 _spdk_blob_mark_clean(struct spdk_blob_data *blob)
137 {
138 	uint64_t *clusters = NULL;
139 	uint32_t *pages = NULL;
140 
141 	assert(blob != NULL);
142 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
143 	       blob->state == SPDK_BLOB_STATE_SYNCING);
144 
145 	if (blob->active.num_clusters) {
146 		assert(blob->active.clusters);
147 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
148 		if (!clusters) {
149 			return -1;
150 		}
151 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
152 	}
153 
154 	if (blob->active.num_pages) {
155 		assert(blob->active.pages);
156 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
157 		if (!pages) {
158 			free(clusters);
159 			return -1;
160 		}
161 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
162 	}
163 
164 	free(blob->clean.clusters);
165 	free(blob->clean.pages);
166 
167 	blob->clean.num_clusters = blob->active.num_clusters;
168 	blob->clean.clusters = blob->active.clusters;
169 	blob->clean.num_pages = blob->active.num_pages;
170 	blob->clean.pages = blob->active.pages;
171 
172 	blob->active.clusters = clusters;
173 	blob->active.pages = pages;
174 
175 	blob->state = SPDK_BLOB_STATE_CLEAN;
176 
177 	return 0;
178 }
179 
180 static int
181 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
182 {
183 	struct spdk_blob_md_descriptor *desc;
184 	size_t	cur_desc = 0;
185 	void *tmp;
186 
187 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
188 	while (cur_desc < sizeof(page->descriptors)) {
189 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
190 			if (desc->length == 0) {
191 				/* If padding and length are 0, this terminates the page */
192 				break;
193 			}
194 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
195 			struct spdk_blob_md_descriptor_flags	*desc_flags;
196 
197 			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
198 
199 			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
200 				return -EINVAL;
201 			}
202 
203 			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
204 			    SPDK_BLOB_INVALID_FLAGS_MASK) {
205 				return -EINVAL;
206 			}
207 
208 			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
209 			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
210 				blob->data_ro = true;
211 				blob->md_ro = true;
212 			}
213 
214 			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
215 			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
216 				blob->md_ro = true;
217 			}
218 
219 			blob->invalid_flags = desc_flags->invalid_flags;
220 			blob->data_ro_flags = desc_flags->data_ro_flags;
221 			blob->md_ro_flags = desc_flags->md_ro_flags;
222 
223 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
224 			struct spdk_blob_md_descriptor_extent	*desc_extent;
225 			unsigned int				i, j;
226 			unsigned int				cluster_count = blob->active.num_clusters;
227 
228 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
229 
230 			if (desc_extent->length == 0 ||
231 			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
232 				return -EINVAL;
233 			}
234 
235 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
236 				for (j = 0; j < desc_extent->extents[i].length; j++) {
237 					if (!spdk_bit_array_get(blob->bs->used_clusters,
238 								desc_extent->extents[i].cluster_idx + j)) {
239 						return -EINVAL;
240 					}
241 					cluster_count++;
242 				}
243 			}
244 
245 			if (cluster_count == 0) {
246 				return -EINVAL;
247 			}
248 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
249 			if (tmp == NULL) {
250 				return -ENOMEM;
251 			}
252 			blob->active.clusters = tmp;
253 			blob->active.cluster_array_size = cluster_count;
254 
255 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
256 				for (j = 0; j < desc_extent->extents[i].length; j++) {
257 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
258 							desc_extent->extents[i].cluster_idx + j);
259 				}
260 			}
261 
262 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
263 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
264 			struct spdk_xattr 			*xattr;
265 
266 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
267 
268 			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
269 			    sizeof(desc_xattr->value_length) +
270 			    desc_xattr->name_length + desc_xattr->value_length) {
271 				return -EINVAL;
272 			}
273 
274 			xattr = calloc(1, sizeof(*xattr));
275 			if (xattr == NULL) {
276 				return -ENOMEM;
277 			}
278 
279 			xattr->name = malloc(desc_xattr->name_length + 1);
280 			if (xattr->name == NULL) {
281 				free(xattr);
282 				return -ENOMEM;
283 			}
284 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
285 			xattr->name[desc_xattr->name_length] = '\0';
286 
287 			xattr->value = malloc(desc_xattr->value_length);
288 			if (xattr->value == NULL) {
289 				free(xattr->name);
290 				free(xattr);
291 				return -ENOMEM;
292 			}
293 			xattr->value_len = desc_xattr->value_length;
294 			memcpy(xattr->value,
295 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
296 			       desc_xattr->value_length);
297 
298 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
299 		} else {
300 			/* Unrecognized descriptor type.  Do not fail - just continue to the
301 			 *  next descriptor.  If this descriptor is associated with some feature
302 			 *  defined in a newer version of blobstore, that version of blobstore
303 			 *  should create and set an associated feature flag to specify if this
304 			 *  blob can be loaded or not.
305 			 */
306 		}
307 
308 		/* Advance to the next descriptor */
309 		cur_desc += sizeof(*desc) + desc->length;
310 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
311 			break;
312 		}
313 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
314 	}
315 
316 	return 0;
317 }
318 
319 static int
320 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
321 		 struct spdk_blob_data *blob)
322 {
323 	const struct spdk_blob_md_page *page;
324 	uint32_t i;
325 	int rc;
326 
327 	assert(page_count > 0);
328 	assert(pages[0].sequence_num == 0);
329 	assert(blob != NULL);
330 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
331 	assert(blob->active.clusters == NULL);
333 
	/* The blobid provided doesn't match what's in the MD; this can
	 * happen, for example, if a bogus blobid is passed in through open.
	 */
337 	if (blob->id != pages[0].id) {
338 		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
339 			    blob->id, pages[0].id);
340 		return -ENOENT;
341 	}
342 
343 	for (i = 0; i < page_count; i++) {
344 		page = &pages[i];
345 
346 		assert(page->id == blob->id);
347 		assert(page->sequence_num == i);
348 
349 		rc = _spdk_blob_parse_page(page, blob);
350 		if (rc != 0) {
351 			return rc;
352 		}
353 	}
354 
355 	return 0;
356 }
357 
358 static int
359 _spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
360 			      struct spdk_blob_md_page **pages,
361 			      uint32_t *page_count,
362 			      struct spdk_blob_md_page **last_page)
363 {
364 	struct spdk_blob_md_page *page;
365 
366 	assert(pages != NULL);
367 	assert(page_count != NULL);
368 
369 	if (*page_count == 0) {
370 		assert(*pages == NULL);
371 		*page_count = 1;
372 		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
373 					 SPDK_BS_PAGE_SIZE,
374 					 NULL);
375 	} else {
376 		assert(*pages != NULL);
377 		(*page_count)++;
378 		*pages = spdk_dma_realloc(*pages,
379 					  SPDK_BS_PAGE_SIZE * (*page_count),
380 					  SPDK_BS_PAGE_SIZE,
381 					  NULL);
382 	}
383 
384 	if (*pages == NULL) {
385 		*page_count = 0;
386 		*last_page = NULL;
387 		return -ENOMEM;
388 	}
389 
390 	page = &(*pages)[*page_count - 1];
391 	memset(page, 0, sizeof(*page));
392 	page->id = blob->id;
393 	page->sequence_num = *page_count - 1;
394 	page->next = SPDK_INVALID_MD_PAGE;
395 	*last_page = page;
396 
397 	return 0;
398 }
399 
/* Transform the in-memory representation 'xattr' into an on-disk xattr
 * descriptor. Update required_sz on both success and failure.
 */
404 static int
405 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
406 			   uint8_t *buf, size_t buf_sz,
407 			   size_t *required_sz)
408 {
409 	struct spdk_blob_md_descriptor_xattr	*desc;
410 
411 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
412 		       strlen(xattr->name) +
413 		       xattr->value_len;
414 
415 	if (buf_sz < *required_sz) {
416 		return -1;
417 	}
418 
419 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
420 
421 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
422 	desc->length = sizeof(desc->name_length) +
423 		       sizeof(desc->value_length) +
424 		       strlen(xattr->name) +
425 		       xattr->value_len;
426 	desc->name_length = strlen(xattr->name);
427 	desc->value_length = xattr->value_len;
428 
429 	memcpy(desc->name, xattr->name, desc->name_length);
430 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
431 	       xattr->value,
432 	       desc->value_length);
433 
434 	return 0;
435 }
436 
437 static void
438 _spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
439 			    uint64_t start_cluster, uint64_t *next_cluster,
440 			    uint8_t *buf, size_t buf_sz)
441 {
442 	struct spdk_blob_md_descriptor_extent *desc;
443 	size_t cur_sz;
444 	uint64_t i, extent_idx;
445 	uint32_t lba, lba_per_cluster, lba_count;
446 
447 	/* The buffer must have room for at least one extent */
448 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
449 	if (buf_sz < cur_sz) {
450 		*next_cluster = start_cluster;
451 		return;
452 	}
453 
454 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
455 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
456 
457 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
458 
459 	lba = blob->active.clusters[start_cluster];
460 	lba_count = lba_per_cluster;
461 	extent_idx = 0;
462 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
463 		if ((lba + lba_count) == blob->active.clusters[i]) {
464 			lba_count += lba_per_cluster;
465 			continue;
466 		}
467 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
468 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
469 		extent_idx++;
470 
471 		cur_sz += sizeof(desc->extents[extent_idx]);
472 
473 		if (buf_sz < cur_sz) {
474 			/* If we ran out of buffer space, return */
475 			desc->length = sizeof(desc->extents[0]) * extent_idx;
476 			*next_cluster = i;
477 			return;
478 		}
479 
480 		lba = blob->active.clusters[i];
481 		lba_count = lba_per_cluster;
482 	}
483 
484 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
485 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
486 	extent_idx++;
487 
488 	desc->length = sizeof(desc->extents[0]) * extent_idx;
489 	*next_cluster = blob->active.num_clusters;
490 
491 	return;
492 }
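
/* Worked example (hypothetical layout): with 256 LBAs per cluster, an
 * active.clusters[] holding LBAs { 0, 256, 512, 2048 } serializes as two
 * extents: { cluster_idx = 0, length = 3 } for the three contiguous
 * clusters, then { cluster_idx = 8, length = 1 } for the cluster at LBA 2048.
 */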
493 
494 static void
495 _spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
496 			   uint8_t *buf, size_t *buf_sz)
497 {
498 	struct spdk_blob_md_descriptor_flags *desc;
499 
500 	/*
501 	 * Flags get serialized first, so we should always have room for the flags
502 	 *  descriptor.
503 	 */
504 	assert(*buf_sz >= sizeof(*desc));
505 
506 	desc = (struct spdk_blob_md_descriptor_flags *)buf;
507 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
508 	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
509 	desc->invalid_flags = blob->invalid_flags;
510 	desc->data_ro_flags = blob->data_ro_flags;
511 	desc->md_ro_flags = blob->md_ro_flags;
512 
513 	*buf_sz -= sizeof(*desc);
514 }
515 
516 static int
517 _spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
518 		     uint32_t *page_count)
519 {
520 	struct spdk_blob_md_page		*cur_page;
521 	const struct spdk_xattr			*xattr;
522 	int 					rc;
523 	uint8_t					*buf;
524 	size_t					remaining_sz;
525 	uint64_t				last_cluster;
526 
527 	assert(pages != NULL);
528 	assert(page_count != NULL);
529 	assert(blob != NULL);
530 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
531 
532 	*pages = NULL;
533 	*page_count = 0;
534 
535 	/* A blob always has at least 1 page, even if it has no descriptors */
536 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
537 	if (rc < 0) {
538 		return rc;
539 	}
540 
541 	buf = (uint8_t *)cur_page->descriptors;
542 	remaining_sz = sizeof(cur_page->descriptors);
543 
544 	/* Serialize flags */
545 	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
546 
547 	/* Serialize xattrs */
548 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
549 		size_t required_sz = 0;
550 		rc = _spdk_blob_serialize_xattr(xattr,
551 						buf, remaining_sz,
552 						&required_sz);
553 		if (rc < 0) {
554 			/* Need to add a new page to the chain */
555 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
556 							   &cur_page);
557 			if (rc < 0) {
558 				spdk_dma_free(*pages);
559 				*pages = NULL;
560 				*page_count = 0;
561 				return rc;
562 			}
563 
564 			buf = (uint8_t *)cur_page->descriptors;
565 			remaining_sz = sizeof(cur_page->descriptors);
566 
567 			/* Try again */
568 			required_sz = 0;
569 			rc = _spdk_blob_serialize_xattr(xattr,
570 							buf, remaining_sz,
571 							&required_sz);
572 
573 			if (rc < 0) {
574 				spdk_dma_free(*pages);
575 				*pages = NULL;
576 				*page_count = 0;
577 				return -1;
578 			}
579 		}
580 
581 		remaining_sz -= required_sz;
582 		buf += required_sz;
583 	}
584 
585 	/* Serialize extents */
586 	last_cluster = 0;
587 	while (last_cluster < blob->active.num_clusters) {
588 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
589 					    buf, remaining_sz);
590 
591 		if (last_cluster == blob->active.num_clusters) {
592 			break;
593 		}
594 
		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			/* Free the page chain on failure, matching the xattr path above */
			spdk_dma_free(*pages);
			*pages = NULL;
			*page_count = 0;
			return rc;
		}
600 
601 		buf = (uint8_t *)cur_page->descriptors;
602 		remaining_sz = sizeof(cur_page->descriptors);
603 	}
604 
605 	return 0;
606 }
607 
608 struct spdk_blob_load_ctx {
609 	struct spdk_blob_data 		*blob;
610 
611 	struct spdk_blob_md_page	*pages;
612 	uint32_t			num_pages;
613 
614 	spdk_bs_sequence_cpl		cb_fn;
615 	void				*cb_arg;
616 };
617 
618 static uint32_t
619 _spdk_blob_md_page_calc_crc(void *page)
620 {
621 	uint32_t		crc;
622 
623 	crc = BLOB_CRC32C_INITIAL;
624 	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
625 	crc ^= BLOB_CRC32C_INITIAL;
626 
627 	return crc;
629 }
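
/* The crc field occupies the last 4 bytes of the metadata page, which is why
 * the calculation above stops at SPDK_BS_PAGE_SIZE - 4. A loader validates
 * each page it reads with a check of the form (sketch):
 *
 *	if (_spdk_blob_md_page_calc_crc(page) != page->crc) {
 *		... corrupt or torn page - fail the load with -EINVAL ...
 *	}
 *
 * which is what _spdk_blob_load_cpl() below does for every page in the chain.
 */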
630 
631 static void
632 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
633 {
634 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
635 	struct spdk_blob_data 		*blob = ctx->blob;
	struct spdk_blob_md_page	*page;
	struct spdk_blob_md_page	*tmp;
637 	int				rc;
638 	uint32_t			crc;
639 
640 	page = &ctx->pages[ctx->num_pages - 1];
641 	crc = _spdk_blob_md_page_calc_crc(page);
642 	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %u crc mismatch\n", ctx->num_pages);
644 		_spdk_blob_free(blob);
645 		ctx->cb_fn(seq, NULL, -EINVAL);
646 		spdk_dma_free(ctx->pages);
647 		free(ctx);
648 		return;
649 	}
650 
651 	if (page->next != SPDK_INVALID_MD_PAGE) {
652 		uint32_t next_page = page->next;
653 		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
654 
656 		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));
657 
658 		/* Read the next page */
		ctx->num_pages++;
		tmp = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
				       sizeof(*page), NULL);
		if (tmp == NULL) {
			/* The old buffer is still owned by ctx; free it before bailing out */
			spdk_dma_free(ctx->pages);
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}
		ctx->pages = tmp;
667 
668 		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
669 				      next_lba,
670 				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
671 				      _spdk_blob_load_cpl, ctx);
672 		return;
673 	}
674 
675 	/* Parse the pages */
676 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
677 	if (rc) {
678 		_spdk_blob_free(blob);
679 		ctx->cb_fn(seq, NULL, rc);
680 		spdk_dma_free(ctx->pages);
681 		free(ctx);
682 		return;
683 	}
684 
685 	_spdk_blob_mark_clean(blob);
686 
687 	ctx->cb_fn(seq, ctx->cb_arg, rc);
688 
689 	/* Free the memory */
690 	spdk_dma_free(ctx->pages);
691 	free(ctx);
692 }
693 
694 /* Load a blob from disk given a blobid */
695 static void
696 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
697 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
698 {
699 	struct spdk_blob_load_ctx *ctx;
700 	struct spdk_blob_store *bs;
701 	uint32_t page_num;
702 	uint64_t lba;
703 
704 	assert(blob != NULL);
705 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
706 	       blob->state == SPDK_BLOB_STATE_DIRTY);
707 
708 	bs = blob->bs;
709 
710 	ctx = calloc(1, sizeof(*ctx));
711 	if (!ctx) {
712 		cb_fn(seq, cb_arg, -ENOMEM);
713 		return;
714 	}
715 
716 	ctx->blob = blob;
	ctx->pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE, NULL);
719 	if (!ctx->pages) {
720 		free(ctx);
721 		cb_fn(seq, cb_arg, -ENOMEM);
722 		return;
723 	}
724 	ctx->num_pages = 1;
725 	ctx->cb_fn = cb_fn;
726 	ctx->cb_arg = cb_arg;
727 
728 	page_num = _spdk_bs_blobid_to_page(blob->id);
729 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
730 
731 	blob->state = SPDK_BLOB_STATE_LOADING;
732 
733 	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
734 			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
735 			      _spdk_blob_load_cpl, ctx);
736 }
737 
738 struct spdk_blob_persist_ctx {
739 	struct spdk_blob_data 		*blob;
740 
741 	struct spdk_blob_md_page	*pages;
742 
743 	uint64_t			idx;
744 
745 	spdk_bs_sequence_cpl		cb_fn;
746 	void				*cb_arg;
747 };
748 
749 static void
750 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
751 {
752 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
753 	struct spdk_blob_data 		*blob = ctx->blob;
754 
755 	if (bserrno == 0) {
756 		_spdk_blob_mark_clean(blob);
757 	}
758 
759 	/* Call user callback */
760 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
761 
762 	/* Free the memory */
763 	spdk_dma_free(ctx->pages);
764 	free(ctx);
765 }
766 
767 static void
768 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
769 {
770 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
771 	struct spdk_blob_data 		*blob = ctx->blob;
772 	struct spdk_blob_store		*bs = blob->bs;
773 	void				*tmp;
774 	size_t				i;
775 
776 	/* Release all clusters that were truncated */
777 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
778 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
779 
780 		_spdk_bs_release_cluster(bs, cluster_num);
781 	}
782 
783 	if (blob->active.num_clusters == 0) {
784 		free(blob->active.clusters);
785 		blob->active.clusters = NULL;
786 		blob->active.cluster_array_size = 0;
787 	} else {
788 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
789 		assert(tmp != NULL);
790 		blob->active.clusters = tmp;
791 		blob->active.cluster_array_size = blob->active.num_clusters;
792 	}
793 
794 	_spdk_blob_persist_complete(seq, ctx, bserrno);
795 }
796 
797 static void
798 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
799 {
800 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
801 	struct spdk_blob_data 		*blob = ctx->blob;
802 	struct spdk_blob_store		*bs = blob->bs;
803 	spdk_bs_batch_t			*batch;
804 	size_t				i;
805 	uint64_t			lba;
806 	uint32_t			lba_count;
807 
808 	/* Clusters don't move around in blobs. The list shrinks or grows
809 	 * at the end, but no changes ever occur in the middle of the list.
810 	 */
811 
812 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
813 
814 	/* Unmap all clusters that were truncated */
815 	lba = 0;
816 	lba_count = 0;
817 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
818 		uint64_t next_lba = blob->active.clusters[i];
819 		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
820 
821 		if ((lba + lba_count) == next_lba) {
822 			/* This cluster is contiguous with the previous one. */
823 			lba_count += next_lba_count;
824 			continue;
825 		}
826 
827 		/* This cluster is not contiguous with the previous one. */
828 
		/* If a run of LBAs was previously accumulated, send it
		 *  as a single unmap.
		 */
832 		if (lba_count > 0) {
833 			spdk_bs_batch_unmap(batch, lba, lba_count);
834 		}
835 
836 		/* Start building the next batch */
837 		lba = next_lba;
838 		lba_count = next_lba_count;
839 	}
840 
841 	/* If we ended with a contiguous set of LBAs, send the unmap now */
842 	if (lba_count > 0) {
843 		spdk_bs_batch_unmap(batch, lba, lba_count);
844 	}
845 
846 	spdk_bs_batch_close(batch);
847 }
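
/* Example of the run batching (hypothetical values): truncating clusters
 * whose LBAs are { 1024, 1280, 4096 } with 256 LBAs per cluster issues two
 * unmaps: one at LBA 1024 for 512 blocks, and one at LBA 4096 for 256 blocks.
 */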
848 
849 static void
850 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
851 {
852 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
853 	struct spdk_blob_data 		*blob = ctx->blob;
854 	struct spdk_blob_store		*bs = blob->bs;
855 	size_t				i;
856 
	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place, so
	 * any pages in the clean list can now be released from the
	 * used_md_pages bit array.
	 */
861 	for (i = 1; i < blob->clean.num_pages; i++) {
862 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
863 	}
864 
865 	if (blob->active.num_pages == 0) {
866 		uint32_t page_num;
867 
868 		page_num = _spdk_bs_blobid_to_page(blob->id);
869 		spdk_bit_array_clear(bs->used_md_pages, page_num);
870 	}
871 
872 	/* Move on to unmapping clusters */
873 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
874 }
875 
876 static void
877 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
878 {
879 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
880 	struct spdk_blob_data 		*blob = ctx->blob;
881 	struct spdk_blob_store		*bs = blob->bs;
882 	uint64_t			lba;
883 	uint32_t			lba_count;
884 	spdk_bs_batch_t			*batch;
885 	size_t				i;
886 
887 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);
888 
889 	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);
890 
891 	/* This loop starts at 1 because the first page is special and handled
892 	 * below. The pages (except the first) are never written in place,
893 	 * so any pages in the clean list must be zeroed.
894 	 */
895 	for (i = 1; i < blob->clean.num_pages; i++) {
896 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
897 
898 		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
899 	}
900 
901 	/* The first page will only be zeroed if this is a delete. */
902 	if (blob->active.num_pages == 0) {
903 		uint32_t page_num;
904 
905 		/* The first page in the metadata goes where the blobid indicates */
906 		page_num = _spdk_bs_blobid_to_page(blob->id);
907 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
908 
909 		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
910 	}
911 
912 	spdk_bs_batch_close(batch);
913 }
914 
915 static void
916 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
917 {
918 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
919 	struct spdk_blob_data		*blob = ctx->blob;
920 	struct spdk_blob_store		*bs = blob->bs;
921 	uint64_t			lba;
922 	uint32_t			lba_count;
923 	struct spdk_blob_md_page	*page;
924 
925 	if (blob->active.num_pages == 0) {
926 		/* Move on to the next step */
927 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
928 		return;
929 	}
930 
931 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
932 
933 	page = &ctx->pages[0];
934 	/* The first page in the metadata goes where the blobid indicates */
935 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
936 
937 	spdk_bs_sequence_write(seq, page, lba, lba_count,
938 			       _spdk_blob_persist_zero_pages, ctx);
939 }
940 
941 static void
942 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
943 {
944 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
945 	struct spdk_blob_data 		*blob = ctx->blob;
946 	struct spdk_blob_store		*bs = blob->bs;
947 	uint64_t 			lba;
948 	uint32_t			lba_count;
949 	struct spdk_blob_md_page	*page;
950 	spdk_bs_batch_t			*batch;
951 	size_t				i;
952 
953 	/* Clusters don't move around in blobs. The list shrinks or grows
954 	 * at the end, but no changes ever occur in the middle of the list.
955 	 */
956 
957 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
958 
959 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
960 
	/* This loop starts at 1. The root page is not written until
	 * all of the other pages are finished.
	 */
964 	for (i = 1; i < blob->active.num_pages; i++) {
965 		page = &ctx->pages[i];
966 		assert(page->sequence_num == i);
967 
968 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
969 
970 		spdk_bs_batch_write(batch, page, lba, lba_count);
971 	}
972 
973 	spdk_bs_batch_close(batch);
974 }
975 
976 static int
977 _spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
978 {
979 	uint64_t	i;
980 	uint64_t	*tmp;
981 	uint64_t	lfc; /* lowest free cluster */
982 	struct spdk_blob_store *bs;
983 
984 	bs = blob->bs;
985 
986 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
987 	       blob->state != SPDK_BLOB_STATE_SYNCING);
988 
989 	if (blob->active.num_clusters == sz) {
990 		return 0;
991 	}
992 
993 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
994 		/* If this blob was resized to be larger, then smaller, then
995 		 * larger without syncing, then the cluster array already
996 		 * contains spare assigned clusters we can use.
997 		 */
998 		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
999 						     sz);
1000 	}
1001 
1002 	blob->state = SPDK_BLOB_STATE_DIRTY;
1003 
1004 	/* Do two passes - one to verify that we can obtain enough clusters
1005 	 * and another to actually claim them.
1006 	 */
1007 
1008 	lfc = 0;
1009 	for (i = blob->active.num_clusters; i < sz; i++) {
1010 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1011 		if (lfc >= bs->total_clusters) {
1012 			/* No more free clusters. Cannot satisfy the request */
1013 			assert(false);
1014 			return -1;
1015 		}
1016 		lfc++;
1017 	}
1018 
1019 	if (sz > blob->active.num_clusters) {
1020 		/* Expand the cluster array if necessary.
1021 		 * We only shrink the array when persisting.
1022 		 */
1023 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
1024 		if (sz > 0 && tmp == NULL) {
1025 			assert(false);
1026 			return -1;
1027 		}
1028 		blob->active.clusters = tmp;
1029 		blob->active.cluster_array_size = sz;
1030 	}
1031 
1032 	lfc = 0;
1033 	for (i = blob->active.num_clusters; i < sz; i++) {
1034 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1035 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
1036 		_spdk_bs_claim_cluster(bs, lfc);
1037 		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
1038 		lfc++;
1039 	}
1040 
1041 	blob->active.num_clusters = sz;
1042 
1043 	return 0;
1044 }
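
/* A sketch of the two-pass behavior above: growing a blob from 2 to 4
 * clusters first scans the used_clusters bit array to confirm that two free
 * clusters exist, then re-scans to claim them, storing each new cluster's
 * starting LBA in active.clusters[]. Shrinking only lowers num_clusters; the
 * spare array entries are released later by the persist path's unmap step.
 */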
1045 
1046 /* Write a blob to disk */
1047 static void
1048 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
1049 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1050 {
1051 	struct spdk_blob_persist_ctx *ctx;
1052 	int rc;
1053 	uint64_t i;
	uint32_t page_num;
	uint32_t *tmp_pages;
1055 	struct spdk_blob_store *bs;
1056 
1057 	assert(blob != NULL);
1058 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
1059 	       blob->state == SPDK_BLOB_STATE_DIRTY);
1060 
1061 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1062 		cb_fn(seq, cb_arg, 0);
1063 		return;
1064 	}
1065 
1066 	bs = blob->bs;
1067 
1068 	ctx = calloc(1, sizeof(*ctx));
1069 	if (!ctx) {
1070 		cb_fn(seq, cb_arg, -ENOMEM);
1071 		return;
1072 	}
1073 	ctx->blob = blob;
1074 	ctx->cb_fn = cb_fn;
1075 	ctx->cb_arg = cb_arg;
1076 
1077 	blob->state = SPDK_BLOB_STATE_SYNCING;
1078 
1079 	if (blob->active.num_pages == 0) {
1080 		/* This is the signal that the blob should be deleted.
1081 		 * Immediately jump to the clean up routine. */
1082 		assert(blob->clean.num_pages > 0);
1083 		ctx->idx = blob->clean.num_pages - 1;
1084 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}
1088 
1089 	/* Generate the new metadata */
1090 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
1091 	if (rc < 0) {
1092 		free(ctx);
1093 		cb_fn(seq, cb_arg, rc);
1094 		return;
1095 	}
1096 
1097 	assert(blob->active.num_pages >= 1);
1098 
1099 	/* Resize the cache of page indices */
	tmp_pages = realloc(blob->active.pages,
			    blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp_pages) {
		/* The old array is still valid; free only what this call allocated */
		spdk_dma_free(ctx->pages);
		free(ctx);
		blob->state = SPDK_BLOB_STATE_DIRTY;
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	blob->active.pages = tmp_pages;
1107 
1108 	/* Assign this metadata to pages. This requires two passes -
1109 	 * one to verify that there are enough pages and a second
1110 	 * to actually claim them. */
1111 	page_num = 0;
1112 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
1113 	for (i = 1; i < blob->active.num_pages; i++) {
1114 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1115 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
1116 			spdk_dma_free(ctx->pages);
1117 			free(ctx);
1118 			blob->state = SPDK_BLOB_STATE_DIRTY;
1119 			cb_fn(seq, cb_arg, -ENOMEM);
1120 			return;
1121 		}
1122 		page_num++;
1123 	}
1124 
1125 	page_num = 0;
1126 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
1127 	for (i = 1; i < blob->active.num_pages; i++) {
1128 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1129 		ctx->pages[i - 1].next = page_num;
		/* Now that the previous metadata page is complete, calculate the crc for it. */
1131 		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1132 		blob->active.pages[i] = page_num;
1133 		spdk_bit_array_set(bs->used_md_pages, page_num);
1134 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
1135 		page_num++;
1136 	}
1137 	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1138 	/* Start writing the metadata from last page to first */
1139 	ctx->idx = blob->active.num_pages - 1;
1140 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
1141 }
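
/* Persist callback chain (sketch): write_page_chain (pages 2..N) ->
 * write_page_root (page 1, which makes the new metadata live) ->
 * zero_pages (scrub pages only the old version used) ->
 * unmap_clusters (release truncated clusters) ->
 * persist_complete (mark the blob clean and invoke the user callback).
 */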
1142 
1143 static void
1144 _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1145 			     void *payload, uint64_t offset, uint64_t length,
1146 			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1147 {
1148 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1149 	spdk_bs_batch_t			*batch;
1150 	struct spdk_bs_cpl		cpl;
1151 	uint64_t			lba;
1152 	uint32_t			lba_count;
1153 	uint8_t				*buf;
1154 	uint64_t			page;
1155 
1156 	assert(blob != NULL);
1157 
1158 	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
1159 		cb_fn(cb_arg, -EPERM);
1160 		return;
1161 	}
1162 
1163 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1164 		cb_fn(cb_arg, -EINVAL);
1165 		return;
1166 	}
1167 
1168 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1169 	cpl.u.blob_basic.cb_fn = cb_fn;
1170 	cpl.u.blob_basic.cb_arg = cb_arg;
1171 
1172 	batch = spdk_bs_batch_open(_channel, &cpl);
1173 	if (!batch) {
1174 		cb_fn(cb_arg, -ENOMEM);
1175 		return;
1176 	}
1177 
1178 	length = _spdk_bs_page_to_lba(blob->bs, length);
1179 	page = offset;
1180 	buf = payload;
1181 	while (length > 0) {
1182 		lba = _spdk_bs_blob_page_to_lba(blob, page);
1183 		lba_count = spdk_min(length,
1184 				     _spdk_bs_page_to_lba(blob->bs,
1185 						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
1186 
1187 		switch (op_type) {
1188 		case SPDK_BLOB_READ:
1189 			spdk_bs_batch_read(batch, buf, lba, lba_count);
1190 			break;
1191 		case SPDK_BLOB_WRITE:
1192 			spdk_bs_batch_write(batch, buf, lba, lba_count);
1193 			break;
1194 		case SPDK_BLOB_UNMAP:
1195 			spdk_bs_batch_unmap(batch, lba, lba_count);
1196 			break;
1197 		case SPDK_BLOB_WRITE_ZEROES:
1198 			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
1199 			break;
1200 		}
1201 
1202 		length -= lba_count;
1203 		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
1204 		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
1205 			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
1206 		}
1207 	}
1208 
1209 	spdk_bs_batch_close(batch);
1210 }
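
/* Worked example (hypothetical geometry): with 4 pages per cluster, a 6-page
 * write at page offset 2 splits at the cluster boundary into two batched
 * operations - pages 2-3, then pages 4-7 - each with its own LBA translation.
 */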
1211 
1212 struct rw_iov_ctx {
1213 	struct spdk_blob_data *blob;
1214 	bool read;
1215 	int iovcnt;
1216 	struct iovec *orig_iov;
1217 	uint64_t page_offset;
1218 	uint64_t pages_remaining;
1219 	uint64_t pages_done;
1220 	struct iovec iov[0];
1221 };
1222 
1223 static void
1224 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1225 {
1226 	assert(cb_arg == NULL);
1227 	spdk_bs_sequence_finish(seq, bserrno);
1228 }
1229 
1230 static void
1231 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1232 {
1233 	struct rw_iov_ctx *ctx = cb_arg;
1234 	struct iovec *iov, *orig_iov;
1235 	int iovcnt;
1236 	size_t orig_iovoff;
1237 	uint64_t lba;
1238 	uint64_t page_count, pages_to_boundary;
1239 	uint32_t lba_count;
1240 	uint64_t byte_count;
1241 
1242 	if (bserrno != 0 || ctx->pages_remaining == 0) {
1243 		free(ctx);
1244 		spdk_bs_sequence_finish(seq, bserrno);
1245 		return;
1246 	}
1247 
1248 	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
1249 	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
1250 	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
1251 	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);
1252 
	/*
	 * Get the index and offset into the original iov array for our current
	 *  position in the I/O sequence. byte_count counts down the bytes
	 *  remaining until orig_iov and orig_iovoff point to that position.
	 */
1258 	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
1259 	orig_iov = &ctx->orig_iov[0];
1260 	orig_iovoff = 0;
1261 	while (byte_count > 0) {
1262 		if (byte_count >= orig_iov->iov_len) {
1263 			byte_count -= orig_iov->iov_len;
1264 			orig_iov++;
1265 		} else {
1266 			orig_iovoff = byte_count;
1267 			byte_count = 0;
1268 		}
1269 	}
1270 
1271 	/*
1272 	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
1273 	 *  bytes of this next I/O remain to be accounted for in the new iov array.
1274 	 */
1275 	byte_count = page_count * sizeof(struct spdk_blob_md_page);
1276 	iov = &ctx->iov[0];
1277 	iovcnt = 0;
1278 	while (byte_count > 0) {
1279 		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
1280 		iov->iov_base = orig_iov->iov_base + orig_iovoff;
1281 		byte_count -= iov->iov_len;
1282 		orig_iovoff = 0;
1283 		orig_iov++;
1284 		iov++;
1285 		iovcnt++;
1286 	}
1287 
1288 	ctx->page_offset += page_count;
1289 	ctx->pages_done += page_count;
1290 	ctx->pages_remaining -= page_count;
1291 	iov = &ctx->iov[0];
1292 
1293 	if (ctx->read) {
1294 		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
1295 	} else {
1296 		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
1297 	}
1298 }
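
/* Example of the iov slicing (hypothetical sizes): a 3-page I/O whose first
 * page ends at a cluster boundary splits into a 1-page and a 2-page child. If
 * the caller passed two 6144-byte iovs, the second child's iov array becomes
 * { the trailing 2048 bytes of iov[0], all 6144 bytes of iov[1] } = 8192 bytes.
 */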
1299 
1300 static void
1301 _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1302 				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1303 				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
1304 {
1305 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1306 	spdk_bs_sequence_t		*seq;
1307 	struct spdk_bs_cpl		cpl;
1308 
1309 	assert(blob != NULL);
1310 
1311 	if (!read && blob->data_ro) {
1312 		cb_fn(cb_arg, -EPERM);
1313 		return;
1314 	}
1315 
1316 	if (length == 0) {
1317 		cb_fn(cb_arg, 0);
1318 		return;
1319 	}
1320 
1321 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1322 		cb_fn(cb_arg, -EINVAL);
1323 		return;
1324 	}
1325 
1326 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1327 	cpl.u.blob_basic.cb_fn = cb_fn;
1328 	cpl.u.blob_basic.cb_arg = cb_arg;
1329 
1330 	/*
1331 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
1332 	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
1333 	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
1334 	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
1335 	 *  to allocate a separate iov array and split the I/O such that none of the resulting
1336 	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
1337 	 *  but since this case happens very infrequently, any performance impact will be negligible.
1338 	 *
1339 	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
1340 	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
1341 	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
1342 	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
1343 	 */
1344 	seq = spdk_bs_sequence_start(_channel, &cpl);
1345 	if (!seq) {
1346 		cb_fn(cb_arg, -ENOMEM);
1347 		return;
1348 	}
1349 
1350 	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
1351 		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
1352 		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);
1353 
1354 		if (read) {
1355 			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1356 		} else {
1357 			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1358 		}
1359 	} else {
1360 		struct rw_iov_ctx *ctx;
1361 
1362 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
1363 		if (ctx == NULL) {
1364 			spdk_bs_sequence_finish(seq, -ENOMEM);
1365 			return;
1366 		}
1367 
1368 		ctx->blob = blob;
1369 		ctx->read = read;
1370 		ctx->orig_iov = iov;
1371 		ctx->iovcnt = iovcnt;
1372 		ctx->page_offset = offset;
1373 		ctx->pages_remaining = length;
1374 		ctx->pages_done = 0;
1375 
1376 		_spdk_rw_iov_split_next(seq, ctx, 0);
1377 	}
1378 }
1379 
1380 static struct spdk_blob_data *
1381 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1382 {
1383 	struct spdk_blob_data *blob;
1384 
1385 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1386 		if (blob->id == blobid) {
1387 			return blob;
1388 		}
1389 	}
1390 
1391 	return NULL;
1392 }
1393 
1394 static int
1395 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel,
1396 			uint32_t max_ops)
1397 {
1398 	struct spdk_bs_dev		*dev;
1399 	uint32_t			i;
1400 
1401 	dev = bs->dev;
1402 
1403 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1404 	if (!channel->req_mem) {
1405 		return -1;
1406 	}
1407 
1408 	TAILQ_INIT(&channel->reqs);
1409 
1410 	for (i = 0; i < max_ops; i++) {
1411 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1412 	}
1413 
1414 	channel->bs = bs;
1415 	channel->dev = dev;
1416 	channel->dev_channel = dev->create_channel(dev);
1417 
1418 	if (!channel->dev_channel) {
1419 		SPDK_ERRLOG("Failed to create device channel.\n");
1420 		free(channel->req_mem);
1421 		return -1;
1422 	}
1423 
1424 	return 0;
1425 }
1426 
1427 static int
1428 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf)
1429 {
1430 	struct spdk_blob_store		*bs;
1431 	struct spdk_bs_channel		*channel = ctx_buf;
1432 
1433 	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);
1434 
1435 	return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops);
1436 }
1437 
1438 static int
1439 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf)
1440 {
1441 	struct spdk_blob_store		*bs;
1442 	struct spdk_bs_channel		*channel = ctx_buf;
1443 
1444 	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target);
1445 
1446 	return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops);
1447 }
1448 
1450 static void
1451 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1452 {
1453 	struct spdk_bs_channel *channel = ctx_buf;
1454 
1455 	free(channel->req_mem);
1456 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1457 }
1458 
1459 static void
1460 _spdk_bs_dev_destroy(void *io_device)
1461 {
1462 	struct spdk_blob_store *bs;
1463 	struct spdk_blob_data	*blob, *blob_tmp;
1464 
1465 	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);
1466 	bs->dev->destroy(bs->dev);
1467 
1468 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1469 		TAILQ_REMOVE(&bs->blobs, blob, link);
1470 		_spdk_blob_free(blob);
1471 	}
1472 
1473 	spdk_bit_array_free(&bs->used_md_pages);
1474 	spdk_bit_array_free(&bs->used_clusters);
1475 	/*
1476 	 * If this function is called for any reason except a successful unload,
1477 	 * the unload_cpl type will be NONE and this will be a nop.
1478 	 */
1479 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
1480 
1481 	free(bs);
1482 }
1483 
1484 static void
1485 _spdk_bs_free(struct spdk_blob_store *bs)
1486 {
1487 	spdk_bs_unregister_md_thread(bs);
1488 	spdk_io_device_unregister(&bs->io_target, NULL);
1489 	spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy);
1490 }
1491 
1492 void
1493 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1494 {
1495 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1496 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1497 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1498 	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
1499 	memset(&opts->bstype, 0, sizeof(opts->bstype));
1500 }
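
/* Typical usage (sketch): zero out the defaults with this helper, then
 * override individual fields before handing the opts to spdk_bs_init() or
 * spdk_bs_load():
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 */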
1501 
1502 static int
1503 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
1504 {
1505 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
1506 	    opts->max_channel_ops == 0) {
1507 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
1508 		return -1;
1509 	}
1510 
1511 	return 0;
1512 }
1513 
1514 static struct spdk_blob_store *
1515 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1516 {
1517 	struct spdk_blob_store	*bs;
1518 	uint64_t dev_size;
1519 	int rc;
1520 
1521 	dev_size = dev->blocklen * dev->blockcnt;
1522 	if (dev_size < opts->cluster_sz) {
		/* The device size cannot be smaller than the blobstore's cluster size */
1524 		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size,
1525 			    opts->cluster_sz);
1526 		return NULL;
1527 	}
1528 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
1529 		/* Cluster size cannot be smaller than page size */
1530 		SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n",
1531 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
1532 		return NULL;
1533 	}
1534 	bs = calloc(1, sizeof(struct spdk_blob_store));
1535 	if (!bs) {
1536 		return NULL;
1537 	}
1538 
1539 	TAILQ_INIT(&bs->blobs);
1540 	bs->dev = dev;
1541 
1542 	/*
1543 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1544 	 *  even multiple of the cluster size.
1545 	 */
1546 	bs->cluster_sz = opts->cluster_sz;
1547 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1548 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
1549 	bs->num_free_clusters = bs->total_clusters;
1550 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1551 	if (bs->used_clusters == NULL) {
1552 		free(bs);
1553 		return NULL;
1554 	}
1555 
1556 	bs->md_target.max_md_ops = opts->max_md_ops;
1557 	bs->io_target.max_channel_ops = opts->max_channel_ops;
1558 	bs->super_blob = SPDK_BLOBID_INVALID;
1559 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
1560 
1561 	/* The metadata is assumed to be at least 1 page */
1562 	bs->used_md_pages = spdk_bit_array_create(1);
1563 
1564 	spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy,
1565 				sizeof(struct spdk_bs_channel));
1566 	rc = spdk_bs_register_md_thread(bs);
1567 	if (rc == -1) {
1568 		spdk_io_device_unregister(&bs->md_target, NULL);
1569 		spdk_bit_array_free(&bs->used_md_pages);
1570 		spdk_bit_array_free(&bs->used_clusters);
1571 		free(bs);
1572 		return NULL;
1573 	}
1574 
1575 	spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy,
1576 				sizeof(struct spdk_bs_channel));
1577 
1578 	return bs;
1579 }
1580 
/* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload. */
1582 
1583 struct spdk_bs_load_ctx {
1584 	struct spdk_blob_store		*bs;
1585 	struct spdk_bs_super_block	*super;
1586 
1587 	struct spdk_bs_md_mask		*mask;
1588 	bool				in_page_chain;
1589 	uint32_t			page_index;
1590 	uint32_t			cur_page;
1591 	struct spdk_blob_md_page	*page;
1592 };
1593 
1594 static void
1595 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
1596 {
1597 	uint32_t i = 0;
1598 
1599 	while (true) {
1600 		i = spdk_bit_array_find_first_set(array, i);
1601 		if (i >= mask->length) {
1602 			break;
1603 		}
1604 		mask->mask[i / 8] |= 1U << (i % 8);
1605 		i++;
1606 	}
1607 }
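
/* Example: if bits 0, 3 and 9 are set in the array, the packed mask reads
 * mask->mask[0] == 0x09 and mask->mask[1] == 0x02 (bit i lands in byte i / 8
 * at bit position i % 8, LSB first).
 */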
1608 
1609 static void
1610 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
1611 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1612 {
1613 	/* Update the values in the super block */
1614 	super->super_blob = bs->super_blob;
1615 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
1616 	super->crc = _spdk_blob_md_page_calc_crc(super);
1617 	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
1618 			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
1619 			       cb_fn, cb_arg);
1620 }
1621 
1622 static void
1623 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1624 {
1625 	struct spdk_bs_load_ctx	*ctx = arg;
1626 	uint64_t	mask_size, lba, lba_count;
1627 
1628 	/* Write out the used clusters mask */
1629 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1630 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1631 	if (!ctx->mask) {
1632 		spdk_dma_free(ctx->super);
1633 		free(ctx);
1634 		spdk_bs_sequence_finish(seq, -ENOMEM);
1635 		return;
1636 	}
1637 
1638 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1639 	ctx->mask->length = ctx->bs->total_clusters;
1640 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1641 
1642 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
1643 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1644 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1645 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1646 }
1647 
1648 static void
1649 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1650 {
1651 	struct spdk_bs_load_ctx	*ctx = arg;
1652 	uint64_t	mask_size, lba, lba_count;
1653 
1654 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1655 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1656 	if (!ctx->mask) {
1657 		spdk_dma_free(ctx->super);
1658 		free(ctx);
1659 		spdk_bs_sequence_finish(seq, -ENOMEM);
1660 		return;
1661 	}
1662 
1663 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1664 	ctx->mask->length = ctx->super->md_len;
1665 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1666 
1667 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
1668 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1669 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1670 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1671 }
1672 
1673 static void
1674 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1675 {
1676 	struct spdk_bs_load_ctx *ctx = cb_arg;
1677 	uint32_t		i, j;
1678 	int			rc;
1679 
1680 	/* The type must be correct */
1681 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1682 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len *
				     SPDK_BS_PAGE_SIZE * 8));
1685 	/* The length of the mask must be exactly equal to the total number of clusters */
1686 	assert(ctx->mask->length == ctx->bs->total_clusters);
1687 
1688 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1689 	if (rc < 0) {
1690 		spdk_dma_free(ctx->super);
1691 		spdk_dma_free(ctx->mask);
1692 		_spdk_bs_free(ctx->bs);
1693 		free(ctx);
1694 		spdk_bs_sequence_finish(seq, -ENOMEM);
1695 		return;
1696 	}
1697 
1698 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1699 	for (i = 0; i < ctx->mask->length / 8; i++) {
1700 		uint8_t segment = ctx->mask->mask[i];
1701 		for (j = 0; segment && (j < 8); j++) {
1702 			if (segment & 1U) {
1703 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1704 				assert(ctx->bs->num_free_clusters > 0);
1705 				ctx->bs->num_free_clusters--;
1706 			}
1707 			segment >>= 1U;
1708 		}
1709 	}
1710 
1711 	spdk_dma_free(ctx->super);
1712 	spdk_dma_free(ctx->mask);
1713 	free(ctx);
1714 
1715 	spdk_bs_sequence_finish(seq, bserrno);
1716 }
1717 
1718 static void
1719 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1720 {
1721 	struct spdk_bs_load_ctx *ctx = cb_arg;
1722 	uint64_t		lba, lba_count, mask_size;
1723 	uint32_t		i, j;
1724 	int			rc;
1725 
1726 	/* The type must be correct */
1727 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1728 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1729 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
1730 				     8));
1731 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1732 	assert(ctx->mask->length == ctx->super->md_len);
1733 
1734 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1735 	if (rc < 0) {
1736 		spdk_dma_free(ctx->super);
1737 		spdk_dma_free(ctx->mask);
1738 		_spdk_bs_free(ctx->bs);
1739 		free(ctx);
1740 		spdk_bs_sequence_finish(seq, -ENOMEM);
1741 		return;
1742 	}
1743 
1744 	for (i = 0; i < ctx->mask->length / 8; i++) {
1745 		uint8_t segment = ctx->mask->mask[i];
1746 		for (j = 0; segment && (j < 8); j++) {
1747 			if (segment & 1U) {
1748 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1749 			}
1750 			segment >>= 1U;
1751 		}
1752 	}
1753 	spdk_dma_free(ctx->mask);
1754 
1755 	/* Read the used clusters mask */
1756 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1757 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1758 	if (!ctx->mask) {
1759 		spdk_dma_free(ctx->super);
1760 		_spdk_bs_free(ctx->bs);
1761 		free(ctx);
1762 		spdk_bs_sequence_finish(seq, -ENOMEM);
1763 		return;
1764 	}
1765 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1766 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1767 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1768 			      _spdk_bs_load_used_clusters_cpl, ctx);
1769 }
1770 
1771 static void
1772 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1773 {
1774 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1775 	uint64_t lba, lba_count, mask_size;
1776 
1777 	/* Read the used pages mask */
1778 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1779 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1780 	if (!ctx->mask) {
1781 		spdk_dma_free(ctx->super);
1782 		_spdk_bs_free(ctx->bs);
1783 		free(ctx);
1784 		spdk_bs_sequence_finish(seq, -ENOMEM);
1785 		return;
1786 	}
1787 
1788 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1789 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1790 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1791 			      _spdk_bs_load_used_pages_cpl, ctx);
1792 }
1793 
1794 static int
1795 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
1796 {
1797 	struct spdk_blob_md_descriptor *desc;
1798 	size_t	cur_desc = 0;
1799 
1800 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
1801 	while (cur_desc < sizeof(page->descriptors)) {
1802 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
1803 			if (desc->length == 0) {
1804 				/* If padding and length are 0, this terminates the page */
1805 				break;
1806 			}
1807 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
1808 			struct spdk_blob_md_descriptor_extent	*desc_extent;
1809 			unsigned int				i, j;
1810 			unsigned int				cluster_count = 0;
1811 
1812 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
1813 
1814 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
1815 				for (j = 0; j < desc_extent->extents[i].length; j++) {
1816 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
1817 					if (bs->num_free_clusters == 0) {
1818 						return -1;
1819 					}
1820 					bs->num_free_clusters--;
1821 					cluster_count++;
1822 				}
1823 			}
1824 			if (cluster_count == 0) {
1825 				return -1;
1826 			}
1827 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
1828 			/* Skip this item */
1829 		} else {
1830 			/* Error */
1831 			return -1;
1832 		}
1833 		/* Advance to the next descriptor */
1834 		cur_desc += sizeof(*desc) + desc->length;
1835 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
1836 			break;
1837 		}
1838 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
1839 	}
1840 	return 0;
1841 }
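
/*
 * Sketch of the page layout walked above (inferred from this parser, not a
 * normative format description): each metadata page packs a run of
 * variable-length descriptors, each led by a (type, length) header. The walk
 * ends at a zero-length PADDING descriptor or when no room remains for
 * another header; cur_desc advances by sizeof(*desc) + desc->length.
 *
 *	+------+--------+-------------+------+--------+-----+---------+
 *	| type | length | payload ... | type | length | ... | padding |
 *	+------+--------+-------------+------+--------+-----+---------+
 */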
1842 
static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
1844 {
1845 	uint32_t crc;
1846 
1847 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
1848 	if (crc != ctx->page->crc) {
1849 		return false;
1850 	}
1851 
1852 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
1853 		return false;
1854 	}
1855 	return true;
1856 }
1857 
1858 static void
1859 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
1860 
1861 static void
1862 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1863 {
1864 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1865 
1866 	spdk_dma_free(ctx->mask);
1867 	spdk_dma_free(ctx->super);
1868 	spdk_bs_sequence_finish(seq, bserrno);
1869 	free(ctx);
1870 }
1871 
1872 static void
1873 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1874 {
1875 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1876 
1877 	spdk_dma_free(ctx->mask);
1878 
1879 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
1880 }
1881 
1882 static void
1883 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1884 {
1885 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
1886 }
1887 
1888 static void
1889 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1890 {
1891 	struct spdk_bs_load_ctx *ctx = cb_arg;
1892 	uint32_t page_num;
1893 
1894 	if (bserrno != 0) {
1895 		spdk_dma_free(ctx->super);
1896 		_spdk_bs_free(ctx->bs);
1897 		free(ctx);
1898 		spdk_bs_sequence_finish(seq, bserrno);
1899 		return;
1900 	}
1901 
1902 	page_num = ctx->cur_page;
1903 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
1904 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
1905 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
1906 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
1907 				spdk_dma_free(ctx->super);
1908 				_spdk_bs_free(ctx->bs);
1909 				free(ctx);
1910 				spdk_bs_sequence_finish(seq, -EILSEQ);
1911 				return;
1912 			}
1913 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
1914 				ctx->in_page_chain = true;
1915 				ctx->cur_page = ctx->page->next;
1916 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1917 				return;
1918 			}
1919 		}
1920 	}
1921 
1922 	ctx->in_page_chain = false;
1923 
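	/* Scan forward to the next metadata page not already claimed by a chain. */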
1924 	do {
1925 		ctx->page_index++;
1926 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
1927 
1928 	if (ctx->page_index < ctx->super->md_len) {
1929 		ctx->cur_page = ctx->page_index;
1930 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1931 	} else {
1932 		spdk_dma_free(ctx->page);
1933 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
1934 	}
1935 }
1936 
1937 static void
1938 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
1939 {
1940 	struct spdk_bs_load_ctx *ctx = cb_arg;
1941 	uint64_t lba;
1942 
1943 	assert(ctx->cur_page < ctx->super->md_len);
1944 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
1945 	spdk_bs_sequence_read(seq, ctx->page, lba,
1946 			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
1947 			      _spdk_bs_load_replay_md_cpl, ctx);
1948 }
1949 
1950 static void
1951 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
1952 {
1953 	struct spdk_bs_load_ctx *ctx = cb_arg;
1954 
1955 	ctx->page_index = 0;
1956 	ctx->cur_page = 0;
1957 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
1958 				     SPDK_BS_PAGE_SIZE,
1959 				     NULL);
1960 	if (!ctx->page) {
1961 		spdk_dma_free(ctx->super);
1962 		_spdk_bs_free(ctx->bs);
1963 		free(ctx);
1964 		spdk_bs_sequence_finish(seq, -ENOMEM);
1965 		return;
1966 	}
1967 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1968 }
1969 
1970 static void
1971 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
1972 {
1973 	struct spdk_bs_load_ctx *ctx = cb_arg;
1974 	int 		rc;
1975 
1976 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
1977 	if (rc < 0) {
1978 		spdk_dma_free(ctx->super);
1979 		_spdk_bs_free(ctx->bs);
1980 		free(ctx);
1981 		spdk_bs_sequence_finish(seq, -ENOMEM);
1982 		return;
1983 	}
1984 
1985 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1986 	if (rc < 0) {
1987 		spdk_dma_free(ctx->super);
1988 		_spdk_bs_free(ctx->bs);
1989 		free(ctx);
1990 		spdk_bs_sequence_finish(seq, -ENOMEM);
1991 		return;
1992 	}
1993 
1994 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1995 	_spdk_bs_load_replay_md(seq, cb_arg);
1996 }
1997 
1998 static void
1999 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2000 {
2001 	struct spdk_bs_load_ctx *ctx = cb_arg;
2002 	uint32_t	crc;
2003 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
2004 
2005 	if (ctx->super->version > SPDK_BS_VERSION ||
2006 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
2007 		spdk_dma_free(ctx->super);
2008 		_spdk_bs_free(ctx->bs);
2009 		free(ctx);
2010 		spdk_bs_sequence_finish(seq, -EILSEQ);
2011 		return;
2012 	}
2013 
2014 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2015 		   sizeof(ctx->super->signature)) != 0) {
2016 		spdk_dma_free(ctx->super);
2017 		_spdk_bs_free(ctx->bs);
2018 		free(ctx);
2019 		spdk_bs_sequence_finish(seq, -EILSEQ);
2020 		return;
2021 	}
2022 
2023 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2024 	if (crc != ctx->super->crc) {
2025 		spdk_dma_free(ctx->super);
2026 		_spdk_bs_free(ctx->bs);
2027 		free(ctx);
2028 		spdk_bs_sequence_finish(seq, -EILSEQ);
2029 		return;
2030 	}
2031 
2032 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2033 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2034 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2035 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n");
2036 	} else {
2037 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2038 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2039 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2040 		spdk_dma_free(ctx->super);
2041 		_spdk_bs_free(ctx->bs);
2042 		free(ctx);
2043 		spdk_bs_sequence_finish(seq, -ENXIO);
2044 		return;
2045 	}
2046 
2047 	/* Parse the super block */
2048 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2049 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2050 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2051 	ctx->bs->md_start = ctx->super->md_start;
2052 	ctx->bs->md_len = ctx->super->md_len;
2053 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2054 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2055 	ctx->bs->super_blob = ctx->super->super_blob;
2056 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2057 
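	/*
	 * A clean shutdown leaves clean == 1 and valid on-disk masks: clear the
	 * flag now (so a future crash is detectable) and read the persisted
	 * masks back. Otherwise the masks are suspect and must be rebuilt by
	 * replaying the whole metadata region.
	 */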
2058 	if (ctx->super->clean == 1) {
2059 		ctx->super->clean = 0;
2060 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2061 	} else {
2062 		_spdk_bs_recover(seq, ctx);
2063 	}
2064 }
2065 
2066 void
2067 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2068 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2069 {
2070 	struct spdk_blob_store	*bs;
2071 	struct spdk_bs_cpl	cpl;
2072 	spdk_bs_sequence_t	*seq;
2073 	struct spdk_bs_load_ctx *ctx;
2074 	struct spdk_bs_opts	opts = {};
2075 
2076 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2077 
2078 	if (o) {
2079 		opts = *o;
2080 	} else {
2081 		spdk_bs_opts_init(&opts);
2082 	}
2083 
2084 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2085 		cb_fn(cb_arg, NULL, -EINVAL);
2086 		return;
2087 	}
2088 
2089 	bs = _spdk_bs_alloc(dev, &opts);
2090 	if (!bs) {
2091 		cb_fn(cb_arg, NULL, -ENOMEM);
2092 		return;
2093 	}
2094 
2095 	ctx = calloc(1, sizeof(*ctx));
2096 	if (!ctx) {
2097 		_spdk_bs_free(bs);
2098 		cb_fn(cb_arg, NULL, -ENOMEM);
2099 		return;
2100 	}
2101 
2102 	ctx->bs = bs;
2103 
2104 	/* Allocate memory for the super block */
2105 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2111 
2112 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2113 	cpl.u.bs_handle.cb_fn = cb_fn;
2114 	cpl.u.bs_handle.cb_arg = cb_arg;
2115 	cpl.u.bs_handle.bs = bs;
2116 
2117 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2118 	if (!seq) {
2119 		spdk_dma_free(ctx->super);
2120 		free(ctx);
2121 		_spdk_bs_free(bs);
2122 		cb_fn(cb_arg, NULL, -ENOMEM);
2123 		return;
2124 	}
2125 
2126 	/* Read the super block */
2127 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2128 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2129 			      _spdk_bs_load_super_cpl, ctx);
2130 }
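
/*
 * Minimal usage sketch for spdk_bs_load(). The callback name and the error
 * handling below are illustrative only; the completion signature is
 * spdk_bs_op_with_handle_complete. Passing NULL for the opts selects the
 * defaults from spdk_bs_opts_init().
 *
 *	static void
 *	load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("blobstore load failed: %d\n", bserrno);
 *			return;
 *		}
 *		...	bs is now ready for blob operations
 *	}
 *
 *	spdk_bs_load(dev, NULL, load_done, NULL);
 */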
2131 
2132 /* END spdk_bs_load */
2133 
2134 /* START spdk_bs_init */
2135 
2136 struct spdk_bs_init_ctx {
2137 	struct spdk_blob_store		*bs;
2138 	struct spdk_bs_super_block	*super;
2139 };
2140 
2141 static void
2142 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2143 {
2144 	struct spdk_bs_init_ctx *ctx = cb_arg;
2145 
2146 	spdk_dma_free(ctx->super);
2147 	free(ctx);
2148 
2149 	spdk_bs_sequence_finish(seq, bserrno);
2150 }
2151 
2152 static void
2153 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2154 {
2155 	struct spdk_bs_init_ctx *ctx = cb_arg;
2156 
2157 	/* Write super block */
2158 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2159 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2160 			       _spdk_bs_init_persist_super_cpl, ctx);
2161 }
2162 
2163 void
2164 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2165 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2166 {
2167 	struct spdk_bs_init_ctx *ctx;
2168 	struct spdk_blob_store	*bs;
2169 	struct spdk_bs_cpl	cpl;
2170 	spdk_bs_sequence_t	*seq;
2171 	spdk_bs_batch_t		*batch;
2172 	uint64_t		num_md_lba;
2173 	uint64_t		num_md_pages;
2174 	uint64_t		num_md_clusters;
2175 	uint32_t		i;
2176 	struct spdk_bs_opts	opts = {};
2177 	int			rc;
2178 
2179 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2180 
2181 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2182 		SPDK_ERRLOG("unsupported dev block length of %d\n",
2183 			    dev->blocklen);
2184 		dev->destroy(dev);
2185 		cb_fn(cb_arg, NULL, -EINVAL);
2186 		return;
2187 	}
2188 
2189 	if (o) {
2190 		opts = *o;
2191 	} else {
2192 		spdk_bs_opts_init(&opts);
2193 	}
2194 
2195 	if (_spdk_bs_opts_verify(&opts) != 0) {
2196 		dev->destroy(dev);
2197 		cb_fn(cb_arg, NULL, -EINVAL);
2198 		return;
2199 	}
2200 
2201 	bs = _spdk_bs_alloc(dev, &opts);
2202 	if (!bs) {
2203 		dev->destroy(dev);
2204 		cb_fn(cb_arg, NULL, -ENOMEM);
2205 		return;
2206 	}
2207 
2208 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2209 		/* By default, allocate 1 page per cluster.
2210 		 * Technically, this over-allocates metadata
2211 		 * because more metadata will reduce the number
2212 		 * of usable clusters. This can be addressed with
2213 		 * more complex math in the future.
2214 		 */
2215 		bs->md_len = bs->total_clusters;
2216 	} else {
2217 		bs->md_len = opts.num_md_pages;
2218 	}
2219 
2220 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2221 	if (rc < 0) {
2222 		_spdk_bs_free(bs);
2223 		cb_fn(cb_arg, NULL, -ENOMEM);
2224 		return;
2225 	}
2226 
2227 	ctx = calloc(1, sizeof(*ctx));
2228 	if (!ctx) {
2229 		_spdk_bs_free(bs);
2230 		cb_fn(cb_arg, NULL, -ENOMEM);
2231 		return;
2232 	}
2233 
2234 	ctx->bs = bs;
2235 
2236 	/* Allocate memory for the super block */
2237 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2243 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2244 	       sizeof(ctx->super->signature));
2245 	ctx->super->version = SPDK_BS_VERSION;
2246 	ctx->super->length = sizeof(*ctx->super);
2247 	ctx->super->super_blob = bs->super_blob;
2248 	ctx->super->clean = 0;
2249 	ctx->super->cluster_size = bs->cluster_sz;
2250 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
2251 
2252 	/* Calculate how many pages the metadata consumes at the front
2253 	 * of the disk.
2254 	 */
2255 
2256 	/* The super block uses 1 page */
2257 	num_md_pages = 1;
2258 
2259 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
2260 	 * up to the nearest page, plus a header.
2261 	 */
2262 	ctx->super->used_page_mask_start = num_md_pages;
2263 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2264 					 divide_round_up(bs->md_len, 8),
2265 					 SPDK_BS_PAGE_SIZE);
2266 	num_md_pages += ctx->super->used_page_mask_len;
2267 
2268 	/* The used_clusters mask requires 1 bit per cluster, rounded
2269 	 * up to the nearest page, plus a header.
2270 	 */
2271 	ctx->super->used_cluster_mask_start = num_md_pages;
2272 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2273 					    divide_round_up(bs->total_clusters, 8),
2274 					    SPDK_BS_PAGE_SIZE);
2275 	num_md_pages += ctx->super->used_cluster_mask_len;
2276 
2277 	/* The metadata region size was chosen above */
2278 	ctx->super->md_start = bs->md_start = num_md_pages;
2279 	ctx->super->md_len = bs->md_len;
2280 	num_md_pages += bs->md_len;
2281 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
2282 
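	/*
	 * Front-of-disk layout implied by the math above (units are pages):
	 *
	 *	page 0                                             super block
	 *	[used_page_mask_start, +used_page_mask_len)        used_md_pages mask
	 *	[used_cluster_mask_start, +used_cluster_mask_len)  used_clusters mask
	 *	[md_start, +md_len)                                blob metadata region
	 */
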
2283 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
2284 
2285 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
2286 	if (num_md_clusters > bs->total_clusters) {
2287 		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
2288 			    "please decrease number of pages reserved for metadata "
2289 			    "or increase cluster size.\n");
2290 		spdk_dma_free(ctx->super);
2291 		free(ctx);
2292 		_spdk_bs_free(bs);
2293 		cb_fn(cb_arg, NULL, -ENOMEM);
2294 		return;
2295 	}
2296 	/* Claim all of the clusters used by the metadata */
2297 	for (i = 0; i < num_md_clusters; i++) {
2298 		_spdk_bs_claim_cluster(bs, i);
2299 	}
2300 
2301 	bs->total_data_clusters = bs->num_free_clusters;
2302 
2303 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2304 	cpl.u.bs_handle.cb_fn = cb_fn;
2305 	cpl.u.bs_handle.cb_arg = cb_arg;
2306 	cpl.u.bs_handle.bs = bs;
2307 
2308 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2309 	if (!seq) {
2310 		spdk_dma_free(ctx->super);
2311 		free(ctx);
2312 		_spdk_bs_free(bs);
2313 		cb_fn(cb_arg, NULL, -ENOMEM);
2314 		return;
2315 	}
2316 
2317 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
2318 
2319 	/* Clear metadata space */
2320 	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
2321 	/* Trim data clusters */
2322 	spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
2323 
2324 	spdk_bs_batch_close(batch);
2325 }
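
/*
 * Minimal usage sketch for spdk_bs_init() (hypothetical callback name, for
 * illustration only). Passing NULL for the opts selects the defaults from
 * spdk_bs_opts_init().
 *
 *	static void
 *	init_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
 *	{
 *		assert(bserrno == 0);
 *		...	fresh, empty blobstore handle in bs
 *	}
 *
 *	spdk_bs_init(dev, NULL, init_done, NULL);
 */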
2326 
2327 /* END spdk_bs_init */
2328 
2329 /* START spdk_bs_destroy */
2330 
2331 static void
2332 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2333 {
2334 	struct spdk_bs_init_ctx *ctx = cb_arg;
2335 	struct spdk_blob_store *bs = ctx->bs;
2336 
2337 	/*
2338 	 * We need to defer calling spdk_bs_call_cpl() until after
2339 	 * dev destruction, so tuck these away for later use.
2340 	 */
2341 	bs->unload_err = bserrno;
2342 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2343 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2344 
2345 	spdk_bs_sequence_finish(seq, bserrno);
2346 
2347 	_spdk_bs_free(bs);
2348 	free(ctx);
2349 }
2350 
2351 void
2352 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
2353 		void *cb_arg)
2354 {
2355 	struct spdk_bs_cpl	cpl;
2356 	spdk_bs_sequence_t	*seq;
2357 	struct spdk_bs_init_ctx *ctx;
2358 
2359 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
2360 
2361 	if (!TAILQ_EMPTY(&bs->blobs)) {
2362 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2363 		cb_fn(cb_arg, -EBUSY);
2364 		return;
2365 	}
2366 
2367 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2368 	cpl.u.bs_basic.cb_fn = cb_fn;
2369 	cpl.u.bs_basic.cb_arg = cb_arg;
2370 
2371 	ctx = calloc(1, sizeof(*ctx));
2372 	if (!ctx) {
2373 		cb_fn(cb_arg, -ENOMEM);
2374 		return;
2375 	}
2376 
2377 	ctx->bs = bs;
2378 
2379 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2380 	if (!seq) {
2381 		free(ctx);
2382 		cb_fn(cb_arg, -ENOMEM);
2383 		return;
2384 	}
2385 
2386 	/* Write zeroes to the super block */
2387 	spdk_bs_sequence_write_zeroes(seq,
2388 				      _spdk_bs_page_to_lba(bs, 0),
2389 				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
2390 				      _spdk_bs_destroy_trim_cpl, ctx);
2391 }
2392 
2393 /* END spdk_bs_destroy */
2394 
2395 /* START spdk_bs_unload */
2396 
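/*
 * Unload persists state through a chain of completions: read the current
 * super block, write out the used_md_pages mask, write out the used_clusters
 * mask, then write the super block back with clean == 1 so the next load can
 * skip recovery.
 */
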
2397 static void
2398 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2399 {
2400 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2401 
2402 	spdk_dma_free(ctx->super);
2403 
2404 	/*
2405 	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
2407 	 */
2408 	ctx->bs->unload_err = bserrno;
2409 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2410 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2411 
2412 	spdk_bs_sequence_finish(seq, bserrno);
2413 
2414 	_spdk_bs_free(ctx->bs);
2415 	free(ctx);
2416 }
2417 
2418 static void
2419 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2420 {
2421 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2422 
2423 	spdk_dma_free(ctx->mask);
2424 	ctx->super->clean = 1;
2425 
2426 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
2427 }
2428 
2429 static void
2430 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2431 {
2432 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2433 
2434 	spdk_dma_free(ctx->mask);
2435 
2436 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
2437 }
2438 
2439 static void
2440 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2441 {
2442 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
2443 }
2444 
2445 void
2446 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
2447 {
2448 	struct spdk_bs_cpl	cpl;
2449 	spdk_bs_sequence_t	*seq;
2450 	struct spdk_bs_load_ctx *ctx;
2451 
2452 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
2453 
2454 	if (!TAILQ_EMPTY(&bs->blobs)) {
2455 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2456 		cb_fn(cb_arg, -EBUSY);
2457 		return;
2458 	}
2459 
2460 	ctx = calloc(1, sizeof(*ctx));
2461 	if (!ctx) {
2462 		cb_fn(cb_arg, -ENOMEM);
2463 		return;
2464 	}
2465 
2466 	ctx->bs = bs;
2467 
2468 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2469 	if (!ctx->super) {
2470 		free(ctx);
2471 		cb_fn(cb_arg, -ENOMEM);
2472 		return;
2473 	}
2474 
2475 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2476 	cpl.u.bs_basic.cb_fn = cb_fn;
2477 	cpl.u.bs_basic.cb_arg = cb_arg;
2478 
2479 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2480 	if (!seq) {
2481 		spdk_dma_free(ctx->super);
2482 		free(ctx);
2483 		cb_fn(cb_arg, -ENOMEM);
2484 		return;
2485 	}
2486 
2487 	/* Read super block */
2488 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2489 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2490 			      _spdk_bs_unload_read_super_cpl, ctx);
2491 }
2492 
2493 /* END spdk_bs_unload */
2494 
2495 void
2496 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
2497 		  spdk_bs_op_complete cb_fn, void *cb_arg)
2498 {
2499 	bs->super_blob = blobid;
2500 	cb_fn(cb_arg, 0);
2501 }
2502 
2503 void
2504 spdk_bs_get_super(struct spdk_blob_store *bs,
2505 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2506 {
2507 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
2508 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
2509 	} else {
2510 		cb_fn(cb_arg, bs->super_blob, 0);
2511 	}
2512 }
2513 
2514 uint64_t
2515 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
2516 {
2517 	return bs->cluster_sz;
2518 }
2519 
2520 uint64_t
2521 spdk_bs_get_page_size(struct spdk_blob_store *bs)
2522 {
2523 	return SPDK_BS_PAGE_SIZE;
2524 }
2525 
2526 uint64_t
2527 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
2528 {
2529 	return bs->num_free_clusters;
2530 }
2531 
2532 uint64_t
2533 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
2534 {
2535 	return bs->total_data_clusters;
2536 }
2537 
2538 static int
2539 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
2540 {
2541 	bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target);
2542 	if (!bs->md_target.md_channel) {
2543 		SPDK_ERRLOG("Failed to get IO channel.\n");
2544 		return -1;
2545 	}
2546 
2547 	return 0;
2548 }
2549 
2550 static int
2551 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
2552 {
2553 	spdk_put_io_channel(bs->md_target.md_channel);
2554 
2555 	return 0;
2556 }
2557 
2558 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob)
2559 {
2560 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2561 
2562 	assert(blob != NULL);
2563 
2564 	return blob->id;
2565 }
2566 
2567 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob)
2568 {
2569 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2570 
2571 	assert(blob != NULL);
2572 
2573 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
2574 }
2575 
2576 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob)
2577 {
2578 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2579 
2580 	assert(blob != NULL);
2581 
2582 	return blob->active.num_clusters;
2583 }
2584 
2585 /* START spdk_bs_create_blob */
2586 
2587 static void
2588 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2589 {
2590 	struct spdk_blob_data *blob = cb_arg;
2591 
2592 	_spdk_blob_free(blob);
2593 
2594 	spdk_bs_sequence_finish(seq, bserrno);
2595 }
2596 
2597 void spdk_bs_create_blob(struct spdk_blob_store *bs,
2598 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2599 {
2600 	struct spdk_blob_data	*blob;
2601 	uint32_t		page_idx;
2602 	struct spdk_bs_cpl 	cpl;
2603 	spdk_bs_sequence_t	*seq;
2604 	spdk_blob_id		id;
2605 
2606 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
2607 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
2608 		cb_fn(cb_arg, 0, -ENOMEM);
2609 		return;
2610 	}
2611 	spdk_bit_array_set(bs->used_md_pages, page_idx);
2612 
2613 	id = _spdk_bs_page_to_blobid(page_idx);
2614 
2615 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
2616 
2617 	blob = _spdk_blob_alloc(bs, id);
2618 	if (!blob) {
2619 		cb_fn(cb_arg, 0, -ENOMEM);
2620 		return;
2621 	}
2622 
2623 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
2624 	cpl.u.blobid.cb_fn = cb_fn;
2625 	cpl.u.blobid.cb_arg = cb_arg;
2626 	cpl.u.blobid.blobid = blob->id;
2627 
2628 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2629 	if (!seq) {
2630 		_spdk_blob_free(blob);
2631 		cb_fn(cb_arg, 0, -ENOMEM);
2632 		return;
2633 	}
2634 
2635 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
2636 }
2637 
2638 /* END spdk_bs_create_blob */
2639 
2640 /* START spdk_blob_resize */
2641 int
2642 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
2643 {
2644 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2645 	int			rc;
2646 
2647 	assert(blob != NULL);
2648 
2649 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
2650 
2651 	if (blob->md_ro) {
2652 		return -EPERM;
2653 	}
2654 
2655 	if (sz == blob->active.num_clusters) {
2656 		return 0;
2657 	}
2658 
2659 	rc = _spdk_resize_blob(blob, sz);
2660 	if (rc < 0) {
2661 		return rc;
2662 	}
2663 
2664 	return 0;
2665 }
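
/*
 * Note: spdk_blob_resize() only updates in-memory metadata; the new size
 * does not become durable until the blob metadata is persisted, e.g. by
 * spdk_blob_sync_md() or spdk_blob_close().
 */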
2666 
2667 /* END spdk_blob_resize */
2668 
2669 
2670 /* START spdk_bs_delete_blob */
2671 
2672 static void
2673 _spdk_bs_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2674 {
2675 	struct spdk_blob_data *blob = cb_arg;
2676 
2677 	_spdk_blob_free(blob);
2678 
2679 	spdk_bs_sequence_finish(seq, bserrno);
2680 }
2681 
2682 static void
2683 _spdk_bs_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2684 {
2685 	struct spdk_blob_data *blob = cb_arg;
2686 
	/* If the blob failed to load (e.g. metadata CRC error), just finish with the error. */
2688 	if (blob == NULL) {
2689 		spdk_bs_sequence_finish(seq, bserrno);
2690 		return;
2691 	}
2692 	blob->state = SPDK_BLOB_STATE_DIRTY;
2693 	blob->active.num_pages = 0;
2694 	_spdk_resize_blob(blob, 0);
2695 
2696 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_blob_cpl, blob);
2697 }
2698 
2699 void
2700 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2701 		    spdk_blob_op_complete cb_fn, void *cb_arg)
2702 {
2703 	struct spdk_blob_data	*blob;
2704 	struct spdk_bs_cpl	cpl;
2705 	spdk_bs_sequence_t 	*seq;
2706 
2707 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
2708 
2709 	blob = _spdk_blob_lookup(bs, blobid);
2710 	if (blob) {
2711 		assert(blob->open_ref > 0);
2712 		cb_fn(cb_arg, -EINVAL);
2713 		return;
2714 	}
2715 
2716 	blob = _spdk_blob_alloc(bs, blobid);
2717 	if (!blob) {
2718 		cb_fn(cb_arg, -ENOMEM);
2719 		return;
2720 	}
2721 
2722 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2723 	cpl.u.blob_basic.cb_fn = cb_fn;
2724 	cpl.u.blob_basic.cb_arg = cb_arg;
2725 
2726 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2727 	if (!seq) {
2728 		_spdk_blob_free(blob);
2729 		cb_fn(cb_arg, -ENOMEM);
2730 		return;
2731 	}
2732 
2733 	_spdk_blob_load(seq, blob, _spdk_bs_delete_open_cpl, blob);
2734 }
2735 
2736 /* END spdk_bs_delete_blob */
2737 
2738 /* START spdk_bs_open_blob */
2739 
2740 static void
2741 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2742 {
2743 	struct spdk_blob_data *blob = cb_arg;
2744 
	/* If the blob failed to load (e.g. metadata CRC error), return a NULL handle. */
2746 	if (blob == NULL) {
2747 		seq->cpl.u.blob_handle.blob = NULL;
2748 		spdk_bs_sequence_finish(seq, bserrno);
2749 		return;
2750 	}
2751 
2752 	blob->open_ref++;
2753 
2754 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
2755 
2756 	spdk_bs_sequence_finish(seq, bserrno);
2757 }
2758 
2759 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2760 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2761 {
2762 	struct spdk_blob_data		*blob;
2763 	struct spdk_bs_cpl		cpl;
2764 	spdk_bs_sequence_t		*seq;
2765 	uint32_t			page_num;
2766 
2767 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
2768 
2769 	blob = _spdk_blob_lookup(bs, blobid);
2770 	if (blob) {
2771 		blob->open_ref++;
2772 		cb_fn(cb_arg, __data_to_blob(blob), 0);
2773 		return;
2774 	}
2775 
2776 	page_num = _spdk_bs_blobid_to_page(blobid);
2777 	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
2778 		/* Invalid blobid */
2779 		cb_fn(cb_arg, NULL, -ENOENT);
2780 		return;
2781 	}
2782 
2783 	blob = _spdk_blob_alloc(bs, blobid);
2784 	if (!blob) {
2785 		cb_fn(cb_arg, NULL, -ENOMEM);
2786 		return;
2787 	}
2788 
2789 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
2790 	cpl.u.blob_handle.cb_fn = cb_fn;
2791 	cpl.u.blob_handle.cb_arg = cb_arg;
2792 	cpl.u.blob_handle.blob = __data_to_blob(blob);
2793 
2794 	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
2795 	if (!seq) {
2796 		_spdk_blob_free(blob);
2797 		cb_fn(cb_arg, NULL, -ENOMEM);
2798 		return;
2799 	}
2800 
2801 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
2802 }
2803 
2804 /* END spdk_bs_open_blob */
2805 
2806 /* START spdk_blob_sync_md */
2807 
2808 static void
2809 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2810 {
2811 	spdk_bs_sequence_finish(seq, bserrno);
2812 }
2813 
2814 void
2815 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
2816 {
2817 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2818 	struct spdk_bs_cpl	cpl;
2819 	spdk_bs_sequence_t	*seq;
2820 
2821 	assert(blob != NULL);
2822 
2823 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
2824 
2825 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2826 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2827 
	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		cb_fn(cb_arg, 0);
		return;
	}
2832 
2833 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2834 		cb_fn(cb_arg, 0);
2835 		return;
2836 	}
2837 
2838 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2839 	cpl.u.blob_basic.cb_fn = cb_fn;
2840 	cpl.u.blob_basic.cb_arg = cb_arg;
2841 
2842 	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
2843 	if (!seq) {
2844 		cb_fn(cb_arg, -ENOMEM);
2845 		return;
2846 	}
2847 
2848 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
2849 }
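
/*
 * Typical resize-and-sync flow, as a sketch only (hypothetical callback
 * names; error handling elided):
 *
 *	static void
 *	sync_done(void *cb_arg, int bserrno)
 *	{
 *		...	the new size is now durable
 *	}
 *
 *	static void
 *	open_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (spdk_blob_resize(blob, 10) == 0) {
 *			spdk_blob_sync_md(blob, sync_done, NULL);
 *		}
 *	}
 */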
2850 
2851 /* END spdk_blob_sync_md */
2852 
2853 /* START spdk_blob_close */
2854 
2855 static void
2856 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2857 {
2858 	struct spdk_blob_data **blob = cb_arg;
2859 
2860 	if ((*blob)->open_ref == 0) {
2861 		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
2862 		_spdk_blob_free((*blob));
2863 	}
2864 
2865 	*blob = NULL;
2866 
2867 	spdk_bs_sequence_finish(seq, bserrno);
2868 }
2869 
2870 void spdk_blob_close(struct spdk_blob **b, spdk_blob_op_complete cb_fn, void *cb_arg)
2871 {
2872 	struct spdk_bs_cpl	cpl;
2873 	struct spdk_blob_data	*blob;
2874 	spdk_bs_sequence_t	*seq;
2875 
2876 	assert(b != NULL);
2877 	blob = __blob_to_data(*b);
2878 	assert(blob != NULL);
2879 
2880 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
2881 
2882 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2883 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2884 
2885 	if (blob->open_ref == 0) {
2886 		cb_fn(cb_arg, -EBADF);
2887 		return;
2888 	}
2889 
2890 	blob->open_ref--;
2891 
2892 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2893 	cpl.u.blob_basic.cb_fn = cb_fn;
2894 	cpl.u.blob_basic.cb_arg = cb_arg;
2895 
2896 	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
2897 	if (!seq) {
2898 		cb_fn(cb_arg, -ENOMEM);
2899 		return;
2900 	}
2901 
2902 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2903 		_spdk_blob_close_cpl(seq, b, 0);
2904 		return;
2905 	}
2906 
2907 	/* Sync metadata */
2908 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
2909 }
2910 
2911 /* END spdk_blob_close */
2912 
2913 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
2914 {
2915 	return spdk_get_io_channel(&bs->io_target);
2916 }
2917 
2918 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
2919 {
2920 	spdk_put_io_channel(channel);
2921 }
2922 
2923 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2924 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
2925 {
2926 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
2927 				     SPDK_BLOB_UNMAP);
2928 }
2929 
2930 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2931 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
2932 {
2933 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
2934 				     SPDK_BLOB_WRITE_ZEROES);
2935 }
2936 
2937 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2938 			   void *payload, uint64_t offset, uint64_t length,
2939 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2940 {
2941 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
2942 				     SPDK_BLOB_WRITE);
2943 }
2944 
2945 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2946 			  void *payload, uint64_t offset, uint64_t length,
2947 			  spdk_blob_op_complete cb_fn, void *cb_arg)
2948 {
2949 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
2950 				     SPDK_BLOB_READ);
2951 }
2952 
2953 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2954 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2955 			    spdk_blob_op_complete cb_fn, void *cb_arg)
2956 {
2957 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
2958 }
2959 
2960 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
2961 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
2962 			   spdk_blob_op_complete cb_fn, void *cb_arg)
2963 {
2964 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
2965 }
2966 
2967 struct spdk_bs_iter_ctx {
2968 	int64_t page_num;
2969 	struct spdk_blob_store *bs;
2970 
2971 	spdk_blob_op_with_handle_complete cb_fn;
2972 	void *cb_arg;
2973 };
2974 
2975 static void
2976 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
2977 {
2978 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2979 	struct spdk_bs_iter_ctx *ctx = cb_arg;
2980 	struct spdk_blob_store *bs = ctx->bs;
2981 	spdk_blob_id id;
2982 
2983 	if (bserrno == 0) {
2984 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
2985 		free(ctx);
2986 		return;
2987 	}
2988 
2989 	ctx->page_num++;
2990 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
2991 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
2992 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
2993 		free(ctx);
2994 		return;
2995 	}
2996 
2997 	id = _spdk_bs_page_to_blobid(ctx->page_num);
2998 
2999 	blob = _spdk_blob_lookup(bs, id);
3000 	if (blob) {
3001 		blob->open_ref++;
3002 		ctx->cb_fn(ctx->cb_arg, _blob, 0);
3003 		free(ctx);
3004 		return;
3005 	}
3006 
3007 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
3008 }
3009 
3010 void
3011 spdk_bs_iter_first(struct spdk_blob_store *bs,
3012 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3013 {
3014 	struct spdk_bs_iter_ctx *ctx;
3015 
3016 	ctx = calloc(1, sizeof(*ctx));
3017 	if (!ctx) {
3018 		cb_fn(cb_arg, NULL, -ENOMEM);
3019 		return;
3020 	}
3021 
3022 	ctx->page_num = -1;
3023 	ctx->bs = bs;
3024 	ctx->cb_fn = cb_fn;
3025 	ctx->cb_arg = cb_arg;
3026 
3027 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3028 }
3029 
3030 static void
3031 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
3032 {
3033 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3034 
3035 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3036 }
3037 
3038 void
3039 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
3040 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3041 {
3042 	struct spdk_bs_iter_ctx *ctx;
3043 	struct spdk_blob_data	*blob;
3044 
3045 	assert(b != NULL);
3046 	blob = __blob_to_data(*b);
3047 	assert(blob != NULL);
3048 
3049 	ctx = calloc(1, sizeof(*ctx));
3050 	if (!ctx) {
3051 		cb_fn(cb_arg, NULL, -ENOMEM);
3052 		return;
3053 	}
3054 
3055 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
3056 	ctx->bs = bs;
3057 	ctx->cb_fn = cb_fn;
3058 	ctx->cb_arg = cb_arg;
3059 
3060 	/* Close the existing blob */
3061 	spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx);
3062 }
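
/*
 * Iteration sketch (hypothetical callback name): spdk_bs_iter_first() opens
 * the first blob; each completion hands the open handle to
 * spdk_bs_iter_next(), which closes it and opens the following one. The
 * callback fires a final time with -ENOENT when no blobs remain.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno != 0) {
 *			return;	...-ENOENT marks the end of iteration
 *		}
 *		...	inspect blob here
 *		spdk_bs_iter_next(bs, &blob, iter_cb, bs);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_cb, bs);
 */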
3063 
3064 int
3065 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
3066 		    uint16_t value_len)
3067 {
3068 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3069 	struct spdk_xattr 	*xattr;
3070 
3071 	assert(blob != NULL);
3072 
3073 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3074 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3075 
3076 	if (blob->md_ro) {
3077 		return -EPERM;
3078 	}
3079 
3080 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp = malloc(value_len);

			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);
			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;
3086 
3087 			blob->state = SPDK_BLOB_STATE_DIRTY;
3088 
3089 			return 0;
3090 		}
3091 	}
3092 
	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
3101 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
3102 
3103 	blob->state = SPDK_BLOB_STATE_DIRTY;
3104 
3105 	return 0;
3106 }
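
/*
 * xattr usage sketch (illustrative name and value only). Like resize, xattr
 * changes live on the in-memory list until the next metadata sync persists
 * them.
 *
 *	const void *val;
 *	size_t len;
 *
 *	spdk_blob_set_xattr(blob, "name", "example", sizeof("example"));
 *	spdk_blob_get_xattr_value(blob, "name", &val, &len);
 */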
3107 
3108 int
3109 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
3110 {
3111 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3112 	struct spdk_xattr	*xattr;
3113 
3114 	assert(blob != NULL);
3115 
3116 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3117 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3118 
3119 	if (blob->md_ro) {
3120 		return -EPERM;
3121 	}
3122 
3123 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3124 		if (!strcmp(name, xattr->name)) {
3125 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
3126 			free(xattr->value);
3127 			free(xattr->name);
3128 			free(xattr);
3129 
3130 			blob->state = SPDK_BLOB_STATE_DIRTY;
3131 
3132 			return 0;
3133 		}
3134 	}
3135 
3136 	return -ENOENT;
3137 }
3138 
3139 int
3140 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
3141 			  const void **value, size_t *value_len)
3142 {
3143 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3144 	struct spdk_xattr	*xattr;
3145 
3146 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3147 		if (!strcmp(name, xattr->name)) {
3148 			*value = xattr->value;
3149 			*value_len = xattr->value_len;
3150 			return 0;
3151 		}
3152 	}
3153 
3154 	return -ENOENT;
3155 }
3156 
3157 struct spdk_xattr_names {
3158 	uint32_t	count;
3159 	const char	*names[0];
3160 };
3161 
3162 int
3163 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
3164 {
3165 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3166 	struct spdk_xattr	*xattr;
3167 	int			count = 0;
3168 
3169 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3170 		count++;
3171 	}
3172 
3173 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
3174 	if (*names == NULL) {
3175 		return -ENOMEM;
3176 	}
3177 
3178 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3179 		(*names)->names[(*names)->count++] = xattr->name;
3180 	}
3181 
3182 	return 0;
3183 }
3184 
3185 uint32_t
3186 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
3187 {
3188 	assert(names != NULL);
3189 
3190 	return names->count;
3191 }
3192 
3193 const char *
3194 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
3195 {
3196 	if (index >= names->count) {
3197 		return NULL;
3198 	}
3199 
3200 	return names->names[index];
3201 }
3202 
3203 void
3204 spdk_xattr_names_free(struct spdk_xattr_names *names)
3205 {
3206 	free(names);
3207 }
3208 
3209 struct spdk_bs_type
3210 spdk_bs_get_bstype(struct spdk_blob_store *bs)
3211 {
3212 	return bs->bstype;
3213 }
3214 
3215 void
3216 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
3217 {
3218 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
3219 }
3220 
3221 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
3222