/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL    0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}
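/* Example: divide_round_up(10, 4) == 3 -- byte counts round up to whole
 * pages or clusters instead of truncating the partial remainder.
 */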

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->xattr_count = 0;
	opts->xattr_names = NULL;
	opts->xattr_ctx = NULL;
	opts->get_xattr_value = NULL;
}

static struct spdk_blob_data *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob_data *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob_data *blob)
{
	struct spdk_xattr 	*xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob_data *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

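/*
 * The descriptor area of a metadata page is a packed stream of
 * variable-length descriptors:
 *
 *   [type|length|payload][type|length|payload]...[type=0,length=0 terminator]
 *
 * The parser below walks this stream until it hits a zero-length padding
 * descriptor or there is no room left for another descriptor header.
 */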
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t	cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags	*desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent	*desc_extent;
			unsigned int				i, j;
			unsigned int				cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (!spdk_bit_array_get(blob->bs->used_clusters,
								desc_extent->extents[i].cluster_idx + j)) {
						return -EINVAL;
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
			struct spdk_xattr 			*xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
			    sizeof(desc_xattr->value_length) +
			    desc_xattr->name_length + desc_xattr->value_length) {
				return -EINVAL;
			}

			xattr = calloc(1, sizeof(*xattr));
			if (xattr == NULL) {
				return -ENOMEM;
			}

			xattr->name = malloc(desc_xattr->name_length + 1);
			if (xattr->name == NULL) {
				free(xattr);
				return -ENOMEM;
			}
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			if (xattr->value == NULL) {
				free(xattr->name);
				free(xattr);
				return -ENOMEM;
			}
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Unrecognized descriptor type.  Do not fail - just continue to the
			 *  next descriptor.  If this descriptor is associated with some feature
			 *  defined in a newer version of blobstore, that version of blobstore
			 *  should create and set an associated feature flag to specify if this
			 *  blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob_data *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD; this can
	 * happen, for example, if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * required_sz is updated on both success and failure.
 */
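/*
 * Serialized form (a sketch; the authoritative layout is in blobstore.h):
 *
 *   | type | length | name_length | value_length | name bytes | value bytes |
 *
 * 'length' counts only the bytes after the generic descriptor header, i.e.
 * the name_length and value_length fields plus the name and value payloads,
 * which is exactly what desc->length is set to below.
 */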
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr	*desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

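/*
 * Extents are run-length encoded: a run of physically contiguous clusters
 * collapses into one (cluster_idx, length) pair. For illustration, assume
 * 1 cluster == 256 LBAs; clusters at LBAs {256, 512, 768, 2560} would
 * serialize as two extents: {cluster_idx = 1, length = 3} and
 * {cluster_idx = 10, length = 1}.
 */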
static void
_spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;
}

static void
_spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 *  descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page		*cur_page;
	const struct spdk_xattr			*xattr;
	int 					rc;
	uint8_t					*buf;
	size_t					remaining_sz;
	uint64_t				last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob_data 		*blob;

	struct spdk_blob_md_page	*pages;
	uint32_t			num_pages;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

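/*
 * The page crc lives in the final 4 bytes of the metadata page, so the
 * checksum is computed over everything except those last 4 bytes. The final
 * XOR against the initial seed is the usual CRC-32C finalization step.
 */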
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t		crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

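/*
 * Completion for each metadata page read during blob load. Pages form a
 * singly-linked chain on disk via page->next (terminated by
 * SPDK_INVALID_MD_PAGE); each page is crc-checked and appended to
 * ctx->pages, and the next link is followed until the chain ends, at which
 * point the accumulated pages are parsed into the in-memory blob.
 */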
static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx 	*ctx = cb_arg;
	struct spdk_blob_data 		*blob = ctx->blob;
	struct spdk_blob_md_page	*page;
	int				rc;
	uint32_t			crc;

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %u crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
		struct spdk_blob_md_page *tmp;

		assert(next_page < blob->bs->md_len);

		/* Read the next page */
		ctx->num_pages++;
		tmp = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
				       sizeof(*page), NULL);
		if (tmp == NULL) {
			/* Keep the original buffer so it can still be freed on failure */
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			spdk_dma_free(ctx->pages);
			free(ctx);
			return;
		}
		ctx->pages = tmp;

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

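/*
 * Persist pipeline. Each step kicks off the next from its completion
 * callback:
 *
 *   _spdk_blob_persist
 *    -> _spdk_blob_persist_write_page_chain  (write new pages 1..N-1)
 *    -> _spdk_blob_persist_write_page_root   (write page 0 last)
 *    -> _spdk_blob_persist_zero_pages        (zero the old on-disk pages)
 *    -> _spdk_blob_persist_unmap_clusters    (unmap truncated clusters)
 *    -> _spdk_blob_persist_complete          (mark clean, invoke callback)
 *
 * Writing the root page only after the rest of the chain is on disk keeps a
 * crash from leaving a root that points at pages which were never written.
 */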
struct spdk_blob_persist_ctx {
	struct spdk_blob_data 		*blob;

	struct spdk_blob_md_page	*pages;

	uint64_t			idx;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
	struct spdk_blob_data 		*blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
	struct spdk_blob_data 		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	void				*tmp;
	size_t				i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
	struct spdk_blob_data 		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	spdk_bs_batch_t			*batch;
	size_t				i;
	uint64_t			lba;
	uint32_t			lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it as an unmap. */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data 		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place, so
	 * every page in the clean list must be released back to the
	 * used_md_pages array.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
	struct spdk_blob_data 		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	spdk_bs_batch_t			*batch;
	size_t				i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
	struct spdk_blob_data 		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t 			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;
	spdk_bs_batch_t			*batch;
	size_t				i;

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This loop starts at 1. The root page is not written until
	 * all of the others are finished.
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
{
	uint64_t	i;
	uint64_t	*tmp;
	uint64_t	lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

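/*
 * I/O helper for read/write/unmap/write_zeroes. A blob's clusters need not
 * be physically contiguous, so a request is carved at cluster boundaries:
 * e.g. with 256 pages per cluster, a 300-page write at page offset 200
 * becomes one 56-page operation and one 244-page operation, each issued to
 * the LBA range backing its own cluster. (Page counts here are illustrative.)
 */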
static void
_spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_blob_data		*blob = __blob_to_data(_blob);
	spdk_bs_batch_t			*batch;
	struct spdk_bs_cpl		cpl;
	uint64_t			lba;
	uint32_t			lba_count;
	uint8_t				*buf;
	uint64_t			page;

	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		switch (op_type) {
		case SPDK_BLOB_READ:
			spdk_bs_batch_read(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE:
			spdk_bs_batch_write(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap(batch, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
			break;
		}

		length -= lba_count;
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		}
	}

	spdk_bs_batch_close(batch);
}

struct rw_iov_ctx {
	struct spdk_blob_data *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

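/*
 * Splits a readv/writev that crosses cluster boundaries. On each pass the
 * original iov array is re-walked to find the current byte position, and a
 * sub-iov array (ctx->iov) is built covering just the pages up to the next
 * cluster boundary. For example, a 2-element iov spanning two clusters is
 * reissued as two smaller vectored I/Os, the second starting at an offset
 * inside whichever original iov element straddles the boundary.
 */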
static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get the index and offset into the original iov array for our current position
	 *  in the I/O sequence.  byte_count counts down the bytes to skip until orig_iov
	 *  and orig_iovoff point to the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * SPDK_BS_PAGE_SIZE;
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
	 *  bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * SPDK_BS_PAGE_SIZE;
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	struct spdk_blob_data		*blob = __blob_to_data(_blob);
	spdk_bs_sequence_t		*seq;
	struct spdk_bs_cpl		cpl;

	assert(blob != NULL);

	if (!read && blob->data_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 *  to allocate a separate iov array and split the I/O such that none of the resulting
	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
	 *  but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob_data *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_data *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store		*bs = io_device;
	struct spdk_bs_channel		*channel = ctx_buf;
	struct spdk_bs_dev		*dev;
	uint32_t			max_ops = bs->max_channel_ops;
	uint32_t			i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	if (!channel->dev_channel) {
		SPDK_ERRLOG("Failed to create device channel.\n");
		free(channel->req_mem);
		return -1;
	}

	return 0;
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_dev_destroy(void *io_device)
{
	struct spdk_blob_store *bs = io_device;
	struct spdk_blob_data	*blob, *blob_tmp;

	bs->dev->destroy(bs->dev);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_blobids);
	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);
	/*
	 * If this function is called for any reason except a successful unload,
	 * the unload_cpl type will be NONE and this will be a nop.
	 */
	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);

	free(bs);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
	memset(&opts->bstype, 0, sizeof(opts->bstype));
}

static int
_spdk_bs_opts_verify(struct spdk_bs_opts *opts)
{
	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
	    opts->max_channel_ops == 0) {
		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
		return -1;
	}

	return 0;
}

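/*
 * Geometry example (illustrative numbers): a device with 512-byte blocks
 * and a 1MiB cluster size yields pages_per_cluster = 1MiB / 4KiB = 256 and
 * total_clusters = blockcnt / 2048. A trailing partial cluster is simply
 * dropped, since total_clusters is computed with integer division.
 */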
static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store	*bs;
	uint64_t dev_size;
	int rc;

	dev_size = dev->blocklen * dev->blockcnt;
	if (dev_size < opts->cluster_sz) {
		/* Device size cannot be smaller than cluster size of blobstore */
		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %u\n", dev_size,
			    opts->cluster_sz);
		return NULL;
	}
	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
		/* Cluster size cannot be smaller than page size */
		SPDK_ERRLOG("Cluster size %u is smaller than page size %d\n",
			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
		return NULL;
	}
	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 *  even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		free(bs);
		return NULL;
	}

	bs->max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;
	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);
	bs->used_blobids = spdk_bit_array_create(0);

	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	rc = spdk_bs_register_md_thread(bs);
	if (rc == -1) {
		spdk_io_device_unregister(bs, NULL);
		spdk_bit_array_free(&bs->used_blobids);
		spdk_bit_array_free(&bs->used_md_pages);
		spdk_bit_array_free(&bs->used_clusters);
		free(bs);
		return NULL;
	}

	return bs;
}

/* START spdk_bs_load. spdk_bs_load_ctx is used for both load and unload. */

struct spdk_bs_load_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;

	struct spdk_bs_md_mask		*mask;
	bool				in_page_chain;
	uint32_t			page_index;
	uint32_t			cur_page;
	struct spdk_blob_md_page	*page;
	bool				is_load;
};

1588 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
1589 {
1590 	assert(bserrno != 0);
1591 
1592 	spdk_dma_free(ctx->super);
1593 	/*
1594 	 * Only free the blobstore when a load fails.  If an unload fails (for some reason)
1595 	 *  we want to keep the blobstore in case the caller wants to try again.
1596 	 */
1597 	if (ctx->is_load) {
1598 		_spdk_bs_free(ctx->bs);
1599 	}
1600 	free(ctx);
1601 	spdk_bs_sequence_finish(seq, bserrno);
1602 }
1603 
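/*
 * Convert an in-memory bit array to the on-disk mask format: bit i of the
 * array lands in byte i / 8, bit i % 8 of mask->mask. E.g. set bits {0, 9}
 * produce mask->mask[0] == 0x01 and mask->mask[1] == 0x02.
 */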
static void
_spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
{
	uint32_t i = 0;

	while (true) {
		i = spdk_bit_array_find_first_set(array, i);
		if (i >= mask->length) {
			break;
		}
		mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	/* Update the values in the super block */
	super->super_blob = bs->super_blob;
	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
	super->crc = _spdk_blob_md_page_calc_crc(super);
	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
			       cb_fn, cb_arg);
}

static void
_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	if (ctx->super->used_blobid_mask_len == 0) {
		/*
		 * This is a pre-v3 on-disk format where the blobid mask does not get
		 *  written to disk.
		 */
		cb_fn(seq, arg, 0);
		return;
	}

	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));

	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);

	/* The length of the mask (in bits) must not be greater than
	 * the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));

	/* The length of the mask must be exactly equal to the size
	 * (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_blobids, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t		lba, lba_count, mask_size;
	uint32_t		i, j;
	int			rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->mask);

	/* Read the used blobids mask */
	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_blobids_cpl, ctx);
}
1803 
1804 static void
1805 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1806 {
1807 	struct spdk_bs_load_ctx *ctx = cb_arg;
1808 	uint64_t		lba, lba_count, mask_size;
1809 	uint32_t		i, j;
1810 	int			rc;
1811 
1812 	/* The type must be correct */
1813 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1814 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1815 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
1816 				     8));
1817 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1818 	assert(ctx->mask->length == ctx->super->md_len);
1819 
1820 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1821 	if (rc < 0) {
1822 		spdk_dma_free(ctx->mask);
1823 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1824 		return;
1825 	}
1826 
1827 	for (i = 0; i < ctx->mask->length / 8; i++) {
1828 		uint8_t segment = ctx->mask->mask[i];
1829 		for (j = 0; segment && (j < 8); j++) {
1830 			if (segment & 1U) {
1831 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1832 			}
1833 			segment >>= 1U;
1834 		}
1835 	}
1836 	spdk_dma_free(ctx->mask);
1837 
1838 	/* Read the used clusters mask */
1839 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1840 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1841 	if (!ctx->mask) {
1842 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1843 		return;
1844 	}
1845 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1846 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1847 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1848 			      _spdk_bs_load_used_clusters_cpl, ctx);
1849 }
1850 
1851 static void
1852 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1853 {
1854 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1855 	uint64_t lba, lba_count, mask_size;
1856 
1857 	/* Read the used pages mask */
1858 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1859 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1860 	if (!ctx->mask) {
1861 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1862 		return;
1863 	}
1864 
1865 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1866 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1867 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1868 			      _spdk_bs_load_used_pages_cpl, ctx);
1869 }
1870 
1871 static int
1872 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
1873 {
1874 	struct spdk_blob_md_descriptor *desc;
1875 	size_t	cur_desc = 0;
1876 
1877 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
1878 	while (cur_desc < sizeof(page->descriptors)) {
1879 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
1880 			if (desc->length == 0) {
1881 				/* If padding and length are 0, this terminates the page */
1882 				break;
1883 			}
1884 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
1885 			struct spdk_blob_md_descriptor_extent	*desc_extent;
1886 			unsigned int				i, j;
1887 			unsigned int				cluster_count = 0;
1888 
1889 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
1890 
1891 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
1892 				for (j = 0; j < desc_extent->extents[i].length; j++) {
1893 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
1894 					if (bs->num_free_clusters == 0) {
1895 						return -1;
1896 					}
1897 					bs->num_free_clusters--;
1898 					cluster_count++;
1899 				}
1900 			}
1901 			if (cluster_count == 0) {
1902 				return -1;
1903 			}
1904 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
1905 			/* Skip this item */
1906 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
1907 			/* Skip this item */
1908 		} else {
1909 			/* Error */
1910 			return -1;
1911 		}
1912 		/* Advance to the next descriptor */
1913 		cur_desc += sizeof(*desc) + desc->length;
1914 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
1915 			break;
1916 		}
1917 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
1918 	}
1919 	return 0;
1920 }
1921 
static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
1923 {
1924 	uint32_t crc;
1925 
1926 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
1927 	if (crc != ctx->page->crc) {
1928 		return false;
1929 	}
1930 
1931 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
1932 		return false;
1933 	}
1934 	return true;
1935 }
1936 
1937 static void
1938 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
1939 
1940 static void
1941 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1942 {
1943 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1944 
1945 	spdk_dma_free(ctx->mask);
1946 	spdk_dma_free(ctx->super);
1947 	spdk_bs_sequence_finish(seq, bserrno);
1948 	free(ctx);
1949 }
1950 
1951 static void
1952 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1953 {
1954 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1955 
1956 	spdk_dma_free(ctx->mask);
1957 	ctx->mask = NULL;
1958 
1959 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
1960 }
1961 
1962 static void
1963 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1964 {
1965 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1966 
1967 	spdk_dma_free(ctx->mask);
1968 	ctx->mask = NULL;
1969 
1970 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
1971 }
1972 
1973 static void
1974 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1975 {
1976 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
1977 }
1978 
1979 static void
1980 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1981 {
1982 	struct spdk_bs_load_ctx *ctx = cb_arg;
1983 	uint32_t page_num;
1984 
1985 	if (bserrno != 0) {
1986 		_spdk_bs_load_ctx_fail(seq, ctx, bserrno);
1987 		return;
1988 	}
1989 
1990 	page_num = ctx->cur_page;
1991 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
1992 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
1993 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
1994 			if (ctx->page->sequence_num == 0) {
1995 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
1996 			}
1997 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
1998 				_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
1999 				return;
2000 			}
2001 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
2002 				ctx->in_page_chain = true;
2003 				ctx->cur_page = ctx->page->next;
2004 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2005 				return;
2006 			}
2007 		}
2008 	}
2009 
2010 	ctx->in_page_chain = false;
2011 
2012 	do {
2013 		ctx->page_index++;
2014 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
2015 
2016 	if (ctx->page_index < ctx->super->md_len) {
2017 		ctx->cur_page = ctx->page_index;
2018 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2019 	} else {
2020 		spdk_dma_free(ctx->page);
2021 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
2022 	}
2023 }
2024 
2025 static void
2026 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
2027 {
2028 	struct spdk_bs_load_ctx *ctx = cb_arg;
2029 	uint64_t lba;
2030 
2031 	assert(ctx->cur_page < ctx->super->md_len);
2032 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
2033 	spdk_bs_sequence_read(seq, ctx->page, lba,
2034 			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
2035 			      _spdk_bs_load_replay_md_cpl, ctx);
2036 }
2037 
2038 static void
2039 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
2040 {
2041 	struct spdk_bs_load_ctx *ctx = cb_arg;
2042 
2043 	ctx->page_index = 0;
2044 	ctx->cur_page = 0;
2045 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
2046 				     SPDK_BS_PAGE_SIZE,
2047 				     NULL);
2048 	if (!ctx->page) {
2049 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2050 		return;
2051 	}
2052 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2053 }
2054 
2055 static void
2056 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2057 {
2058 	struct spdk_bs_load_ctx *ctx = cb_arg;
2059 	int 		rc;
2060 
2061 	if (bserrno != 0) {
2062 		_spdk_bs_load_ctx_fail(seq, ctx, -EIO);
2063 		return;
2064 	}
2065 
2066 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
2067 	if (rc < 0) {
2068 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2069 		return;
2070 	}
2071 
2072 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
2073 	if (rc < 0) {
2074 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2075 		return;
2076 	}
2077 
2078 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2079 	if (rc < 0) {
2080 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2081 		return;
2082 	}
2083 
2084 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2085 	_spdk_bs_load_replay_md(seq, cb_arg);
2086 }
2087 
2088 static void
2089 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2090 {
2091 	struct spdk_bs_load_ctx *ctx = cb_arg;
2092 	uint32_t	crc;
2093 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
2094 
2095 	if (ctx->super->version > SPDK_BS_VERSION ||
2096 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
2097 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2098 		return;
2099 	}
2100 
2101 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2102 		   sizeof(ctx->super->signature)) != 0) {
2103 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2104 		return;
2105 	}
2106 
2107 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2108 	if (crc != ctx->super->crc) {
2109 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2110 		return;
2111 	}
2112 
2113 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2114 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2115 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
2117 	} else {
2118 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2119 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2120 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2121 		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
2122 		return;
2123 	}
2124 
2125 	/* Parse the super block */
2126 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2127 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2128 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2129 	ctx->bs->md_start = ctx->super->md_start;
2130 	ctx->bs->md_len = ctx->super->md_len;
2131 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2132 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2133 	ctx->bs->super_blob = ctx->super->super_blob;
2134 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2135 
2136 	if (ctx->super->clean == 0) {
2137 		_spdk_bs_recover(seq, ctx, 0);
2138 	} else if (ctx->super->used_blobid_mask_len == 0) {
2139 		/*
2140 		 * Metadata is clean, but this is an old metadata format without
2141 		 *  a blobid mask.  Clear the clean bit and then build the masks
2142 		 *  using _spdk_bs_recover.
2143 		 */
2144 		ctx->super->clean = 0;
2145 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_recover, ctx);
2146 	} else {
2147 		ctx->super->clean = 0;
2148 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2149 	}
2150 }
2151 
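/*
 * Illustrative usage sketch (example names, not code from this file):
 *  loading an existing blobstore from a bs_dev constructed elsewhere.
 *
 *	static void
 *	load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("blobstore load failed: %d\n", bserrno);
 *			return;
 *		}
 *		g_bs = bs;
 *	}
 *
 *	spdk_bs_load(dev, NULL, load_done, NULL);
 */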
2152 void
2153 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2154 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2155 {
2156 	struct spdk_blob_store	*bs;
2157 	struct spdk_bs_cpl	cpl;
2158 	spdk_bs_sequence_t	*seq;
2159 	struct spdk_bs_load_ctx *ctx;
2160 	struct spdk_bs_opts	opts = {};
2161 
2162 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2163 
2164 	if (o) {
2165 		opts = *o;
2166 	} else {
2167 		spdk_bs_opts_init(&opts);
2168 	}
2169 
2170 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2171 		cb_fn(cb_arg, NULL, -EINVAL);
2172 		return;
2173 	}
2174 
2175 	bs = _spdk_bs_alloc(dev, &opts);
2176 	if (!bs) {
2177 		cb_fn(cb_arg, NULL, -ENOMEM);
2178 		return;
2179 	}
2180 
2181 	ctx = calloc(1, sizeof(*ctx));
2182 	if (!ctx) {
2183 		_spdk_bs_free(bs);
2184 		cb_fn(cb_arg, NULL, -ENOMEM);
2185 		return;
2186 	}
2187 
2188 	ctx->bs = bs;
2189 	ctx->is_load = true;
2190 
2191 	/* Allocate memory for the super block */
2192 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		/* Complete the operation; the caller must always get its callback. */
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2198 
2199 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2200 	cpl.u.bs_handle.cb_fn = cb_fn;
2201 	cpl.u.bs_handle.cb_arg = cb_arg;
2202 	cpl.u.bs_handle.bs = bs;
2203 
2204 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2205 	if (!seq) {
2206 		spdk_dma_free(ctx->super);
2207 		free(ctx);
2208 		_spdk_bs_free(bs);
2209 		cb_fn(cb_arg, NULL, -ENOMEM);
2210 		return;
2211 	}
2212 
2213 	/* Read the super block */
2214 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2215 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2216 			      _spdk_bs_load_super_cpl, ctx);
2217 }
2218 
2219 /* END spdk_bs_load */
2220 
2221 /* START spdk_bs_init */
2222 
2223 struct spdk_bs_init_ctx {
2224 	struct spdk_blob_store		*bs;
2225 	struct spdk_bs_super_block	*super;
2226 };
2227 
2228 static void
2229 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2230 {
2231 	struct spdk_bs_init_ctx *ctx = cb_arg;
2232 
2233 	spdk_dma_free(ctx->super);
2234 	free(ctx);
2235 
2236 	spdk_bs_sequence_finish(seq, bserrno);
2237 }
2238 
2239 static void
2240 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2241 {
2242 	struct spdk_bs_init_ctx *ctx = cb_arg;
2243 
2244 	/* Write super block */
2245 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2246 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2247 			       _spdk_bs_init_persist_super_cpl, ctx);
2248 }
2249 
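/*
 * Illustrative usage sketch (example names, not code from this file):
 *  initializing a fresh blobstore with a non-default cluster size. The
 *  completion callback has the same signature as in the load example.
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 *	spdk_bs_init(dev, &opts, init_done, NULL);
 */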
2250 void
2251 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2252 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2253 {
2254 	struct spdk_bs_init_ctx *ctx;
2255 	struct spdk_blob_store	*bs;
2256 	struct spdk_bs_cpl	cpl;
2257 	spdk_bs_sequence_t	*seq;
2258 	spdk_bs_batch_t		*batch;
2259 	uint64_t		num_md_lba;
2260 	uint64_t		num_md_pages;
2261 	uint64_t		num_md_clusters;
2262 	uint32_t		i;
2263 	struct spdk_bs_opts	opts = {};
2264 	int			rc;
2265 
2266 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2267 
2268 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %u\n",
			    dev->blocklen);
2271 		dev->destroy(dev);
2272 		cb_fn(cb_arg, NULL, -EINVAL);
2273 		return;
2274 	}
2275 
2276 	if (o) {
2277 		opts = *o;
2278 	} else {
2279 		spdk_bs_opts_init(&opts);
2280 	}
2281 
2282 	if (_spdk_bs_opts_verify(&opts) != 0) {
2283 		dev->destroy(dev);
2284 		cb_fn(cb_arg, NULL, -EINVAL);
2285 		return;
2286 	}
2287 
2288 	bs = _spdk_bs_alloc(dev, &opts);
2289 	if (!bs) {
2290 		dev->destroy(dev);
2291 		cb_fn(cb_arg, NULL, -ENOMEM);
2292 		return;
2293 	}
2294 
2295 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
		/* By default, allocate 1 metadata page per cluster.
		 * This over-allocates slightly, because the metadata
		 * region itself consumes clusters and therefore reduces
		 * the number of usable clusters that actually need a
		 * metadata page. This can be addressed with more
		 * complex math in the future.
		 */
2302 		bs->md_len = bs->total_clusters;
2303 	} else {
2304 		bs->md_len = opts.num_md_pages;
2305 	}
2306 
2307 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2308 	if (rc < 0) {
2309 		_spdk_bs_free(bs);
2310 		cb_fn(cb_arg, NULL, -ENOMEM);
2311 		return;
2312 	}
2313 
2314 	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
2315 	if (rc < 0) {
2316 		_spdk_bs_free(bs);
2317 		cb_fn(cb_arg, NULL, -ENOMEM);
2318 		return;
2319 	}
2320 
2321 	ctx = calloc(1, sizeof(*ctx));
2322 	if (!ctx) {
2323 		_spdk_bs_free(bs);
2324 		cb_fn(cb_arg, NULL, -ENOMEM);
2325 		return;
2326 	}
2327 
2328 	ctx->bs = bs;
2329 
2330 	/* Allocate memory for the super block */
2331 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		/* Complete the operation; the caller must always get its callback. */
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2337 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2338 	       sizeof(ctx->super->signature));
2339 	ctx->super->version = SPDK_BS_VERSION;
2340 	ctx->super->length = sizeof(*ctx->super);
2341 	ctx->super->super_blob = bs->super_blob;
2342 	ctx->super->clean = 0;
2343 	ctx->super->cluster_size = bs->cluster_sz;
2344 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
2345 
2346 	/* Calculate how many pages the metadata consumes at the front
2347 	 * of the disk.
2348 	 */
2349 
2350 	/* The super block uses 1 page */
2351 	num_md_pages = 1;
2352 
2353 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
2354 	 * up to the nearest page, plus a header.
2355 	 */
2356 	ctx->super->used_page_mask_start = num_md_pages;
2357 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2358 					 divide_round_up(bs->md_len, 8),
2359 					 SPDK_BS_PAGE_SIZE);
2360 	num_md_pages += ctx->super->used_page_mask_len;
2361 
2362 	/* The used_clusters mask requires 1 bit per cluster, rounded
2363 	 * up to the nearest page, plus a header.
2364 	 */
2365 	ctx->super->used_cluster_mask_start = num_md_pages;
2366 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2367 					    divide_round_up(bs->total_clusters, 8),
2368 					    SPDK_BS_PAGE_SIZE);
2369 	num_md_pages += ctx->super->used_cluster_mask_len;
2370 
2371 	/* The used_blobids mask requires 1 bit per metadata page, rounded
2372 	 * up to the nearest page, plus a header.
2373 	 */
2374 	ctx->super->used_blobid_mask_start = num_md_pages;
2375 	ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2376 					   divide_round_up(bs->md_len, 8),
2377 					   SPDK_BS_PAGE_SIZE);
2378 	num_md_pages += ctx->super->used_blobid_mask_len;
2379 
2380 	/* The metadata region size was chosen above */
2381 	ctx->super->md_start = bs->md_start = num_md_pages;
2382 	ctx->super->md_len = bs->md_len;
2383 	num_md_pages += bs->md_len;
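	/*
	 * Resulting on-disk layout, in pages (illustrative summary of the
	 *  computations above):
	 *
	 *	[0]                    super block
	 *	[1 ..]                 used_md_pages mask
	 *	[.. ..]                used_clusters mask
	 *	[.. ..]                used_blobids mask
	 *	[md_start .. +md_len]  blob metadata region
	 */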
2384 
2385 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
2386 
2387 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
2388 
2389 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
2390 	if (num_md_clusters > bs->total_clusters) {
		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available; "
			    "please decrease the number of pages reserved for metadata "
			    "or increase the cluster size.\n");
2394 		spdk_dma_free(ctx->super);
2395 		free(ctx);
2396 		_spdk_bs_free(bs);
2397 		cb_fn(cb_arg, NULL, -ENOMEM);
2398 		return;
2399 	}
2400 	/* Claim all of the clusters used by the metadata */
2401 	for (i = 0; i < num_md_clusters; i++) {
2402 		_spdk_bs_claim_cluster(bs, i);
2403 	}
2404 
2405 	bs->total_data_clusters = bs->num_free_clusters;
2406 
2407 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2408 	cpl.u.bs_handle.cb_fn = cb_fn;
2409 	cpl.u.bs_handle.cb_arg = cb_arg;
2410 	cpl.u.bs_handle.bs = bs;
2411 
2412 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2413 	if (!seq) {
2414 		spdk_dma_free(ctx->super);
2415 		free(ctx);
2416 		_spdk_bs_free(bs);
2417 		cb_fn(cb_arg, NULL, -ENOMEM);
2418 		return;
2419 	}
2420 
2421 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
2422 
2423 	/* Clear metadata space */
2424 	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
2425 	/* Trim data clusters */
2426 	spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
2427 
2428 	spdk_bs_batch_close(batch);
2429 }
2430 
2431 /* END spdk_bs_init */
2432 
2433 /* START spdk_bs_destroy */
2434 
2435 static void
2436 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2437 {
2438 	struct spdk_bs_init_ctx *ctx = cb_arg;
2439 	struct spdk_blob_store *bs = ctx->bs;
2440 
2441 	/*
2442 	 * We need to defer calling spdk_bs_call_cpl() until after
2443 	 * dev destruction, so tuck these away for later use.
2444 	 */
2445 	bs->unload_err = bserrno;
2446 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2447 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2448 
2449 	spdk_bs_sequence_finish(seq, bserrno);
2450 
2451 	_spdk_bs_free(bs);
2452 	free(ctx);
2453 }
2454 
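/*
 * Illustrative usage sketch (example callback name): destroying a
 *  blobstore zeroes its super block so it can no longer be loaded, then
 *  frees the in-memory structures and the underlying dev.
 *
 *	static void
 *	destroy_done(void *cb_arg, int bserrno)
 *	{
 *		... bs and its dev are gone at this point ...
 *	}
 *
 *	spdk_bs_destroy(bs, destroy_done, NULL);
 */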
2455 void
2456 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
2457 		void *cb_arg)
2458 {
2459 	struct spdk_bs_cpl	cpl;
2460 	spdk_bs_sequence_t	*seq;
2461 	struct spdk_bs_init_ctx *ctx;
2462 
2463 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
2464 
2465 	if (!TAILQ_EMPTY(&bs->blobs)) {
2466 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2467 		cb_fn(cb_arg, -EBUSY);
2468 		return;
2469 	}
2470 
2471 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2472 	cpl.u.bs_basic.cb_fn = cb_fn;
2473 	cpl.u.bs_basic.cb_arg = cb_arg;
2474 
2475 	ctx = calloc(1, sizeof(*ctx));
2476 	if (!ctx) {
2477 		cb_fn(cb_arg, -ENOMEM);
2478 		return;
2479 	}
2480 
2481 	ctx->bs = bs;
2482 
2483 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2484 	if (!seq) {
2485 		free(ctx);
2486 		cb_fn(cb_arg, -ENOMEM);
2487 		return;
2488 	}
2489 
2490 	/* Write zeroes to the super block */
2491 	spdk_bs_sequence_write_zeroes(seq,
2492 				      _spdk_bs_page_to_lba(bs, 0),
2493 				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
2494 				      _spdk_bs_destroy_trim_cpl, ctx);
2495 }
2496 
2497 /* END spdk_bs_destroy */
2498 
2499 /* START spdk_bs_unload */
2500 
2501 static void
2502 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2503 {
2504 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2505 
2506 	spdk_dma_free(ctx->super);
2507 
2508 	/*
2509 	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
2511 	 */
2512 	ctx->bs->unload_err = bserrno;
2513 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2514 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2515 
2516 	spdk_bs_sequence_finish(seq, bserrno);
2517 
2518 	_spdk_bs_free(ctx->bs);
2519 	free(ctx);
2520 }
2521 
2522 static void
2523 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2524 {
2525 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2526 
2527 	spdk_dma_free(ctx->mask);
2528 	ctx->super->clean = 1;
2529 
2530 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
2531 }
2532 
2533 static void
2534 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2535 {
2536 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2537 
2538 	spdk_dma_free(ctx->mask);
2539 	ctx->mask = NULL;
2540 
2541 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
2542 }
2543 
2544 static void
2545 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2546 {
2547 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2548 
2549 	spdk_dma_free(ctx->mask);
2550 	ctx->mask = NULL;
2551 
2552 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
2553 }
2554 
2555 static void
2556 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2557 {
2558 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
2559 }
2560 
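/*
 * Illustrative usage sketch (example callback name): cleanly shutting
 *  down. All blobs must be closed first or the call completes with
 *  -EBUSY; on success the masks are persisted and the clean bit is set.
 *
 *	spdk_bs_unload(bs, unload_done, NULL);
 */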
2561 void
2562 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
2563 {
2564 	struct spdk_bs_cpl	cpl;
2565 	spdk_bs_sequence_t	*seq;
2566 	struct spdk_bs_load_ctx *ctx;
2567 
2568 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
2569 
2570 	if (!TAILQ_EMPTY(&bs->blobs)) {
2571 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2572 		cb_fn(cb_arg, -EBUSY);
2573 		return;
2574 	}
2575 
2576 	ctx = calloc(1, sizeof(*ctx));
2577 	if (!ctx) {
2578 		cb_fn(cb_arg, -ENOMEM);
2579 		return;
2580 	}
2581 
2582 	ctx->bs = bs;
2583 	ctx->is_load = false;
2584 
2585 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2586 	if (!ctx->super) {
2587 		free(ctx);
2588 		cb_fn(cb_arg, -ENOMEM);
2589 		return;
2590 	}
2591 
2592 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2593 	cpl.u.bs_basic.cb_fn = cb_fn;
2594 	cpl.u.bs_basic.cb_arg = cb_arg;
2595 
2596 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2597 	if (!seq) {
2598 		spdk_dma_free(ctx->super);
2599 		free(ctx);
2600 		cb_fn(cb_arg, -ENOMEM);
2601 		return;
2602 	}
2603 
2604 	/* Read super block */
2605 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2606 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2607 			      _spdk_bs_unload_read_super_cpl, ctx);
2608 }
2609 
2610 /* END spdk_bs_unload */
2611 
2612 void
2613 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
2614 		  spdk_bs_op_complete cb_fn, void *cb_arg)
2615 {
2616 	bs->super_blob = blobid;
2617 	cb_fn(cb_arg, 0);
2618 }
2619 
2620 void
2621 spdk_bs_get_super(struct spdk_blob_store *bs,
2622 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2623 {
2624 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
2625 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
2626 	} else {
2627 		cb_fn(cb_arg, bs->super_blob, 0);
2628 	}
2629 }
2630 
2631 uint64_t
2632 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
2633 {
2634 	return bs->cluster_sz;
2635 }
2636 
2637 uint64_t
2638 spdk_bs_get_page_size(struct spdk_blob_store *bs)
2639 {
2640 	return SPDK_BS_PAGE_SIZE;
2641 }
2642 
2643 uint64_t
2644 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
2645 {
2646 	return bs->num_free_clusters;
2647 }
2648 
2649 uint64_t
2650 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
2651 {
2652 	return bs->total_data_clusters;
2653 }
2654 
2655 static int
2656 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
2657 {
2658 	bs->md_channel = spdk_get_io_channel(bs);
2659 	if (!bs->md_channel) {
2660 		SPDK_ERRLOG("Failed to get IO channel.\n");
2661 		return -1;
2662 	}
2663 
2664 	return 0;
2665 }
2666 
2667 static int
2668 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
2669 {
2670 	spdk_put_io_channel(bs->md_channel);
2671 
2672 	return 0;
2673 }
2674 
2675 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob)
2676 {
2677 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2678 
2679 	assert(blob != NULL);
2680 
2681 	return blob->id;
2682 }
2683 
2684 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob)
2685 {
2686 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2687 
2688 	assert(blob != NULL);
2689 
2690 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
2691 }
2692 
2693 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob)
2694 {
2695 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2696 
2697 	assert(blob != NULL);
2698 
2699 	return blob->active.num_clusters;
2700 }
2701 
2702 /* START spdk_bs_create_blob */
2703 
2704 static void
2705 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2706 {
2707 	struct spdk_blob_data *blob = cb_arg;
2708 
2709 	_spdk_blob_free(blob);
2710 
2711 	spdk_bs_sequence_finish(seq, bserrno);
2712 }
2713 
2714 static int
2715 _spdk_blob_set_xattrs(struct spdk_blob	*blob, const struct spdk_blob_opts *opts)
2716 {
2717 	uint64_t i;
2718 	size_t value_len = 0;
2719 	int rc;
2720 	const void *value = NULL;
2721 	if (opts->xattr_count > 0 && opts->get_xattr_value == NULL) {
2722 		return -EINVAL;
2723 	}
2724 	for (i = 0; i < opts->xattr_count; i++) {
2725 		opts->get_xattr_value(opts->xattr_ctx, opts->xattr_names[i], &value, &value_len);
2726 		if (value == NULL || value_len == 0) {
2727 			return -EINVAL;
2728 		}
2729 		rc = spdk_blob_set_xattr(blob, opts->xattr_names[i], value, value_len);
2730 		if (rc < 0) {
2731 			return rc;
2732 		}
2733 	}
2734 	return 0;
2735 }
2736 
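/*
 * Illustrative usage sketch (example names throughout): creating a blob
 *  with an initial size and xattrs supplied through the callback-based
 *  opts interface.
 *
 *	static const char *xattr_names[] = { "name" };
 *
 *	struct spdk_blob_opts opts;
 *
 *	spdk_blob_opts_init(&opts);
 *	opts.num_clusters = 10;
 *	opts.xattr_count = 1;
 *	opts.xattr_names = xattr_names;
 *	opts.xattr_ctx = my_ctx;
 *	opts.get_xattr_value = get_xattr_value;
 *	spdk_bs_create_blob_ext(bs, &opts, create_done, NULL);
 */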
2737 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
2738 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2739 {
2740 	struct spdk_blob_data	*blob;
2741 	uint32_t		page_idx;
2742 	struct spdk_bs_cpl 	cpl;
2743 	struct spdk_blob_opts	opts_default;
2744 	spdk_bs_sequence_t	*seq;
2745 	spdk_blob_id		id;
2746 	int rc;
2747 
2748 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
2749 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
2750 		cb_fn(cb_arg, 0, -ENOMEM);
2751 		return;
2752 	}
	spdk_bit_array_set(bs->used_blobids, page_idx);
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	id = _spdk_bs_page_to_blobid(page_idx);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);

	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		/* Release the metadata page claimed above so it is not leaked. */
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	if (!opts) {
		spdk_blob_opts_init(&opts_default);
		opts = &opts_default;
	}
	rc = _spdk_blob_set_xattrs(__data_to_blob(blob), opts);
	if (rc < 0) {
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}
	rc = spdk_blob_resize(__data_to_blob(blob), opts->num_clusters);
	if (rc < 0) {
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}
	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
2788 
2789 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
2790 }
2791 
2792 void spdk_bs_create_blob(struct spdk_blob_store *bs,
2793 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2794 {
2795 	spdk_bs_create_blob_ext(bs, NULL, cb_fn, cb_arg);
2796 }
2797 
2798 /* END spdk_bs_create_blob */
2799 
2800 /* START spdk_blob_resize */
2801 int
2802 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
2803 {
2804 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2805 	int			rc;
2806 
2807 	assert(blob != NULL);
2808 
2809 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
2810 
2811 	if (blob->md_ro) {
2812 		return -EPERM;
2813 	}
2814 
2815 	if (sz == blob->active.num_clusters) {
2816 		return 0;
2817 	}
2818 
2819 	rc = _spdk_resize_blob(blob, sz);
2820 	if (rc < 0) {
2821 		return rc;
2822 	}
2823 
2824 	return 0;
2825 }
2826 
2827 /* END spdk_blob_resize */
2828 
2829 
2830 /* START spdk_bs_delete_blob */
2831 
2832 static void
2833 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
2834 {
2835 	spdk_bs_sequence_t *seq = cb_arg;
2836 
2837 	spdk_bs_sequence_finish(seq, bserrno);
2838 }
2839 
2840 static void
2841 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2842 {
2843 	struct spdk_blob *_blob = cb_arg;
2844 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2845 
2846 	if (bserrno != 0) {
2847 		/*
2848 		 * We already removed this blob from the blobstore tailq, so
2849 		 *  we need to free it here since this is the last reference
2850 		 *  to it.
2851 		 */
2852 		_spdk_blob_free(blob);
2853 		_spdk_bs_delete_close_cpl(seq, bserrno);
2854 		return;
2855 	}
2856 
2857 	/*
2858 	 * This will immediately decrement the ref_count and call
2859 	 *  the completion routine since the metadata state is clean.
2860 	 *  By calling spdk_blob_close, we reduce the number of call
2861 	 *  points into code that touches the blob->open_ref count
2862 	 *  and the blobstore's blob list.
2863 	 */
2864 	spdk_blob_close(_blob, _spdk_bs_delete_close_cpl, seq);
2865 }
2866 
2867 static void
2868 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
2869 {
2870 	spdk_bs_sequence_t *seq = cb_arg;
2871 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2872 	uint32_t page_num;
2873 
2874 	if (bserrno != 0) {
2875 		spdk_bs_sequence_finish(seq, bserrno);
2876 		return;
2877 	}
2878 
2879 	if (blob->open_ref > 1) {
2880 		/*
2881 		 * Someone has this blob open (besides this delete context).
2882 		 *  Decrement the ref count directly and return -EBUSY.
2883 		 */
2884 		blob->open_ref--;
2885 		spdk_bs_sequence_finish(seq, -EBUSY);
2886 		return;
2887 	}
2888 
2889 	/*
2890 	 * Remove the blob from the blob_store list now, to ensure it does not
2891 	 *  get returned after this point by _spdk_blob_lookup().
2892 	 */
2893 	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
2894 	page_num = _spdk_bs_blobid_to_page(blob->id);
2895 	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
2896 	blob->state = SPDK_BLOB_STATE_DIRTY;
2897 	blob->active.num_pages = 0;
2898 	_spdk_resize_blob(blob, 0);
2899 
2900 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, _blob);
2901 }
2902 
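/*
 * Illustrative usage sketch: deleting by id. The delete path opens the
 *  blob internally, so it completes with -EBUSY if any other reference
 *  is still open.
 *
 *	spdk_bs_delete_blob(bs, blobid, delete_done, NULL);
 */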
2903 void
2904 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2905 		    spdk_blob_op_complete cb_fn, void *cb_arg)
2906 {
2907 	struct spdk_bs_cpl	cpl;
2908 	spdk_bs_sequence_t 	*seq;
2909 
2910 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
2911 
2912 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2913 	cpl.u.blob_basic.cb_fn = cb_fn;
2914 	cpl.u.blob_basic.cb_arg = cb_arg;
2915 
2916 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2917 	if (!seq) {
2918 		cb_fn(cb_arg, -ENOMEM);
2919 		return;
2920 	}
2921 
2922 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
2923 }
2924 
2925 /* END spdk_bs_delete_blob */
2926 
2927 /* START spdk_bs_open_blob */
2928 
2929 static void
2930 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2931 {
2932 	struct spdk_blob_data *blob = cb_arg;
2933 
	/* If the blob has a crc error, we just return NULL. */
2935 	if (blob == NULL) {
2936 		seq->cpl.u.blob_handle.blob = NULL;
2937 		spdk_bs_sequence_finish(seq, bserrno);
2938 		return;
2939 	}
2940 
2941 	blob->open_ref++;
2942 
2943 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
2944 
2945 	spdk_bs_sequence_finish(seq, bserrno);
2946 }
2947 
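/*
 * Illustrative usage sketch (example callback name): opening a blob and
 *  receiving the handle in the completion callback.
 *
 *	static void
 *	open_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			... blob stays valid until spdk_blob_close() ...
 *		}
 *	}
 *
 *	spdk_bs_open_blob(bs, blobid, open_done, NULL);
 */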
2948 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2949 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2950 {
2951 	struct spdk_blob_data		*blob;
2952 	struct spdk_bs_cpl		cpl;
2953 	spdk_bs_sequence_t		*seq;
2954 	uint32_t			page_num;
2955 
2956 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
2957 
2958 	page_num = _spdk_bs_blobid_to_page(blobid);
2959 	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
2960 		/* Invalid blobid */
2961 		cb_fn(cb_arg, NULL, -ENOENT);
2962 		return;
2963 	}
2964 
2965 	blob = _spdk_blob_lookup(bs, blobid);
2966 	if (blob) {
2967 		blob->open_ref++;
2968 		cb_fn(cb_arg, __data_to_blob(blob), 0);
2969 		return;
2970 	}
2971 
2972 	blob = _spdk_blob_alloc(bs, blobid);
2973 	if (!blob) {
2974 		cb_fn(cb_arg, NULL, -ENOMEM);
2975 		return;
2976 	}
2977 
2978 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
2979 	cpl.u.blob_handle.cb_fn = cb_fn;
2980 	cpl.u.blob_handle.cb_arg = cb_arg;
2981 	cpl.u.blob_handle.blob = __data_to_blob(blob);
2982 
2983 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2984 	if (!seq) {
2985 		_spdk_blob_free(blob);
2986 		cb_fn(cb_arg, NULL, -ENOMEM);
2987 		return;
2988 	}
2989 
2990 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
2991 }
2992 /* END spdk_bs_open_blob */
2993 
2994 /* START spdk_blob_set_read_only */
2995 void spdk_blob_set_read_only(struct spdk_blob *b)
2996 {
2997 	struct spdk_blob_data *blob = __blob_to_data(b);
2998 
2999 	blob->data_ro = true;
3000 	blob->md_ro = true;
3001 	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;
3002 
3003 	blob->state = SPDK_BLOB_STATE_DIRTY;
3004 }
3005 /* END spdk_blob_set_read_only */
3006 
3007 /* START spdk_blob_sync_md */
3008 
3009 static void
3010 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3011 {
3012 	spdk_bs_sequence_finish(seq, bserrno);
3013 }
3014 
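/*
 * Illustrative usage sketch: setters such as spdk_blob_resize() and
 *  spdk_blob_set_xattr() only touch in-memory state; spdk_blob_sync_md()
 *  is what persists the dirty metadata.
 *
 *	spdk_blob_resize(blob, 20);
 *	spdk_blob_set_xattr(blob, "name", "my_blob", sizeof("my_blob"));
 *	spdk_blob_sync_md(blob, sync_done, NULL);
 */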
3015 void
3016 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
3017 {
3018 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3019 	struct spdk_bs_cpl	cpl;
3020 	spdk_bs_sequence_t	*seq;
3021 
3022 	assert(blob != NULL);
3023 
3024 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
3025 
3026 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3027 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3028 
	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		/* Read-only metadata is always clean; complete immediately. */
		cb_fn(cb_arg, 0);
		return;
	}
3033 
3034 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
3035 		cb_fn(cb_arg, 0);
3036 		return;
3037 	}
3038 
3039 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3040 	cpl.u.blob_basic.cb_fn = cb_fn;
3041 	cpl.u.blob_basic.cb_arg = cb_arg;
3042 
3043 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3044 	if (!seq) {
3045 		cb_fn(cb_arg, -ENOMEM);
3046 		return;
3047 	}
3048 
3049 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
3050 }
3051 
3052 /* END spdk_blob_sync_md */
3053 
3054 /* START spdk_blob_close */
3055 
3056 static void
3057 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3058 {
3059 	struct spdk_blob_data *blob = cb_arg;
3060 
3061 	if (bserrno == 0) {
3062 		blob->open_ref--;
3063 		if (blob->open_ref == 0) {
3064 			/*
3065 			 * Blobs with active.num_pages == 0 are deleted blobs.
3066 			 *  these blobs are removed from the blob_store list
3067 			 *  when the deletion process starts - so don't try to
3068 			 *  remove them again.
3069 			 */
3070 			if (blob->active.num_pages > 0) {
3071 				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3072 			}
3073 			_spdk_blob_free(blob);
3074 		}
3075 	}
3076 
3077 	spdk_bs_sequence_finish(seq, bserrno);
3078 }
3079 
3080 void spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg)
3081 {
3082 	struct spdk_bs_cpl	cpl;
3083 	struct spdk_blob_data	*blob;
3084 	spdk_bs_sequence_t	*seq;
3085 
3086 	assert(b != NULL);
3087 	blob = __blob_to_data(b);
3088 	assert(blob != NULL);
3089 
3090 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
3091 
3092 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3093 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3094 
3095 	if (blob->open_ref == 0) {
3096 		cb_fn(cb_arg, -EBADF);
3097 		return;
3098 	}
3099 
3100 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3101 	cpl.u.blob_basic.cb_fn = cb_fn;
3102 	cpl.u.blob_basic.cb_arg = cb_arg;
3103 
3104 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3105 	if (!seq) {
3106 		cb_fn(cb_arg, -ENOMEM);
3107 		return;
3108 	}
3109 
3110 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
3111 		_spdk_blob_close_cpl(seq, blob, 0);
3112 		return;
3113 	}
3114 
3115 	/* Sync metadata */
3116 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
3117 }
3118 
3119 /* END spdk_blob_close */
3120 
3121 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
3122 {
3123 	return spdk_get_io_channel(bs);
3124 }
3125 
3126 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
3127 {
3128 	spdk_put_io_channel(channel);
3129 }
3130 
3131 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3132 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3133 {
3134 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3135 				     SPDK_BLOB_UNMAP);
3136 }
3137 
3138 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3139 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3140 {
3141 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3142 				     SPDK_BLOB_WRITE_ZEROES);
3143 }
3144 
3145 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3146 			   void *payload, uint64_t offset, uint64_t length,
3147 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3148 {
3149 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3150 				     SPDK_BLOB_WRITE);
3151 }
3152 
3153 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3154 			  void *payload, uint64_t offset, uint64_t length,
3155 			  spdk_blob_op_complete cb_fn, void *cb_arg)
3156 {
3157 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3158 				     SPDK_BLOB_READ);
3159 }
3160 
3161 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3162 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3163 			    spdk_blob_op_complete cb_fn, void *cb_arg)
3164 {
3165 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
3166 }
3167 
3168 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3169 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3170 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3171 {
3172 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
3173 }
3174 
3175 struct spdk_bs_iter_ctx {
3176 	int64_t page_num;
3177 	struct spdk_blob_store *bs;
3178 
3179 	spdk_blob_op_with_handle_complete cb_fn;
3180 	void *cb_arg;
3181 };
3182 
3183 static void
3184 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
3185 {
3186 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3187 	struct spdk_blob_store *bs = ctx->bs;
3188 	spdk_blob_id id;
3189 
3190 	if (bserrno == 0) {
3191 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
3192 		free(ctx);
3193 		return;
3194 	}
3195 
3196 	ctx->page_num++;
3197 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
3198 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
3199 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
3200 		free(ctx);
3201 		return;
3202 	}
3203 
3204 	id = _spdk_bs_page_to_blobid(ctx->page_num);
3205 
3206 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
3207 }
3208 
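/*
 * Illustrative usage sketch (example callback name): walking every blob
 *  in the store. Each completion hands over an open blob;
 *  spdk_bs_iter_next() closes it before opening the following one, and
 *  the walk ends with -ENOENT.
 *
 *	static void
 *	iter_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno != 0) {
 *			return;
 *		}
 *		spdk_bs_iter_next(bs, blob, iter_done, bs);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_done, bs);
 */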
3209 void
3210 spdk_bs_iter_first(struct spdk_blob_store *bs,
3211 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3212 {
3213 	struct spdk_bs_iter_ctx *ctx;
3214 
3215 	ctx = calloc(1, sizeof(*ctx));
3216 	if (!ctx) {
3217 		cb_fn(cb_arg, NULL, -ENOMEM);
3218 		return;
3219 	}
3220 
3221 	ctx->page_num = -1;
3222 	ctx->bs = bs;
3223 	ctx->cb_fn = cb_fn;
3224 	ctx->cb_arg = cb_arg;
3225 
3226 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3227 }
3228 
3229 static void
3230 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
3231 {
3232 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3233 
3234 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3235 }
3236 
3237 void
3238 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *b,
3239 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3240 {
3241 	struct spdk_bs_iter_ctx *ctx;
3242 	struct spdk_blob_data	*blob;
3243 
3244 	assert(b != NULL);
3245 	blob = __blob_to_data(b);
3246 	assert(blob != NULL);
3247 
3248 	ctx = calloc(1, sizeof(*ctx));
3249 	if (!ctx) {
3250 		cb_fn(cb_arg, NULL, -ENOMEM);
3251 		return;
3252 	}
3253 
3254 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
3255 	ctx->bs = bs;
3256 	ctx->cb_fn = cb_fn;
3257 	ctx->cb_arg = cb_arg;
3258 
3259 	/* Close the existing blob */
3260 	spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx);
3261 }
3262 
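/*
 * Illustrative usage sketch: setting an xattr and reading it back. Both
 *  calls are synchronous and only touch in-memory metadata; a subsequent
 *  spdk_blob_sync_md() persists the change.
 *
 *	const void *value;
 *	size_t value_len;
 *
 *	spdk_blob_set_xattr(blob, "name", "my_blob", sizeof("my_blob"));
 *	spdk_blob_get_xattr_value(blob, "name", &value, &value_len);
 */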
3263 int
3264 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
3265 		    uint16_t value_len)
3266 {
3267 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3268 	struct spdk_xattr 	*xattr;
3269 
3270 	assert(blob != NULL);
3271 
3272 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3273 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3274 
3275 	if (blob->md_ro) {
3276 		return -EPERM;
3277 	}
3278 
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			/* Allocate the new value first so the old one is preserved on failure. */
			void *tmp = malloc(value_len);

			if (!tmp) {
				return -ENOMEM;
			}
			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
3305 }
3306 
3307 int
3308 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
3309 {
3310 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3311 	struct spdk_xattr	*xattr;
3312 
3313 	assert(blob != NULL);
3314 
3315 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3316 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3317 
3318 	if (blob->md_ro) {
3319 		return -EPERM;
3320 	}
3321 
3322 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3323 		if (!strcmp(name, xattr->name)) {
3324 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
3325 			free(xattr->value);
3326 			free(xattr->name);
3327 			free(xattr);
3328 
3329 			blob->state = SPDK_BLOB_STATE_DIRTY;
3330 
3331 			return 0;
3332 		}
3333 	}
3334 
3335 	return -ENOENT;
3336 }
3337 
3338 int
3339 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
3340 			  const void **value, size_t *value_len)
3341 {
3342 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3343 	struct spdk_xattr	*xattr;
3344 
3345 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3346 		if (!strcmp(name, xattr->name)) {
3347 			*value = xattr->value;
3348 			*value_len = xattr->value_len;
3349 			return 0;
3350 		}
3351 	}
3352 
3353 	return -ENOENT;
3354 }
3355 
3356 struct spdk_xattr_names {
3357 	uint32_t	count;
3358 	const char	*names[0];
3359 };
3360 
3361 int
3362 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
3363 {
3364 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3365 	struct spdk_xattr	*xattr;
3366 	int			count = 0;
3367 
3368 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3369 		count++;
3370 	}
3371 
3372 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
3373 	if (*names == NULL) {
3374 		return -ENOMEM;
3375 	}
3376 
3377 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3378 		(*names)->names[(*names)->count++] = xattr->name;
3379 	}
3380 
3381 	return 0;
3382 }
3383 
3384 uint32_t
3385 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
3386 {
3387 	assert(names != NULL);
3388 
3389 	return names->count;
3390 }
3391 
3392 const char *
3393 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
3394 {
3395 	if (index >= names->count) {
3396 		return NULL;
3397 	}
3398 
3399 	return names->names[index];
3400 }
3401 
3402 void
3403 spdk_xattr_names_free(struct spdk_xattr_names *names)
3404 {
3405 	free(names);
3406 }
3407 
3408 struct spdk_bs_type
3409 spdk_bs_get_bstype(struct spdk_blob_store *bs)
3410 {
3411 	return bs->bstype;
3412 }
3413 
3414 void
3415 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
3416 {
3417 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
3418 }
3419 
3420 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
3421