xref: /spdk/lib/blob/blobstore.c (revision fe8138cebbcde2415de235e8a8e43c460b6de4e6)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/blob.h"
37 #include "spdk/crc32.h"
38 #include "spdk/env.h"
39 #include "spdk/queue.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/bit_array.h"
42 #include "spdk/likely.h"
43 
44 #include "spdk_internal/log.h"
45 
46 #include "blobstore.h"
47 
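/* CRC-32C of a metadata page is seeded with all ones and finalized by XOR
 * with the same value; see _spdk_blob_md_page_calc_crc().
 */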
48 #define BLOB_CRC32C_INITIAL    0xffffffffUL
49 
50 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
51 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
52 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
53 
54 static inline size_t
55 divide_round_up(size_t num, size_t divisor)
56 {
57 	return (num + divisor - 1) / divisor;
58 }
59 
60 static void
61 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
62 {
63 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
64 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
65 	assert(bs->num_free_clusters > 0);
66 
67 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);
68 
69 	spdk_bit_array_set(bs->used_clusters, cluster_num);
70 	bs->num_free_clusters--;
71 }
72 
73 static void
74 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
75 {
76 	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
77 	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
78 	assert(bs->num_free_clusters < bs->total_clusters);
79 
80 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);
81 
82 	spdk_bit_array_clear(bs->used_clusters, cluster_num);
83 	bs->num_free_clusters++;
84 }
85 
86 void
87 spdk_blob_opts_init(struct spdk_blob_opts *opts)
88 {
89 	opts->num_clusters = 0;
90 }
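
/* A typical creation flow with these options might look like the following
 * sketch (assuming the public spdk_bs_create_blob_ext() API; the callback and
 * variable names are illustrative only):
 *
 *	struct spdk_blob_opts opts;
 *
 *	spdk_blob_opts_init(&opts);
 *	opts.num_clusters = 10;
 *	spdk_bs_create_blob_ext(bs, &opts, blob_create_done, cb_arg);
 */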
91 
92 static struct spdk_blob_data *
93 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
94 {
95 	struct spdk_blob_data *blob;
96 
97 	blob = calloc(1, sizeof(*blob));
98 	if (!blob) {
99 		return NULL;
100 	}
101 
102 	blob->id = id;
103 	blob->bs = bs;
104 
105 	blob->state = SPDK_BLOB_STATE_DIRTY;
106 	blob->active.num_pages = 1;
107 	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
108 	if (!blob->active.pages) {
109 		free(blob);
110 		return NULL;
111 	}
112 
113 	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);
114 
115 	TAILQ_INIT(&blob->xattrs);
116 
117 	return blob;
118 }
119 
120 static void
121 _spdk_blob_free(struct spdk_blob_data *blob)
122 {
123 	struct spdk_xattr 	*xattr, *xattr_tmp;
124 
125 	assert(blob != NULL);
126 
127 	free(blob->active.clusters);
128 	free(blob->clean.clusters);
129 	free(blob->active.pages);
130 	free(blob->clean.pages);
131 
132 	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
133 		TAILQ_REMOVE(&blob->xattrs, xattr, link);
134 		free(xattr->name);
135 		free(xattr->value);
136 		free(xattr);
137 	}
138 
139 	free(blob);
140 }
141 
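/* Snapshot the blob's current 'active' metadata into 'clean'. The clean arrays
 * take ownership of the old active buffers and 'active' continues with fresh
 * copies, so the last-synced state is preserved while the blob keeps changing.
 */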
142 static int
143 _spdk_blob_mark_clean(struct spdk_blob_data *blob)
144 {
145 	uint64_t *clusters = NULL;
146 	uint32_t *pages = NULL;
147 
148 	assert(blob != NULL);
149 	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
150 	       blob->state == SPDK_BLOB_STATE_SYNCING);
151 
152 	if (blob->active.num_clusters) {
153 		assert(blob->active.clusters);
154 		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
155 		if (!clusters) {
156 			return -1;
157 		}
158 		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
159 	}
160 
161 	if (blob->active.num_pages) {
162 		assert(blob->active.pages);
163 		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
164 		if (!pages) {
165 			free(clusters);
166 			return -1;
167 		}
168 		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
169 	}
170 
171 	free(blob->clean.clusters);
172 	free(blob->clean.pages);
173 
174 	blob->clean.num_clusters = blob->active.num_clusters;
175 	blob->clean.clusters = blob->active.clusters;
176 	blob->clean.num_pages = blob->active.num_pages;
177 	blob->clean.pages = blob->active.pages;
178 
179 	blob->active.clusters = clusters;
180 	blob->active.pages = pages;
181 
182 	blob->state = SPDK_BLOB_STATE_CLEAN;
183 
184 	return 0;
185 }
186 
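/* Parse a single on-disk metadata page, applying each descriptor (flags,
 * extent, xattr) to the in-memory blob. Unknown descriptor types are skipped
 * rather than treated as errors.
 */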
187 static int
188 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
189 {
190 	struct spdk_blob_md_descriptor *desc;
191 	size_t	cur_desc = 0;
192 	void *tmp;
193 
194 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
195 	while (cur_desc < sizeof(page->descriptors)) {
196 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
197 			if (desc->length == 0) {
198 				/* If padding and length are 0, this terminates the page */
199 				break;
200 			}
201 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
202 			struct spdk_blob_md_descriptor_flags	*desc_flags;
203 
204 			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;
205 
206 			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
207 				return -EINVAL;
208 			}
209 
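			/* Any bit set outside the corresponding mask is an unknown
			 * flag from a newer blobstore. Unknown invalid_flags make the
			 * blob unloadable; unknown data_ro/md_ro flags only degrade
			 * the blob to read-only.
			 */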
210 			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
211 			    SPDK_BLOB_INVALID_FLAGS_MASK) {
212 				return -EINVAL;
213 			}
214 
215 			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
216 			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
217 				blob->data_ro = true;
218 				blob->md_ro = true;
219 			}
220 
221 			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
222 			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
223 				blob->md_ro = true;
224 			}
225 
226 			blob->invalid_flags = desc_flags->invalid_flags;
227 			blob->data_ro_flags = desc_flags->data_ro_flags;
228 			blob->md_ro_flags = desc_flags->md_ro_flags;
229 
230 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
231 			struct spdk_blob_md_descriptor_extent	*desc_extent;
232 			unsigned int				i, j;
233 			unsigned int				cluster_count = blob->active.num_clusters;
234 
235 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
236 
237 			if (desc_extent->length == 0 ||
238 			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
239 				return -EINVAL;
240 			}
241 
242 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
243 				for (j = 0; j < desc_extent->extents[i].length; j++) {
244 					if (!spdk_bit_array_get(blob->bs->used_clusters,
245 								desc_extent->extents[i].cluster_idx + j)) {
246 						return -EINVAL;
247 					}
248 					cluster_count++;
249 				}
250 			}
251 
252 			if (cluster_count == 0) {
253 				return -EINVAL;
254 			}
255 			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
256 			if (tmp == NULL) {
257 				return -ENOMEM;
258 			}
259 			blob->active.clusters = tmp;
260 			blob->active.cluster_array_size = cluster_count;
261 
262 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
263 				for (j = 0; j < desc_extent->extents[i].length; j++) {
264 					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
265 							desc_extent->extents[i].cluster_idx + j);
266 				}
267 			}
268 
269 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
270 			struct spdk_blob_md_descriptor_xattr	*desc_xattr;
271 			struct spdk_xattr 			*xattr;
272 
273 			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;
274 
275 			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
276 			    sizeof(desc_xattr->value_length) +
277 			    desc_xattr->name_length + desc_xattr->value_length) {
278 				return -EINVAL;
279 			}
280 
281 			xattr = calloc(1, sizeof(*xattr));
282 			if (xattr == NULL) {
283 				return -ENOMEM;
284 			}
285 
286 			xattr->name = malloc(desc_xattr->name_length + 1);
287 			if (xattr->name == NULL) {
288 				free(xattr);
289 				return -ENOMEM;
290 			}
291 			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
292 			xattr->name[desc_xattr->name_length] = '\0';
293 
294 			xattr->value = malloc(desc_xattr->value_length);
295 			if (xattr->value == NULL) {
296 				free(xattr->name);
297 				free(xattr);
298 				return -ENOMEM;
299 			}
300 			xattr->value_len = desc_xattr->value_length;
301 			memcpy(xattr->value,
302 			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
303 			       desc_xattr->value_length);
304 
305 			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
306 		} else {
307 			/* Unrecognized descriptor type.  Do not fail - just continue to the
308 			 *  next descriptor.  If this descriptor is associated with some feature
309 			 *  defined in a newer version of blobstore, that version of blobstore
310 			 *  should create and set an associated feature flag to specify if this
311 			 *  blob can be loaded or not.
312 			 */
313 		}
314 
315 		/* Advance to the next descriptor */
316 		cur_desc += sizeof(*desc) + desc->length;
317 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
318 			break;
319 		}
320 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
321 	}
322 
323 	return 0;
324 }
325 
326 static int
327 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
328 		 struct spdk_blob_data *blob)
329 {
330 	const struct spdk_blob_md_page *page;
331 	uint32_t i;
332 	int rc;
333 
334 	assert(page_count > 0);
335 	assert(pages[0].sequence_num == 0);
336 	assert(blob != NULL);
337 	assert(blob->state == SPDK_BLOB_STATE_LOADING);
338 	assert(blob->active.clusters == NULL);
340 
	/* If the blobid provided doesn't match what's in the MD, fail. This can
	 * happen, for example, if a bogus blobid is passed in through open.
	 */
344 	if (blob->id != pages[0].id) {
345 		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
346 			    blob->id, pages[0].id);
347 		return -ENOENT;
348 	}
349 
350 	for (i = 0; i < page_count; i++) {
351 		page = &pages[i];
352 
353 		assert(page->id == blob->id);
354 		assert(page->sequence_num == i);
355 
356 		rc = _spdk_blob_parse_page(page, blob);
357 		if (rc != 0) {
358 			return rc;
359 		}
360 	}
361 
362 	return 0;
363 }
364 
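/* Append one zeroed metadata page to the *pages array. On allocation failure
 * the array is freed and *page_count is reset to 0.
 */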
365 static int
366 _spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
367 			      struct spdk_blob_md_page **pages,
368 			      uint32_t *page_count,
369 			      struct spdk_blob_md_page **last_page)
370 {
	struct spdk_blob_md_page *page, *tmp_pages;
372 
373 	assert(pages != NULL);
374 	assert(page_count != NULL);
375 
376 	if (*page_count == 0) {
377 		assert(*pages == NULL);
378 		*page_count = 1;
379 		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
380 					 SPDK_BS_PAGE_SIZE,
381 					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		tmp_pages = spdk_dma_realloc(*pages,
					     SPDK_BS_PAGE_SIZE * (*page_count),
					     SPDK_BS_PAGE_SIZE,
					     NULL);
		if (tmp_pages == NULL) {
			/* spdk_dma_realloc() does not free the old buffer on failure */
			spdk_dma_free(*pages);
		}
		*pages = tmp_pages;
	}
390 
391 	if (*pages == NULL) {
392 		*page_count = 0;
393 		*last_page = NULL;
394 		return -ENOMEM;
395 	}
396 
397 	page = &(*pages)[*page_count - 1];
398 	memset(page, 0, sizeof(*page));
399 	page->id = blob->id;
400 	page->sequence_num = *page_count - 1;
401 	page->next = SPDK_INVALID_MD_PAGE;
402 	*last_page = page;
403 
404 	return 0;
405 }
406 
/* Transform the in-memory representation 'xattr' into an on-disk xattr
 * descriptor. Update required_sz on both success and failure.
 */
411 static int
412 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
413 			   uint8_t *buf, size_t buf_sz,
414 			   size_t *required_sz)
415 {
416 	struct spdk_blob_md_descriptor_xattr	*desc;
417 
418 	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
419 		       strlen(xattr->name) +
420 		       xattr->value_len;
421 
422 	if (buf_sz < *required_sz) {
423 		return -1;
424 	}
425 
426 	desc = (struct spdk_blob_md_descriptor_xattr *)buf;
427 
428 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
429 	desc->length = sizeof(desc->name_length) +
430 		       sizeof(desc->value_length) +
431 		       strlen(xattr->name) +
432 		       xattr->value_len;
433 	desc->name_length = strlen(xattr->name);
434 	desc->value_length = xattr->value_len;
435 
436 	memcpy(desc->name, xattr->name, desc->name_length);
437 	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
438 	       xattr->value,
439 	       desc->value_length);
440 
441 	return 0;
442 }
443 
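/* Run-length encode the blob's clusters, starting at start_cluster, into one
 * extent descriptor. Each extent stores (cluster_idx, length) in cluster
 * units; e.g. with 8 LBAs per cluster, clusters at LBAs {8, 16, 24, 64}
 * encode as the two extents (1, 3) and (8, 1). If the descriptor fills up,
 * *next_cluster tells the caller where to resume in a fresh page.
 */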
444 static void
445 _spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
446 			    uint64_t start_cluster, uint64_t *next_cluster,
447 			    uint8_t *buf, size_t buf_sz)
448 {
449 	struct spdk_blob_md_descriptor_extent *desc;
450 	size_t cur_sz;
451 	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;
453 
454 	/* The buffer must have room for at least one extent */
455 	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
456 	if (buf_sz < cur_sz) {
457 		*next_cluster = start_cluster;
458 		return;
459 	}
460 
461 	desc = (struct spdk_blob_md_descriptor_extent *)buf;
462 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;
463 
464 	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);
465 
466 	lba = blob->active.clusters[start_cluster];
467 	lba_count = lba_per_cluster;
468 	extent_idx = 0;
469 	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
470 		if ((lba + lba_count) == blob->active.clusters[i]) {
471 			lba_count += lba_per_cluster;
472 			continue;
473 		}
474 		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
475 		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
476 		extent_idx++;
477 
478 		cur_sz += sizeof(desc->extents[extent_idx]);
479 
480 		if (buf_sz < cur_sz) {
481 			/* If we ran out of buffer space, return */
482 			desc->length = sizeof(desc->extents[0]) * extent_idx;
483 			*next_cluster = i;
484 			return;
485 		}
486 
487 		lba = blob->active.clusters[i];
488 		lba_count = lba_per_cluster;
489 	}
490 
491 	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
492 	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
493 	extent_idx++;
494 
495 	desc->length = sizeof(desc->extents[0]) * extent_idx;
496 	*next_cluster = blob->active.num_clusters;
497 
498 	return;
499 }
500 
501 static void
502 _spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
503 			   uint8_t *buf, size_t *buf_sz)
504 {
505 	struct spdk_blob_md_descriptor_flags *desc;
506 
507 	/*
508 	 * Flags get serialized first, so we should always have room for the flags
509 	 *  descriptor.
510 	 */
511 	assert(*buf_sz >= sizeof(*desc));
512 
513 	desc = (struct spdk_blob_md_descriptor_flags *)buf;
514 	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
515 	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
516 	desc->invalid_flags = blob->invalid_flags;
517 	desc->data_ro_flags = blob->data_ro_flags;
518 	desc->md_ro_flags = blob->md_ro_flags;
519 
520 	*buf_sz -= sizeof(*desc);
521 }
522 
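/* Serialize the blob's in-memory metadata into a chain of pages: flags first,
 * then xattrs, then extents, adding pages to the chain whenever the current
 * page runs out of descriptor space.
 */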
523 static int
524 _spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
525 		     uint32_t *page_count)
526 {
527 	struct spdk_blob_md_page		*cur_page;
528 	const struct spdk_xattr			*xattr;
529 	int 					rc;
530 	uint8_t					*buf;
531 	size_t					remaining_sz;
532 	uint64_t				last_cluster;
533 
534 	assert(pages != NULL);
535 	assert(page_count != NULL);
536 	assert(blob != NULL);
537 	assert(blob->state == SPDK_BLOB_STATE_SYNCING);
538 
539 	*pages = NULL;
540 	*page_count = 0;
541 
542 	/* A blob always has at least 1 page, even if it has no descriptors */
543 	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
544 	if (rc < 0) {
545 		return rc;
546 	}
547 
548 	buf = (uint8_t *)cur_page->descriptors;
549 	remaining_sz = sizeof(cur_page->descriptors);
550 
551 	/* Serialize flags */
552 	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
553 
554 	/* Serialize xattrs */
555 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
556 		size_t required_sz = 0;
557 		rc = _spdk_blob_serialize_xattr(xattr,
558 						buf, remaining_sz,
559 						&required_sz);
560 		if (rc < 0) {
561 			/* Need to add a new page to the chain */
562 			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
563 							   &cur_page);
564 			if (rc < 0) {
565 				spdk_dma_free(*pages);
566 				*pages = NULL;
567 				*page_count = 0;
568 				return rc;
569 			}
570 
571 			buf = (uint8_t *)cur_page->descriptors;
572 			remaining_sz = sizeof(cur_page->descriptors);
573 
574 			/* Try again */
575 			required_sz = 0;
576 			rc = _spdk_blob_serialize_xattr(xattr,
577 							buf, remaining_sz,
578 							&required_sz);
579 
580 			if (rc < 0) {
581 				spdk_dma_free(*pages);
582 				*pages = NULL;
583 				*page_count = 0;
584 				return -1;
585 			}
586 		}
587 
588 		remaining_sz -= required_sz;
589 		buf += required_sz;
590 	}
591 
592 	/* Serialize extents */
593 	last_cluster = 0;
594 	while (last_cluster < blob->active.num_clusters) {
595 		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
596 					    buf, remaining_sz);
597 
598 		if (last_cluster == blob->active.num_clusters) {
599 			break;
600 		}
601 
602 		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
603 						   &cur_page);
604 		if (rc < 0) {
605 			return rc;
606 		}
607 
608 		buf = (uint8_t *)cur_page->descriptors;
609 		remaining_sz = sizeof(cur_page->descriptors);
610 	}
611 
612 	return 0;
613 }
614 
615 struct spdk_blob_load_ctx {
616 	struct spdk_blob_data 		*blob;
617 
618 	struct spdk_blob_md_page	*pages;
619 	uint32_t			num_pages;
620 
621 	spdk_bs_sequence_cpl		cb_fn;
622 	void				*cb_arg;
623 };
624 
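/* The crc field occupies the last 4 bytes of a metadata page, so the CRC-32C
 * is computed over everything before it.
 */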
625 static uint32_t
626 _spdk_blob_md_page_calc_crc(void *page)
627 {
628 	uint32_t		crc;
629 
630 	crc = BLOB_CRC32C_INITIAL;
631 	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
632 	crc ^= BLOB_CRC32C_INITIAL;
633 
	return crc;
}
637 
638 static void
639 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
640 {
641 	struct spdk_blob_load_ctx 	*ctx = cb_arg;
642 	struct spdk_blob_data 		*blob = ctx->blob;
643 	struct spdk_blob_md_page	*page;
644 	int				rc;
645 	uint32_t			crc;
646 
647 	page = &ctx->pages[ctx->num_pages - 1];
648 	crc = _spdk_blob_md_page_calc_crc(page);
649 	if (crc != page->crc) {
650 		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
651 		_spdk_blob_free(blob);
652 		ctx->cb_fn(seq, NULL, -EINVAL);
653 		spdk_dma_free(ctx->pages);
654 		free(ctx);
655 		return;
656 	}
657 
658 	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);
		struct spdk_blob_md_page *tmp;

		assert(next_page < blob->bs->md_len);

		/* Read the next page */
		ctx->num_pages++;
		tmp = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
				       sizeof(*page), NULL);
		if (tmp == NULL) {
			/* spdk_dma_realloc() does not free the old buffer on failure */
			spdk_dma_free(ctx->pages);
			_spdk_blob_free(blob);
			ctx->cb_fn(seq, NULL, -ENOMEM);
			free(ctx);
			return;
		}
		ctx->pages = tmp;
674 
675 		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
676 				      next_lba,
677 				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
678 				      _spdk_blob_load_cpl, ctx);
679 		return;
680 	}
681 
682 	/* Parse the pages */
683 	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
684 	if (rc) {
685 		_spdk_blob_free(blob);
686 		ctx->cb_fn(seq, NULL, rc);
687 		spdk_dma_free(ctx->pages);
688 		free(ctx);
689 		return;
690 	}
691 
692 	_spdk_blob_mark_clean(blob);
693 
694 	ctx->cb_fn(seq, ctx->cb_arg, rc);
695 
696 	/* Free the memory */
697 	spdk_dma_free(ctx->pages);
698 	free(ctx);
699 }
700 
701 /* Load a blob from disk given a blobid */
702 static void
703 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
704 		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
705 {
706 	struct spdk_blob_load_ctx *ctx;
707 	struct spdk_blob_store *bs;
708 	uint32_t page_num;
709 	uint64_t lba;
710 
711 	assert(blob != NULL);
712 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
713 	       blob->state == SPDK_BLOB_STATE_DIRTY);
714 
715 	bs = blob->bs;
716 
717 	ctx = calloc(1, sizeof(*ctx));
718 	if (!ctx) {
719 		cb_fn(seq, cb_arg, -ENOMEM);
720 		return;
721 	}
722 
723 	ctx->blob = blob;
724 	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
725 				      SPDK_BS_PAGE_SIZE, NULL);
726 	if (!ctx->pages) {
727 		free(ctx);
728 		cb_fn(seq, cb_arg, -ENOMEM);
729 		return;
730 	}
731 	ctx->num_pages = 1;
732 	ctx->cb_fn = cb_fn;
733 	ctx->cb_arg = cb_arg;
734 
735 	page_num = _spdk_bs_blobid_to_page(blob->id);
736 	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);
737 
738 	blob->state = SPDK_BLOB_STATE_LOADING;
739 
740 	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
741 			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
742 			      _spdk_blob_load_cpl, ctx);
743 }
744 
745 struct spdk_blob_persist_ctx {
746 	struct spdk_blob_data 		*blob;
747 
748 	struct spdk_blob_md_page	*pages;
749 
750 	uint64_t			idx;
751 
752 	spdk_bs_sequence_cpl		cb_fn;
753 	void				*cb_arg;
754 };
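
/* Persisting a blob walks the following completion chain, each step started
 * from the previous step's callback:
 *
 *   _spdk_blob_persist() -> write page chain (pages 1..N-1) -> write root page
 *   -> zero released metadata pages -> unmap truncated clusters ->
 *   _spdk_blob_persist_complete()
 */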
755 
756 static void
757 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
758 {
759 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
760 	struct spdk_blob_data 		*blob = ctx->blob;
761 
762 	if (bserrno == 0) {
763 		_spdk_blob_mark_clean(blob);
764 	}
765 
766 	/* Call user callback */
767 	ctx->cb_fn(seq, ctx->cb_arg, bserrno);
768 
769 	/* Free the memory */
770 	spdk_dma_free(ctx->pages);
771 	free(ctx);
772 }
773 
774 static void
775 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
776 {
777 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
778 	struct spdk_blob_data 		*blob = ctx->blob;
779 	struct spdk_blob_store		*bs = blob->bs;
780 	void				*tmp;
781 	size_t				i;
782 
783 	/* Release all clusters that were truncated */
784 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
785 		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);
786 
787 		_spdk_bs_release_cluster(bs, cluster_num);
788 	}
789 
790 	if (blob->active.num_clusters == 0) {
791 		free(blob->active.clusters);
792 		blob->active.clusters = NULL;
793 		blob->active.cluster_array_size = 0;
794 	} else {
795 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
796 		assert(tmp != NULL);
797 		blob->active.clusters = tmp;
798 		blob->active.cluster_array_size = blob->active.num_clusters;
799 	}
800 
801 	_spdk_blob_persist_complete(seq, ctx, bserrno);
802 }
803 
804 static void
805 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
806 {
807 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
808 	struct spdk_blob_data 		*blob = ctx->blob;
809 	struct spdk_blob_store		*bs = blob->bs;
810 	spdk_bs_batch_t			*batch;
811 	size_t				i;
812 	uint64_t			lba;
813 	uint32_t			lba_count;
814 
815 	/* Clusters don't move around in blobs. The list shrinks or grows
816 	 * at the end, but no changes ever occur in the middle of the list.
817 	 */
818 
819 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);
820 
821 	/* Unmap all clusters that were truncated */
822 	lba = 0;
823 	lba_count = 0;
824 	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
825 		uint64_t next_lba = blob->active.clusters[i];
826 		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);
827 
828 		if ((lba + lba_count) == next_lba) {
829 			/* This cluster is contiguous with the previous one. */
830 			lba_count += next_lba_count;
831 			continue;
832 		}
833 
834 		/* This cluster is not contiguous with the previous one. */
835 
		/* If a run of LBAs previously existed, send it
		 * as an unmap.
		 */
839 		if (lba_count > 0) {
840 			spdk_bs_batch_unmap(batch, lba, lba_count);
841 		}
842 
843 		/* Start building the next batch */
844 		lba = next_lba;
845 		lba_count = next_lba_count;
846 	}
847 
848 	/* If we ended with a contiguous set of LBAs, send the unmap now */
849 	if (lba_count > 0) {
850 		spdk_bs_batch_unmap(batch, lba, lba_count);
851 	}
852 
853 	spdk_bs_batch_close(batch);
854 }
855 
856 static void
857 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
858 {
859 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
860 	struct spdk_blob_data 		*blob = ctx->blob;
861 	struct spdk_blob_store		*bs = blob->bs;
862 	size_t				i;
863 
	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place, so
	 * once zeroed on disk their indices must be released from the
	 * used_md_pages array.
	 */
868 	for (i = 1; i < blob->clean.num_pages; i++) {
869 		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
870 	}
871 
872 	if (blob->active.num_pages == 0) {
873 		uint32_t page_num;
874 
875 		page_num = _spdk_bs_blobid_to_page(blob->id);
876 		spdk_bit_array_clear(bs->used_md_pages, page_num);
877 	}
878 
879 	/* Move on to unmapping clusters */
880 	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
881 }
882 
883 static void
884 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
885 {
886 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
887 	struct spdk_blob_data 		*blob = ctx->blob;
888 	struct spdk_blob_store		*bs = blob->bs;
889 	uint64_t			lba;
890 	uint32_t			lba_count;
891 	spdk_bs_batch_t			*batch;
892 	size_t				i;
893 
894 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);
895 
896 	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);
897 
898 	/* This loop starts at 1 because the first page is special and handled
899 	 * below. The pages (except the first) are never written in place,
900 	 * so any pages in the clean list must be zeroed.
901 	 */
902 	for (i = 1; i < blob->clean.num_pages; i++) {
903 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);
904 
905 		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
906 	}
907 
908 	/* The first page will only be zeroed if this is a delete. */
909 	if (blob->active.num_pages == 0) {
910 		uint32_t page_num;
911 
912 		/* The first page in the metadata goes where the blobid indicates */
913 		page_num = _spdk_bs_blobid_to_page(blob->id);
914 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);
915 
916 		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
917 	}
918 
919 	spdk_bs_batch_close(batch);
920 }
921 
922 static void
923 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
924 {
925 	struct spdk_blob_persist_ctx	*ctx = cb_arg;
926 	struct spdk_blob_data		*blob = ctx->blob;
927 	struct spdk_blob_store		*bs = blob->bs;
928 	uint64_t			lba;
929 	uint32_t			lba_count;
930 	struct spdk_blob_md_page	*page;
931 
932 	if (blob->active.num_pages == 0) {
933 		/* Move on to the next step */
934 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
935 		return;
936 	}
937 
938 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
939 
940 	page = &ctx->pages[0];
941 	/* The first page in the metadata goes where the blobid indicates */
942 	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));
943 
944 	spdk_bs_sequence_write(seq, page, lba, lba_count,
945 			       _spdk_blob_persist_zero_pages, ctx);
946 }
947 
948 static void
949 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
950 {
951 	struct spdk_blob_persist_ctx 	*ctx = cb_arg;
952 	struct spdk_blob_data 		*blob = ctx->blob;
953 	struct spdk_blob_store		*bs = blob->bs;
954 	uint64_t 			lba;
955 	uint32_t			lba_count;
956 	struct spdk_blob_md_page	*page;
957 	spdk_bs_batch_t			*batch;
958 	size_t				i;
959 
	/* Write out every page except the root. The root page is not written
	 * until the rest of the chain is safely on disk, via the batch
	 * completion callback.
	 */
963 
964 	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));
965 
966 	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);
967 
968 	/* This starts at 1. The root page is not written until
969 	 * all of the others are finished
970 	 */
971 	for (i = 1; i < blob->active.num_pages; i++) {
972 		page = &ctx->pages[i];
973 		assert(page->sequence_num == i);
974 
975 		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);
976 
977 		spdk_bs_batch_write(batch, page, lba, lba_count);
978 	}
979 
980 	spdk_bs_batch_close(batch);
981 }
982 
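/* Grow or shrink the blob to sz clusters. Growing claims clusters from the
 * blobstore immediately; shrinking only reduces the count here, and the
 * excess clusters are released when the blob is next persisted.
 */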
983 static int
984 _spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
985 {
986 	uint64_t	i;
987 	uint64_t	*tmp;
988 	uint64_t	lfc; /* lowest free cluster */
989 	struct spdk_blob_store *bs;
990 
991 	bs = blob->bs;
992 
993 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
994 	       blob->state != SPDK_BLOB_STATE_SYNCING);
995 
996 	if (blob->active.num_clusters == sz) {
997 		return 0;
998 	}
999 
1000 	if (blob->active.num_clusters < blob->active.cluster_array_size) {
1001 		/* If this blob was resized to be larger, then smaller, then
1002 		 * larger without syncing, then the cluster array already
1003 		 * contains spare assigned clusters we can use.
1004 		 */
1005 		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
1006 						     sz);
1007 	}
1008 
1009 	blob->state = SPDK_BLOB_STATE_DIRTY;
1010 
1011 	/* Do two passes - one to verify that we can obtain enough clusters
1012 	 * and another to actually claim them.
1013 	 */
1014 
1015 	lfc = 0;
1016 	for (i = blob->active.num_clusters; i < sz; i++) {
1017 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1018 		if (lfc >= bs->total_clusters) {
1019 			/* No more free clusters. Cannot satisfy the request */
1021 			return -1;
1022 		}
1023 		lfc++;
1024 	}
1025 
1026 	if (sz > blob->active.num_clusters) {
1027 		/* Expand the cluster array if necessary.
1028 		 * We only shrink the array when persisting.
1029 		 */
1030 		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
1031 		if (sz > 0 && tmp == NULL) {
1033 			return -1;
1034 		}
1035 		blob->active.clusters = tmp;
1036 		blob->active.cluster_array_size = sz;
1037 	}
1038 
1039 	lfc = 0;
1040 	for (i = blob->active.num_clusters; i < sz; i++) {
1041 		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
1042 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
1043 		_spdk_bs_claim_cluster(bs, lfc);
1044 		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
1045 		lfc++;
1046 	}
1047 
1048 	blob->active.num_clusters = sz;
1049 
1050 	return 0;
1051 }
1052 
1053 /* Write a blob to disk */
1054 static void
1055 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
1056 		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1057 {
1058 	struct spdk_blob_persist_ctx *ctx;
1059 	int rc;
1060 	uint64_t i;
	uint32_t page_num;
	uint32_t *tmp_pages;
1062 	struct spdk_blob_store *bs;
1063 
1064 	assert(blob != NULL);
1065 	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
1066 	       blob->state == SPDK_BLOB_STATE_DIRTY);
1067 
1068 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
1069 		cb_fn(seq, cb_arg, 0);
1070 		return;
1071 	}
1072 
1073 	bs = blob->bs;
1074 
1075 	ctx = calloc(1, sizeof(*ctx));
1076 	if (!ctx) {
1077 		cb_fn(seq, cb_arg, -ENOMEM);
1078 		return;
1079 	}
1080 	ctx->blob = blob;
1081 	ctx->cb_fn = cb_fn;
1082 	ctx->cb_arg = cb_arg;
1083 
1084 	blob->state = SPDK_BLOB_STATE_SYNCING;
1085 
1086 	if (blob->active.num_pages == 0) {
1087 		/* This is the signal that the blob should be deleted.
1088 		 * Immediately jump to the clean up routine. */
1089 		assert(blob->clean.num_pages > 0);
1090 		ctx->idx = blob->clean.num_pages - 1;
1091 		_spdk_blob_persist_zero_pages(seq, ctx, 0);
1092 		return;
1093 
1094 	}
1095 
1096 	/* Generate the new metadata */
1097 	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
1098 	if (rc < 0) {
1099 		free(ctx);
1100 		cb_fn(seq, cb_arg, rc);
1101 		return;
1102 	}
1103 
1104 	assert(blob->active.num_pages >= 1);
1105 
	/* Resize the cache of page indices */
	tmp_pages = realloc(blob->active.pages,
			    blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp_pages) {
		/* realloc() does not free the old array on failure */
		spdk_dma_free(ctx->pages);
		free(ctx);
		blob->state = SPDK_BLOB_STATE_DIRTY;
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	blob->active.pages = tmp_pages;
1114 
1115 	/* Assign this metadata to pages. This requires two passes -
1116 	 * one to verify that there are enough pages and a second
1117 	 * to actually claim them. */
1118 	page_num = 0;
1119 	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
1120 	for (i = 1; i < blob->active.num_pages; i++) {
1121 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1122 		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
1123 			spdk_dma_free(ctx->pages);
1124 			free(ctx);
1125 			blob->state = SPDK_BLOB_STATE_DIRTY;
1126 			cb_fn(seq, cb_arg, -ENOMEM);
1127 			return;
1128 		}
1129 		page_num++;
1130 	}
1131 
1132 	page_num = 0;
1133 	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
1134 	for (i = 1; i < blob->active.num_pages; i++) {
1135 		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
1136 		ctx->pages[i - 1].next = page_num;
1137 		/* Now that previous metadata page is complete, calculate the crc for it. */
1138 		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1139 		blob->active.pages[i] = page_num;
1140 		spdk_bit_array_set(bs->used_md_pages, page_num);
1141 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
1142 		page_num++;
1143 	}
1144 	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
1145 	/* Start writing the metadata from last page to first */
1146 	ctx->idx = blob->active.num_pages - 1;
1147 	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
1148 }
1149 
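/* Submit a read, write, unmap or write_zeroes spanning [offset, offset + length)
 * in pages. The request is split at cluster boundaries, since adjacent blob
 * pages may map to non-contiguous LBAs on disk.
 */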
1150 static void
1151 _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1152 			     void *payload, uint64_t offset, uint64_t length,
1153 			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
1154 {
1155 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1156 	spdk_bs_batch_t			*batch;
1157 	struct spdk_bs_cpl		cpl;
1158 	uint64_t			lba;
1159 	uint32_t			lba_count;
1160 	uint8_t				*buf;
1161 	uint64_t			page;
1162 
1163 	assert(blob != NULL);
1164 
1165 	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
1166 		cb_fn(cb_arg, -EPERM);
1167 		return;
1168 	}
1169 
1170 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1171 		cb_fn(cb_arg, -EINVAL);
1172 		return;
1173 	}
1174 
1175 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1176 	cpl.u.blob_basic.cb_fn = cb_fn;
1177 	cpl.u.blob_basic.cb_arg = cb_arg;
1178 
1179 	batch = spdk_bs_batch_open(_channel, &cpl);
1180 	if (!batch) {
1181 		cb_fn(cb_arg, -ENOMEM);
1182 		return;
1183 	}
1184 
1185 	length = _spdk_bs_page_to_lba(blob->bs, length);
1186 	page = offset;
1187 	buf = payload;
1188 	while (length > 0) {
1189 		lba = _spdk_bs_blob_page_to_lba(blob, page);
1190 		lba_count = spdk_min(length,
1191 				     _spdk_bs_page_to_lba(blob->bs,
1192 						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));
1193 
1194 		switch (op_type) {
1195 		case SPDK_BLOB_READ:
1196 			spdk_bs_batch_read(batch, buf, lba, lba_count);
1197 			break;
1198 		case SPDK_BLOB_WRITE:
1199 			spdk_bs_batch_write(batch, buf, lba, lba_count);
1200 			break;
1201 		case SPDK_BLOB_UNMAP:
1202 			spdk_bs_batch_unmap(batch, lba, lba_count);
1203 			break;
1204 		case SPDK_BLOB_WRITE_ZEROES:
1205 			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
1206 			break;
1207 		}
1208 
1209 		length -= lba_count;
1210 		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
1211 		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
1212 			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
1213 		}
1214 	}
1215 
1216 	spdk_bs_batch_close(batch);
1217 }
1218 
1219 struct rw_iov_ctx {
1220 	struct spdk_blob_data *blob;
1221 	bool read;
1222 	int iovcnt;
1223 	struct iovec *orig_iov;
1224 	uint64_t page_offset;
1225 	uint64_t pages_remaining;
1226 	uint64_t pages_done;
1227 	struct iovec iov[0];
1228 };
1229 
1230 static void
1231 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1232 {
1233 	assert(cb_arg == NULL);
1234 	spdk_bs_sequence_finish(seq, bserrno);
1235 }
1236 
1237 static void
1238 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1239 {
1240 	struct rw_iov_ctx *ctx = cb_arg;
1241 	struct iovec *iov, *orig_iov;
1242 	int iovcnt;
1243 	size_t orig_iovoff;
1244 	uint64_t lba;
1245 	uint64_t page_count, pages_to_boundary;
1246 	uint32_t lba_count;
1247 	uint64_t byte_count;
1248 
1249 	if (bserrno != 0 || ctx->pages_remaining == 0) {
1250 		free(ctx);
1251 		spdk_bs_sequence_finish(seq, bserrno);
1252 		return;
1253 	}
1254 
1255 	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
1256 	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
1257 	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
1258 	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);
1259 
1260 	/*
1261 	 * Get index and offset into the original iov array for our current position in the I/O sequence.
1262 	 *  byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will
1263 	 *  point to the current position in the I/O sequence.
1264 	 */
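	/* Note: sizeof(struct spdk_blob_md_page) is exactly SPDK_BS_PAGE_SIZE,
	 * so it doubles as the blob page size in these byte-count calculations.
	 */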
1265 	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
1266 	orig_iov = &ctx->orig_iov[0];
1267 	orig_iovoff = 0;
1268 	while (byte_count > 0) {
1269 		if (byte_count >= orig_iov->iov_len) {
1270 			byte_count -= orig_iov->iov_len;
1271 			orig_iov++;
1272 		} else {
1273 			orig_iovoff = byte_count;
1274 			byte_count = 0;
1275 		}
1276 	}
1277 
1278 	/*
1279 	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
1280 	 *  bytes of this next I/O remain to be accounted for in the new iov array.
1281 	 */
1282 	byte_count = page_count * sizeof(struct spdk_blob_md_page);
1283 	iov = &ctx->iov[0];
1284 	iovcnt = 0;
1285 	while (byte_count > 0) {
1286 		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
1287 		iov->iov_base = orig_iov->iov_base + orig_iovoff;
1288 		byte_count -= iov->iov_len;
1289 		orig_iovoff = 0;
1290 		orig_iov++;
1291 		iov++;
1292 		iovcnt++;
1293 	}
1294 
1295 	ctx->page_offset += page_count;
1296 	ctx->pages_done += page_count;
1297 	ctx->pages_remaining -= page_count;
1298 	iov = &ctx->iov[0];
1299 
1300 	if (ctx->read) {
1301 		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
1302 	} else {
1303 		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
1304 	}
1305 }
1306 
1307 static void
1308 _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
1309 				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1310 				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
1311 {
1312 	struct spdk_blob_data		*blob = __blob_to_data(_blob);
1313 	spdk_bs_sequence_t		*seq;
1314 	struct spdk_bs_cpl		cpl;
1315 
1316 	assert(blob != NULL);
1317 
1318 	if (!read && blob->data_ro) {
1319 		cb_fn(cb_arg, -EPERM);
1320 		return;
1321 	}
1322 
1323 	if (length == 0) {
1324 		cb_fn(cb_arg, 0);
1325 		return;
1326 	}
1327 
1328 	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
1329 		cb_fn(cb_arg, -EINVAL);
1330 		return;
1331 	}
1332 
1333 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
1334 	cpl.u.blob_basic.cb_fn = cb_fn;
1335 	cpl.u.blob_basic.cb_arg = cb_arg;
1336 
1337 	/*
1338 	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
1339 	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
1340 	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
1341 	 *  boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
1342 	 *  to allocate a separate iov array and split the I/O such that none of the resulting
1343 	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
1344 	 *  but since this case happens very infrequently, any performance impact will be negligible.
1345 	 *
1346 	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
1347 	 *  for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
1348 	 *  in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
1349 	 *  when the batch was completed, to allow for freeing the memory for the iov arrays.
1350 	 */
1351 	seq = spdk_bs_sequence_start(_channel, &cpl);
1352 	if (!seq) {
1353 		cb_fn(cb_arg, -ENOMEM);
1354 		return;
1355 	}
1356 
1357 	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
1358 		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
1359 		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);
1360 
1361 		if (read) {
1362 			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1363 		} else {
1364 			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
1365 		}
1366 	} else {
1367 		struct rw_iov_ctx *ctx;
1368 
1369 		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
1370 		if (ctx == NULL) {
1371 			spdk_bs_sequence_finish(seq, -ENOMEM);
1372 			return;
1373 		}
1374 
1375 		ctx->blob = blob;
1376 		ctx->read = read;
1377 		ctx->orig_iov = iov;
1378 		ctx->iovcnt = iovcnt;
1379 		ctx->page_offset = offset;
1380 		ctx->pages_remaining = length;
1381 		ctx->pages_done = 0;
1382 
1383 		_spdk_rw_iov_split_next(seq, ctx, 0);
1384 	}
1385 }
1386 
1387 static struct spdk_blob_data *
1388 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
1389 {
1390 	struct spdk_blob_data *blob;
1391 
1392 	TAILQ_FOREACH(blob, &bs->blobs, link) {
1393 		if (blob->id == blobid) {
1394 			return blob;
1395 		}
1396 	}
1397 
1398 	return NULL;
1399 }
1400 
1401 static int
1402 _spdk_bs_channel_create(void *io_device, void *ctx_buf)
1403 {
1404 	struct spdk_blob_store		*bs = io_device;
1405 	struct spdk_bs_channel		*channel = ctx_buf;
1406 	struct spdk_bs_dev		*dev;
1407 	uint32_t			max_ops = bs->max_channel_ops;
1408 	uint32_t			i;
1409 
1410 	dev = bs->dev;
1411 
1412 	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
1413 	if (!channel->req_mem) {
1414 		return -1;
1415 	}
1416 
1417 	TAILQ_INIT(&channel->reqs);
1418 
1419 	for (i = 0; i < max_ops; i++) {
1420 		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
1421 	}
1422 
1423 	channel->bs = bs;
1424 	channel->dev = dev;
1425 	channel->dev_channel = dev->create_channel(dev);
1426 
1427 	if (!channel->dev_channel) {
1428 		SPDK_ERRLOG("Failed to create device channel.\n");
1429 		free(channel->req_mem);
1430 		return -1;
1431 	}
1432 
1433 	return 0;
1434 }
1435 
1436 static void
1437 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
1438 {
1439 	struct spdk_bs_channel *channel = ctx_buf;
1440 
1441 	free(channel->req_mem);
1442 	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
1443 }
1444 
1445 static void
1446 _spdk_bs_dev_destroy(void *io_device)
1447 {
1448 	struct spdk_blob_store *bs = io_device;
1449 	struct spdk_blob_data	*blob, *blob_tmp;
1450 
1451 	bs->dev->destroy(bs->dev);
1452 
1453 	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
1454 		TAILQ_REMOVE(&bs->blobs, blob, link);
1455 		_spdk_blob_free(blob);
1456 	}
1457 
1458 	spdk_bit_array_free(&bs->used_blobids);
1459 	spdk_bit_array_free(&bs->used_md_pages);
1460 	spdk_bit_array_free(&bs->used_clusters);
1461 	/*
1462 	 * If this function is called for any reason except a successful unload,
1463 	 * the unload_cpl type will be NONE and this will be a nop.
1464 	 */
1465 	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);
1466 
1467 	free(bs);
1468 }
1469 
1470 static void
1471 _spdk_bs_free(struct spdk_blob_store *bs)
1472 {
1473 	spdk_bs_unregister_md_thread(bs);
1474 	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
1475 }
1476 
1477 void
1478 spdk_bs_opts_init(struct spdk_bs_opts *opts)
1479 {
1480 	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
1481 	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
1482 	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
1483 	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
1484 	memset(&opts->bstype, 0, sizeof(opts->bstype));
1485 }
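
/* A minimal initialization flow might look like the following sketch
 * (spdk_bs_init() is assumed as the public entry point that consumes these
 * options; the callback name is illustrative only):
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "mytype");
 *	spdk_bs_init(dev, &opts, bs_init_done, cb_arg);
 */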
1486 
1487 static int
1488 _spdk_bs_opts_verify(struct spdk_bs_opts *opts)
1489 {
1490 	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
1491 	    opts->max_channel_ops == 0) {
1492 		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
1493 		return -1;
1494 	}
1495 
1496 	return 0;
1497 }
1498 
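/* Allocate and minimally initialize a blobstore. The used_md_pages and
 * used_blobids arrays start essentially empty here and are resized to their
 * real lengths during init/load.
 */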
1499 static struct spdk_blob_store *
1500 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
1501 {
1502 	struct spdk_blob_store	*bs;
1503 	uint64_t dev_size;
1504 	int rc;
1505 
1506 	dev_size = dev->blocklen * dev->blockcnt;
1507 	if (dev_size < opts->cluster_sz) {
1508 		/* Device size cannot be smaller than cluster size of blobstore */
1509 		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size,
1510 			    opts->cluster_sz);
1511 		return NULL;
1512 	}
1513 	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
1514 		/* Cluster size cannot be smaller than page size */
1515 		SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n",
1516 			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
1517 		return NULL;
1518 	}
1519 	bs = calloc(1, sizeof(struct spdk_blob_store));
1520 	if (!bs) {
1521 		return NULL;
1522 	}
1523 
1524 	TAILQ_INIT(&bs->blobs);
1525 	bs->dev = dev;
1526 
1527 	/*
1528 	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
1529 	 *  even multiple of the cluster size.
1530 	 */
1531 	bs->cluster_sz = opts->cluster_sz;
1532 	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
1533 	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
1534 	bs->num_free_clusters = bs->total_clusters;
1535 	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
1536 	if (bs->used_clusters == NULL) {
1537 		free(bs);
1538 		return NULL;
1539 	}
1540 
1541 	bs->max_channel_ops = opts->max_channel_ops;
1542 	bs->super_blob = SPDK_BLOBID_INVALID;
1543 	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));
1544 
1545 	/* The metadata is assumed to be at least 1 page */
1546 	bs->used_md_pages = spdk_bit_array_create(1);
1547 	bs->used_blobids = spdk_bit_array_create(0);
1548 
1549 	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
1550 				sizeof(struct spdk_bs_channel));
1551 	rc = spdk_bs_register_md_thread(bs);
1552 	if (rc == -1) {
1553 		spdk_io_device_unregister(bs, NULL);
1554 		spdk_bit_array_free(&bs->used_blobids);
1555 		spdk_bit_array_free(&bs->used_md_pages);
1556 		spdk_bit_array_free(&bs->used_clusters);
1557 		free(bs);
1558 		return NULL;
1559 	}
1560 
1561 	return bs;
1562 }
1563 
/* START spdk_bs_load - spdk_bs_load_ctx is used for both load and unload. */
1565 
1566 struct spdk_bs_load_ctx {
1567 	struct spdk_blob_store		*bs;
1568 	struct spdk_bs_super_block	*super;
1569 
1570 	struct spdk_bs_md_mask		*mask;
1571 	bool				in_page_chain;
1572 	uint32_t			page_index;
1573 	uint32_t			cur_page;
1574 	struct spdk_blob_md_page	*page;
1575 	bool				is_load;
1576 };
1577 
1578 static void
1579 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
1580 {
1581 	assert(bserrno != 0);
1582 
1583 	spdk_dma_free(ctx->super);
1584 	/*
1585 	 * Only free the blobstore when a load fails.  If an unload fails (for some reason)
1586 	 *  we want to keep the blobstore in case the caller wants to try again.
1587 	 */
1588 	if (ctx->is_load) {
1589 		_spdk_bs_free(ctx->bs);
1590 	}
1591 	free(ctx);
1592 	spdk_bs_sequence_finish(seq, bserrno);
1593 }
1594 
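/* Pack the set bits of a bit array into the on-disk mask format: bit i of the
 * array maps to bit (i % 8) of mask byte (i / 8), i.e. LSB-first within each
 * byte.
 */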
1595 static void
1596 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
1597 {
1598 	uint32_t i = 0;
1599 
1600 	while (true) {
1601 		i = spdk_bit_array_find_first_set(array, i);
1602 		if (i >= mask->length) {
1603 			break;
1604 		}
1605 		mask->mask[i / 8] |= 1U << (i % 8);
1606 		i++;
1607 	}
1608 }
1609 
1610 static void
1611 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
1612 		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
1613 {
1614 	/* Update the values in the super block */
1615 	super->super_blob = bs->super_blob;
1616 	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
1617 	super->crc = _spdk_blob_md_page_calc_crc(super);
1618 	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
1619 			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
1620 			       cb_fn, cb_arg);
1621 }
1622 
1623 static void
1624 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1625 {
1626 	struct spdk_bs_load_ctx	*ctx = arg;
1627 	uint64_t	mask_size, lba, lba_count;
1628 
1629 	/* Write out the used clusters mask */
1630 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1631 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1632 	if (!ctx->mask) {
1633 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1634 		return;
1635 	}
1636 
1637 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
1638 	ctx->mask->length = ctx->bs->total_clusters;
1639 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));
1640 
1641 	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
1642 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1643 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1644 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1645 }
1646 
1647 static void
1648 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1649 {
1650 	struct spdk_bs_load_ctx	*ctx = arg;
1651 	uint64_t	mask_size, lba, lba_count;
1652 
1653 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1654 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1655 	if (!ctx->mask) {
1656 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1657 		return;
1658 	}
1659 
1660 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
1661 	ctx->mask->length = ctx->super->md_len;
1662 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));
1663 
1664 	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
1665 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1666 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1667 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1668 }
1669 
1670 static void
1671 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
1672 {
1673 	struct spdk_bs_load_ctx	*ctx = arg;
1674 	uint64_t	mask_size, lba, lba_count;
1675 
1676 	if (ctx->super->used_blobid_mask_len == 0) {
1677 		/*
1678 		 * This is a pre-v3 on-disk format where the blobid mask does not get
1679 		 *  written to disk.
1680 		 */
1681 		cb_fn(seq, arg, 0);
1682 		return;
1683 	}
1684 
1685 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
1686 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1687 	if (!ctx->mask) {
1688 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1689 		return;
1690 	}
1691 
1692 	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
1693 	ctx->mask->length = ctx->super->md_len;
1694 	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));
1695 
1696 	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
1697 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
1698 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
1699 	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
1700 }
1701 
1702 static void
1703 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1704 {
1705 	struct spdk_bs_load_ctx *ctx = cb_arg;
1706 	uint32_t i, j;
1707 	int rc;
1708 
1709 	/* The type must be correct */
1710 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);
1711 
1712 	/* The length of the mask (in bits) must not be greater than
1713 	 * the length of the buffer (converted to bits) */
1714 	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));
1715 
1716 	/* The length of the mask must be exactly equal to the size
1717 	 * (in pages) of the metadata region */
1718 	assert(ctx->mask->length == ctx->super->md_len);
1719 
1720 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->mask->length);
1721 	if (rc < 0) {
1722 		spdk_dma_free(ctx->mask);
1723 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1724 		return;
1725 	}
1726 
1727 	for (i = 0; i < ctx->mask->length / 8; i++) {
1728 		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
1730 			if (segment & 1U) {
1731 				spdk_bit_array_set(ctx->bs->used_blobids, (i * 8) + j);
1732 			}
1733 			segment >>= 1U;
1734 		}
1735 	}
1736 
1737 	spdk_dma_free(ctx->super);
1738 	spdk_dma_free(ctx->mask);
1739 	free(ctx);
1740 
1741 	spdk_bs_sequence_finish(seq, bserrno);
1742 }
1743 
1744 static void
1745 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1746 {
1747 	struct spdk_bs_load_ctx *ctx = cb_arg;
1748 	uint64_t		lba, lba_count, mask_size;
1749 	uint32_t		i, j;
1750 	int			rc;
1751 
1752 	/* The type must be correct */
1753 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
1754 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1755 	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
1756 					     struct spdk_blob_md_page) * 8));
1757 	/* The length of the mask must be exactly equal to the total number of clusters */
1758 	assert(ctx->mask->length == ctx->bs->total_clusters);
1759 
1760 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
1761 	if (rc < 0) {
1762 		spdk_dma_free(ctx->mask);
1763 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1764 		return;
1765 	}
1766 
1767 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
1768 	for (i = 0; i < ctx->mask->length / 8; i++) {
1769 		uint8_t segment = ctx->mask->mask[i];
1770 		for (j = 0; segment && (j < 8); j++) {
1771 			if (segment & 1U) {
1772 				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
1773 				assert(ctx->bs->num_free_clusters > 0);
1774 				ctx->bs->num_free_clusters--;
1775 			}
1776 			segment >>= 1U;
1777 		}
1778 	}
1779 
1780 	spdk_dma_free(ctx->mask);
1781 
1782 	/* Read the used blobids mask */
1783 	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
1784 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1785 	if (!ctx->mask) {
1786 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1787 		return;
1788 	}
1789 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
1790 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
1791 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1792 			      _spdk_bs_load_used_blobids_cpl, ctx);
1793 }
1794 
1795 static void
1796 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1797 {
1798 	struct spdk_bs_load_ctx *ctx = cb_arg;
1799 	uint64_t		lba, lba_count, mask_size;
1800 	uint32_t		i, j;
1801 	int			rc;
1802 
1803 	/* The type must be correct */
1804 	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
1805 	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
1806 	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
1807 				     8));
1808 	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
1809 	assert(ctx->mask->length == ctx->super->md_len);
1810 
1811 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
1812 	if (rc < 0) {
1813 		spdk_dma_free(ctx->mask);
1814 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1815 		return;
1816 	}
1817 
1818 	for (i = 0; i < ctx->mask->length / 8; i++) {
1819 		uint8_t segment = ctx->mask->mask[i];
1820 		for (j = 0; segment && (j < 8); j++) {
1821 			if (segment & 1U) {
1822 				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
1823 			}
1824 			segment >>= 1U;
1825 		}
1826 	}
1827 	spdk_dma_free(ctx->mask);
1828 
1829 	/* Read the used clusters mask */
1830 	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
1831 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1832 	if (!ctx->mask) {
1833 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1834 		return;
1835 	}
1836 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
1837 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
1838 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1839 			      _spdk_bs_load_used_clusters_cpl, ctx);
1840 }
1841 
1842 static void
1843 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1844 {
1845 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1846 	uint64_t lba, lba_count, mask_size;
1847 
1848 	/* Read the used pages mask */
1849 	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
1850 	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
1851 	if (!ctx->mask) {
1852 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
1853 		return;
1854 	}
1855 
1856 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
1857 	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
1858 	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
1859 			      _spdk_bs_load_used_pages_cpl, ctx);
1860 }
1861 
1862 static int
1863 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
1864 {
1865 	struct spdk_blob_md_descriptor *desc;
1866 	size_t	cur_desc = 0;
1867 
1868 	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
1869 	while (cur_desc < sizeof(page->descriptors)) {
1870 		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
1871 			if (desc->length == 0) {
1872 				/* If padding and length are 0, this terminates the page */
1873 				break;
1874 			}
1875 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
1876 			struct spdk_blob_md_descriptor_extent	*desc_extent;
1877 			unsigned int				i, j;
1878 			unsigned int				cluster_count = 0;
1879 
1880 			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;
1881 
1882 			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
1883 				for (j = 0; j < desc_extent->extents[i].length; j++) {
1884 					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
1885 					if (bs->num_free_clusters == 0) {
1886 						return -1;
1887 					}
1888 					bs->num_free_clusters--;
1889 					cluster_count++;
1890 				}
1891 			}
1892 			if (cluster_count == 0) {
1893 				return -1;
1894 			}
1895 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
1896 			/* Skip this item */
1897 		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
1898 			/* Skip this item */
1899 		} else {
1900 			/* Error */
1901 			return -1;
1902 		}
1903 		/* Advance to the next descriptor */
1904 		cur_desc += sizeof(*desc) + desc->length;
1905 		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
1906 			break;
1907 		}
1908 		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
1909 	}
1910 	return 0;
1911 }
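
/*
 * Editor's sketch: the replay parser above treats page->descriptors as a
 * type/length sequence, advancing by sizeof(*desc) + desc->length at each
 * step.  A stripped-down walker using the same advance rule (hypothetical
 * helper, shown only to illustrate the layout) might count descriptors:
 */
static size_t
_spdk_bs_count_descriptors_sketch(const struct spdk_blob_md_page *page)
{
	const struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0, count = 0;

	desc = (const struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING && desc->length == 0) {
			/* A zero-length padding descriptor terminates the page. */
			break;
		}
		count++;
		/* Advance past this descriptor's header and payload. */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (const struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
	return count;
}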
1912 
static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
1914 {
1915 	uint32_t crc;
1916 
1917 	crc = _spdk_blob_md_page_calc_crc(ctx->page);
1918 	if (crc != ctx->page->crc) {
1919 		return false;
1920 	}
1921 
1922 	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
1923 		return false;
1924 	}
1925 	return true;
1926 }
1927 
1928 static void
1929 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);
1930 
1931 static void
1932 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1933 {
1934 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1935 
1936 	spdk_dma_free(ctx->mask);
1937 	spdk_dma_free(ctx->super);
1938 	spdk_bs_sequence_finish(seq, bserrno);
1939 	free(ctx);
1940 }
1941 
1942 static void
1943 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1944 {
1945 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1946 
1947 	spdk_dma_free(ctx->mask);
1948 	ctx->mask = NULL;
1949 
1950 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
1951 }
1952 
1953 static void
1954 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1955 {
1956 	struct spdk_bs_load_ctx	*ctx = cb_arg;
1957 
1958 	spdk_dma_free(ctx->mask);
1959 	ctx->mask = NULL;
1960 
1961 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl);
1962 }
1963 
1964 static void
1965 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1966 {
1967 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
1968 }
1969 
1970 static void
1971 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
1972 {
1973 	struct spdk_bs_load_ctx *ctx = cb_arg;
1974 	uint32_t page_num;
1975 
1976 	if (bserrno != 0) {
1977 		_spdk_bs_load_ctx_fail(seq, ctx, bserrno);
1978 		return;
1979 	}
1980 
1981 	page_num = ctx->cur_page;
1982 	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
1983 		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
1984 			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
1985 			if (ctx->page->sequence_num == 0) {
1986 				spdk_bit_array_set(ctx->bs->used_blobids, page_num);
1987 			}
1988 			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
1989 				_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
1990 				return;
1991 			}
1992 			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
1993 				ctx->in_page_chain = true;
1994 				ctx->cur_page = ctx->page->next;
1995 				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
1996 				return;
1997 			}
1998 		}
1999 	}
2000 
2001 	ctx->in_page_chain = false;
2002 
2003 	do {
2004 		ctx->page_index++;
2005 	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
2006 
2007 	if (ctx->page_index < ctx->super->md_len) {
2008 		ctx->cur_page = ctx->page_index;
2009 		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2010 	} else {
2011 		spdk_dma_free(ctx->page);
2012 		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
2013 	}
2014 }
2015 
2016 static void
2017 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
2018 {
2019 	struct spdk_bs_load_ctx *ctx = cb_arg;
2020 	uint64_t lba;
2021 
2022 	assert(ctx->cur_page < ctx->super->md_len);
2023 	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
2024 	spdk_bs_sequence_read(seq, ctx->page, lba,
2025 			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
2026 			      _spdk_bs_load_replay_md_cpl, ctx);
2027 }
2028 
2029 static void
2030 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
2031 {
2032 	struct spdk_bs_load_ctx *ctx = cb_arg;
2033 
2034 	ctx->page_index = 0;
2035 	ctx->cur_page = 0;
2036 	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
2037 				     SPDK_BS_PAGE_SIZE,
2038 				     NULL);
2039 	if (!ctx->page) {
2040 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2041 		return;
2042 	}
2043 	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
2044 }
2045 
2046 static void
2047 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2048 {
2049 	struct spdk_bs_load_ctx *ctx = cb_arg;
2050 	int 		rc;
2051 
2052 	if (bserrno != 0) {
2053 		_spdk_bs_load_ctx_fail(seq, ctx, -EIO);
2054 		return;
2055 	}
2056 
2057 	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
2058 	if (rc < 0) {
2059 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2060 		return;
2061 	}
2062 
2063 	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
2064 	if (rc < 0) {
2065 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2066 		return;
2067 	}
2068 
2069 	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
2070 	if (rc < 0) {
2071 		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
2072 		return;
2073 	}
2074 
2075 	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
2076 	_spdk_bs_load_replay_md(seq, cb_arg);
2077 }
2078 
2079 static void
2080 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2081 {
2082 	struct spdk_bs_load_ctx *ctx = cb_arg;
2083 	uint32_t	crc;
2084 	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];
2085 
2086 	if (ctx->super->version > SPDK_BS_VERSION ||
2087 	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
2088 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2089 		return;
2090 	}
2091 
2092 	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2093 		   sizeof(ctx->super->signature)) != 0) {
2094 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2095 		return;
2096 	}
2097 
2098 	crc = _spdk_blob_md_page_calc_crc(ctx->super);
2099 	if (crc != ctx->super->crc) {
2100 		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
2101 		return;
2102 	}
2103 
2104 	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
2105 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
2106 	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
2108 	} else {
2109 		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
2110 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2111 		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
2112 		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
2113 		return;
2114 	}
2115 
2116 	/* Parse the super block */
2117 	ctx->bs->cluster_sz = ctx->super->cluster_size;
2118 	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
2119 	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
2120 	ctx->bs->md_start = ctx->super->md_start;
2121 	ctx->bs->md_len = ctx->super->md_len;
2122 	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
2123 					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
2124 	ctx->bs->super_blob = ctx->super->super_blob;
2125 	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));
2126 
2127 	if (ctx->super->clean == 0) {
2128 		_spdk_bs_recover(seq, ctx, 0);
2129 	} else if (ctx->super->used_blobid_mask_len == 0) {
2130 		/*
2131 		 * Metadata is clean, but this is an old metadata format without
2132 		 *  a blobid mask.  Clear the clean bit and then build the masks
2133 		 *  using _spdk_bs_recover.
2134 		 */
2135 		ctx->super->clean = 0;
2136 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_recover, ctx);
2137 	} else {
2138 		ctx->super->clean = 0;
2139 		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
2140 	}
2141 }
2142 
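/*
 * Editor's note: a worked instance of the geometry math in
 * _spdk_bs_load_super_cpl() above, for a hypothetical 1 GiB device with
 * 512-byte blocks and a 1 MiB cluster_size:
 *
 *	total_clusters    = blockcnt / (cluster_sz / blocklen)
 *	                  = 2097152 / (1048576 / 512) = 1024
 *	pages_per_cluster = cluster_sz / SPDK_BS_PAGE_SIZE
 *	                  = 1048576 / 4096 = 256
 *
 * (SPDK_BS_PAGE_SIZE is 4096; the concrete device and cluster sizes here
 * are assumptions for the example, not values taken from this file.)
 */
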
2143 void
2144 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2145 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2146 {
2147 	struct spdk_blob_store	*bs;
2148 	struct spdk_bs_cpl	cpl;
2149 	spdk_bs_sequence_t	*seq;
2150 	struct spdk_bs_load_ctx *ctx;
2151 	struct spdk_bs_opts	opts = {};
2152 
2153 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);
2154 
2155 	if (o) {
2156 		opts = *o;
2157 	} else {
2158 		spdk_bs_opts_init(&opts);
2159 	}
2160 
2161 	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
2162 		cb_fn(cb_arg, NULL, -EINVAL);
2163 		return;
2164 	}
2165 
2166 	bs = _spdk_bs_alloc(dev, &opts);
2167 	if (!bs) {
2168 		cb_fn(cb_arg, NULL, -ENOMEM);
2169 		return;
2170 	}
2171 
2172 	ctx = calloc(1, sizeof(*ctx));
2173 	if (!ctx) {
2174 		_spdk_bs_free(bs);
2175 		cb_fn(cb_arg, NULL, -ENOMEM);
2176 		return;
2177 	}
2178 
2179 	ctx->bs = bs;
2180 	ctx->is_load = true;
2181 
	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2189 
2190 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2191 	cpl.u.bs_handle.cb_fn = cb_fn;
2192 	cpl.u.bs_handle.cb_arg = cb_arg;
2193 	cpl.u.bs_handle.bs = bs;
2194 
2195 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2196 	if (!seq) {
2197 		spdk_dma_free(ctx->super);
2198 		free(ctx);
2199 		_spdk_bs_free(bs);
2200 		cb_fn(cb_arg, NULL, -ENOMEM);
2201 		return;
2202 	}
2203 
2204 	/* Read the super block */
2205 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2206 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2207 			      _spdk_bs_load_super_cpl, ctx);
2208 }
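
/*
 * Editor's sketch: a minimal caller of spdk_bs_load().  The callback and
 * wrapper names are illustrative only.  Passing NULL opts selects the
 * defaults from spdk_bs_opts_init(), including a zeroed bstype, which the
 * loader above treats as a wildcard.
 */
static void
example_bs_load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
{
	if (bserrno != 0) {
		SPDK_ERRLOG("blobstore load failed: %d\n", bserrno);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "loaded blobstore, %lu free clusters\n",
		      spdk_bs_free_cluster_count(bs));
}

static void
example_bs_load(struct spdk_bs_dev *dev)
{
	spdk_bs_load(dev, NULL, example_bs_load_done, NULL);
}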
2209 
2210 /* END spdk_bs_load */
2211 
2212 /* START spdk_bs_init */
2213 
2214 struct spdk_bs_init_ctx {
2215 	struct spdk_blob_store		*bs;
2216 	struct spdk_bs_super_block	*super;
2217 };
2218 
2219 static void
2220 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2221 {
2222 	struct spdk_bs_init_ctx *ctx = cb_arg;
2223 
2224 	spdk_dma_free(ctx->super);
2225 	free(ctx);
2226 
2227 	spdk_bs_sequence_finish(seq, bserrno);
2228 }
2229 
2230 static void
2231 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2232 {
2233 	struct spdk_bs_init_ctx *ctx = cb_arg;
2234 
2235 	/* Write super block */
2236 	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
2237 			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
2238 			       _spdk_bs_init_persist_super_cpl, ctx);
2239 }
2240 
2241 void
2242 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
2243 	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
2244 {
2245 	struct spdk_bs_init_ctx *ctx;
2246 	struct spdk_blob_store	*bs;
2247 	struct spdk_bs_cpl	cpl;
2248 	spdk_bs_sequence_t	*seq;
2249 	spdk_bs_batch_t		*batch;
2250 	uint64_t		num_md_lba;
2251 	uint64_t		num_md_pages;
2252 	uint64_t		num_md_clusters;
2253 	uint32_t		i;
2254 	struct spdk_bs_opts	opts = {};
2255 	int			rc;
2256 
2257 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);
2258 
2259 	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
2260 		SPDK_ERRLOG("unsupported dev block length of %d\n",
2261 			    dev->blocklen);
2262 		dev->destroy(dev);
2263 		cb_fn(cb_arg, NULL, -EINVAL);
2264 		return;
2265 	}
2266 
2267 	if (o) {
2268 		opts = *o;
2269 	} else {
2270 		spdk_bs_opts_init(&opts);
2271 	}
2272 
2273 	if (_spdk_bs_opts_verify(&opts) != 0) {
2274 		dev->destroy(dev);
2275 		cb_fn(cb_arg, NULL, -EINVAL);
2276 		return;
2277 	}
2278 
2279 	bs = _spdk_bs_alloc(dev, &opts);
2280 	if (!bs) {
2281 		dev->destroy(dev);
2282 		cb_fn(cb_arg, NULL, -ENOMEM);
2283 		return;
2284 	}
2285 
2286 	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
2287 		/* By default, allocate 1 page per cluster.
2288 		 * Technically, this over-allocates metadata
2289 		 * because more metadata will reduce the number
2290 		 * of usable clusters. This can be addressed with
2291 		 * more complex math in the future.
2292 		 */
2293 		bs->md_len = bs->total_clusters;
2294 	} else {
2295 		bs->md_len = opts.num_md_pages;
2296 	}
2297 
2298 	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
2299 	if (rc < 0) {
2300 		_spdk_bs_free(bs);
2301 		cb_fn(cb_arg, NULL, -ENOMEM);
2302 		return;
2303 	}
2304 
2305 	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
2306 	if (rc < 0) {
2307 		_spdk_bs_free(bs);
2308 		cb_fn(cb_arg, NULL, -ENOMEM);
2309 		return;
2310 	}
2311 
2312 	ctx = calloc(1, sizeof(*ctx));
2313 	if (!ctx) {
2314 		_spdk_bs_free(bs);
2315 		cb_fn(cb_arg, NULL, -ENOMEM);
2316 		return;
2317 	}
2318 
2319 	ctx->bs = bs;
2320 
	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
2328 	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
2329 	       sizeof(ctx->super->signature));
2330 	ctx->super->version = SPDK_BS_VERSION;
2331 	ctx->super->length = sizeof(*ctx->super);
2332 	ctx->super->super_blob = bs->super_blob;
2333 	ctx->super->clean = 0;
2334 	ctx->super->cluster_size = bs->cluster_sz;
2335 	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));
2336 
2337 	/* Calculate how many pages the metadata consumes at the front
2338 	 * of the disk.
2339 	 */
2340 
2341 	/* The super block uses 1 page */
2342 	num_md_pages = 1;
2343 
2344 	/* The used_md_pages mask requires 1 bit per metadata page, rounded
2345 	 * up to the nearest page, plus a header.
2346 	 */
2347 	ctx->super->used_page_mask_start = num_md_pages;
2348 	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2349 					 divide_round_up(bs->md_len, 8),
2350 					 SPDK_BS_PAGE_SIZE);
2351 	num_md_pages += ctx->super->used_page_mask_len;
2352 
2353 	/* The used_clusters mask requires 1 bit per cluster, rounded
2354 	 * up to the nearest page, plus a header.
2355 	 */
2356 	ctx->super->used_cluster_mask_start = num_md_pages;
2357 	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2358 					    divide_round_up(bs->total_clusters, 8),
2359 					    SPDK_BS_PAGE_SIZE);
2360 	num_md_pages += ctx->super->used_cluster_mask_len;
2361 
2362 	/* The used_blobids mask requires 1 bit per metadata page, rounded
2363 	 * up to the nearest page, plus a header.
2364 	 */
2365 	ctx->super->used_blobid_mask_start = num_md_pages;
2366 	ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
2367 					   divide_round_up(bs->md_len, 8),
2368 					   SPDK_BS_PAGE_SIZE);
2369 	num_md_pages += ctx->super->used_blobid_mask_len;
2370 
2371 	/* The metadata region size was chosen above */
2372 	ctx->super->md_start = bs->md_start = num_md_pages;
2373 	ctx->super->md_len = bs->md_len;
2374 	num_md_pages += bs->md_len;
2375 
2376 	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);
2377 
2378 	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);
2379 
2380 	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
2381 	if (num_md_clusters > bs->total_clusters) {
		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available; "
			    "please decrease the number of pages reserved for metadata "
			    "or increase the cluster size.\n");
2385 		spdk_dma_free(ctx->super);
2386 		free(ctx);
2387 		_spdk_bs_free(bs);
2388 		cb_fn(cb_arg, NULL, -ENOMEM);
2389 		return;
2390 	}
2391 	/* Claim all of the clusters used by the metadata */
2392 	for (i = 0; i < num_md_clusters; i++) {
2393 		_spdk_bs_claim_cluster(bs, i);
2394 	}
2395 
2396 	bs->total_data_clusters = bs->num_free_clusters;
2397 
2398 	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
2399 	cpl.u.bs_handle.cb_fn = cb_fn;
2400 	cpl.u.bs_handle.cb_arg = cb_arg;
2401 	cpl.u.bs_handle.bs = bs;
2402 
2403 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2404 	if (!seq) {
2405 		spdk_dma_free(ctx->super);
2406 		free(ctx);
2407 		_spdk_bs_free(bs);
2408 		cb_fn(cb_arg, NULL, -ENOMEM);
2409 		return;
2410 	}
2411 
2412 	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);
2413 
2414 	/* Clear metadata space */
2415 	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
2416 	/* Trim data clusters */
2417 	spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
2418 
2419 	spdk_bs_batch_close(batch);
2420 }
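
/*
 * Editor's sketch: the three mask sizings in spdk_bs_init() above are all
 * "header + one bit per tracked unit, rounded up to whole pages".  A
 * hypothetical standalone version of that calculation (reusing this file's
 * divide_round_up()):
 */
static uint32_t
_spdk_bs_mask_len_pages_sketch(uint64_t tracked_units)
{
	/* sizeof(struct spdk_bs_md_mask) is the on-disk mask header;
	 * divide_round_up(tracked_units, 8) is the bitmap payload in bytes.
	 */
	return divide_round_up(sizeof(struct spdk_bs_md_mask) +
			       divide_round_up(tracked_units, 8),
			       SPDK_BS_PAGE_SIZE);
}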
2421 
2422 /* END spdk_bs_init */
2423 
2424 /* START spdk_bs_destroy */
2425 
2426 static void
2427 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2428 {
2429 	struct spdk_bs_init_ctx *ctx = cb_arg;
2430 	struct spdk_blob_store *bs = ctx->bs;
2431 
2432 	/*
2433 	 * We need to defer calling spdk_bs_call_cpl() until after
2434 	 * dev destruction, so tuck these away for later use.
2435 	 */
2436 	bs->unload_err = bserrno;
2437 	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2438 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2439 
2440 	spdk_bs_sequence_finish(seq, bserrno);
2441 
2442 	_spdk_bs_free(bs);
2443 	free(ctx);
2444 }
2445 
2446 void
2447 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
2448 		void *cb_arg)
2449 {
2450 	struct spdk_bs_cpl	cpl;
2451 	spdk_bs_sequence_t	*seq;
2452 	struct spdk_bs_init_ctx *ctx;
2453 
2454 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");
2455 
2456 	if (!TAILQ_EMPTY(&bs->blobs)) {
2457 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2458 		cb_fn(cb_arg, -EBUSY);
2459 		return;
2460 	}
2461 
2462 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2463 	cpl.u.bs_basic.cb_fn = cb_fn;
2464 	cpl.u.bs_basic.cb_arg = cb_arg;
2465 
2466 	ctx = calloc(1, sizeof(*ctx));
2467 	if (!ctx) {
2468 		cb_fn(cb_arg, -ENOMEM);
2469 		return;
2470 	}
2471 
2472 	ctx->bs = bs;
2473 
2474 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2475 	if (!seq) {
2476 		free(ctx);
2477 		cb_fn(cb_arg, -ENOMEM);
2478 		return;
2479 	}
2480 
2481 	/* Write zeroes to the super block */
2482 	spdk_bs_sequence_write_zeroes(seq,
2483 				      _spdk_bs_page_to_lba(bs, 0),
2484 				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
2485 				      _spdk_bs_destroy_trim_cpl, ctx);
2486 }
2487 
2488 /* END spdk_bs_destroy */
2489 
2490 /* START spdk_bs_unload */
2491 
2492 static void
2493 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2494 {
2495 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2496 
2497 	spdk_dma_free(ctx->super);
2498 
2499 	/*
2500 	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
2502 	 */
2503 	ctx->bs->unload_err = bserrno;
2504 	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
2505 	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;
2506 
2507 	spdk_bs_sequence_finish(seq, bserrno);
2508 
2509 	_spdk_bs_free(ctx->bs);
2510 	free(ctx);
2511 }
2512 
2513 static void
2514 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2515 {
2516 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2517 
2518 	spdk_dma_free(ctx->mask);
2519 	ctx->super->clean = 1;
2520 
2521 	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
2522 }
2523 
2524 static void
2525 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2526 {
2527 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2528 
2529 	spdk_dma_free(ctx->mask);
2530 	ctx->mask = NULL;
2531 
2532 	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
2533 }
2534 
2535 static void
2536 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2537 {
2538 	struct spdk_bs_load_ctx	*ctx = cb_arg;
2539 
2540 	spdk_dma_free(ctx->mask);
2541 	ctx->mask = NULL;
2542 
2543 	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
2544 }
2545 
2546 static void
2547 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2548 {
2549 	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
2550 }
2551 
2552 void
2553 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
2554 {
2555 	struct spdk_bs_cpl	cpl;
2556 	spdk_bs_sequence_t	*seq;
2557 	struct spdk_bs_load_ctx *ctx;
2558 
2559 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");
2560 
2561 	if (!TAILQ_EMPTY(&bs->blobs)) {
2562 		SPDK_ERRLOG("Blobstore still has open blobs\n");
2563 		cb_fn(cb_arg, -EBUSY);
2564 		return;
2565 	}
2566 
2567 	ctx = calloc(1, sizeof(*ctx));
2568 	if (!ctx) {
2569 		cb_fn(cb_arg, -ENOMEM);
2570 		return;
2571 	}
2572 
2573 	ctx->bs = bs;
2574 	ctx->is_load = false;
2575 
2576 	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
2577 	if (!ctx->super) {
2578 		free(ctx);
2579 		cb_fn(cb_arg, -ENOMEM);
2580 		return;
2581 	}
2582 
2583 	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
2584 	cpl.u.bs_basic.cb_fn = cb_fn;
2585 	cpl.u.bs_basic.cb_arg = cb_arg;
2586 
2587 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2588 	if (!seq) {
2589 		spdk_dma_free(ctx->super);
2590 		free(ctx);
2591 		cb_fn(cb_arg, -ENOMEM);
2592 		return;
2593 	}
2594 
2595 	/* Read super block */
2596 	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
2597 			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
2598 			      _spdk_bs_unload_read_super_cpl, ctx);
2599 }
2600 
2601 /* END spdk_bs_unload */
2602 
2603 void
2604 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
2605 		  spdk_bs_op_complete cb_fn, void *cb_arg)
2606 {
2607 	bs->super_blob = blobid;
2608 	cb_fn(cb_arg, 0);
2609 }
2610 
2611 void
2612 spdk_bs_get_super(struct spdk_blob_store *bs,
2613 		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2614 {
2615 	if (bs->super_blob == SPDK_BLOBID_INVALID) {
2616 		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
2617 	} else {
2618 		cb_fn(cb_arg, bs->super_blob, 0);
2619 	}
2620 }
2621 
2622 uint64_t
2623 spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
2624 {
2625 	return bs->cluster_sz;
2626 }
2627 
2628 uint64_t
2629 spdk_bs_get_page_size(struct spdk_blob_store *bs)
2630 {
2631 	return SPDK_BS_PAGE_SIZE;
2632 }
2633 
2634 uint64_t
2635 spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
2636 {
2637 	return bs->num_free_clusters;
2638 }
2639 
2640 uint64_t
2641 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
2642 {
2643 	return bs->total_data_clusters;
2644 }
2645 
2646 static int
2647 spdk_bs_register_md_thread(struct spdk_blob_store *bs)
2648 {
2649 	bs->md_channel = spdk_get_io_channel(bs);
2650 	if (!bs->md_channel) {
2651 		SPDK_ERRLOG("Failed to get IO channel.\n");
2652 		return -1;
2653 	}
2654 
2655 	return 0;
2656 }
2657 
2658 static int
2659 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
2660 {
2661 	spdk_put_io_channel(bs->md_channel);
2662 
2663 	return 0;
2664 }
2665 
2666 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob)
2667 {
2668 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2669 
2670 	assert(blob != NULL);
2671 
2672 	return blob->id;
2673 }
2674 
2675 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob)
2676 {
2677 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2678 
2679 	assert(blob != NULL);
2680 
2681 	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
2682 }
2683 
2684 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob)
2685 {
2686 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2687 
2688 	assert(blob != NULL);
2689 
2690 	return blob->active.num_clusters;
2691 }
2692 
2693 /* START spdk_bs_create_blob */
2694 
2695 static void
2696 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2697 {
2698 	struct spdk_blob_data *blob = cb_arg;
2699 
2700 	_spdk_blob_free(blob);
2701 
2702 	spdk_bs_sequence_finish(seq, bserrno);
2703 }
2704 
2705 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
2706 			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2707 {
2708 	struct spdk_blob_data	*blob;
2709 	uint32_t		page_idx;
2710 	struct spdk_bs_cpl 	cpl;
2711 	struct spdk_blob_opts	opts_default;
2712 	spdk_bs_sequence_t	*seq;
	spdk_blob_id		id;
	int			rc;
2714 
2715 	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
2716 	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
2717 		cb_fn(cb_arg, 0, -ENOMEM);
2718 		return;
2719 	}
2720 	spdk_bit_array_set(bs->used_blobids, page_idx);
2721 	spdk_bit_array_set(bs->used_md_pages, page_idx);
2722 
2723 	id = _spdk_bs_page_to_blobid(page_idx);
2724 
2725 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
2726 
2727 	blob = _spdk_blob_alloc(bs, id);
2728 	if (!blob) {
2729 		cb_fn(cb_arg, 0, -ENOMEM);
2730 		return;
2731 	}
2732 
2733 	if (!opts) {
2734 		spdk_blob_opts_init(&opts_default);
2735 		opts = &opts_default;
2736 	}
2737 
	rc = spdk_blob_resize(__data_to_blob(blob), opts->num_clusters);
	if (rc < 0) {
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}

2739 	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
2740 	cpl.u.blobid.cb_fn = cb_fn;
2741 	cpl.u.blobid.cb_arg = cb_arg;
2742 	cpl.u.blobid.blobid = blob->id;
2743 
2744 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2745 	if (!seq) {
2746 		_spdk_blob_free(blob);
2747 		cb_fn(cb_arg, 0, -ENOMEM);
2748 		return;
2749 	}
2750 
2751 	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
2752 }
2753 
2754 void spdk_bs_create_blob(struct spdk_blob_store *bs,
2755 			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
2756 {
2757 	spdk_bs_create_blob_ext(bs, NULL, cb_fn, cb_arg);
2758 }
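
/*
 * Editor's sketch: a minimal caller of spdk_bs_create_blob_ext() above,
 * requesting an initial size in clusters.  All names here are illustrative.
 */
static void
example_blob_create_done(void *cb_arg, spdk_blob_id blobid, int bserrno)
{
	if (bserrno == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "created blob %lu\n", blobid);
	}
}

static void
example_create_ten_cluster_blob(struct spdk_blob_store *bs)
{
	struct spdk_blob_opts opts;

	spdk_blob_opts_init(&opts);
	opts.num_clusters = 10;
	spdk_bs_create_blob_ext(bs, &opts, example_blob_create_done, NULL);
}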
2759 
2760 /* END spdk_bs_create_blob */
2761 
2762 /* START spdk_blob_resize */
2763 int
2764 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
2765 {
2766 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2767 	int			rc;
2768 
2769 	assert(blob != NULL);
2770 
2771 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);
2772 
2773 	if (blob->md_ro) {
2774 		return -EPERM;
2775 	}
2776 
2777 	if (sz == blob->active.num_clusters) {
2778 		return 0;
2779 	}
2780 
2781 	rc = _spdk_resize_blob(blob, sz);
2782 	if (rc < 0) {
2783 		return rc;
2784 	}
2785 
2786 	return 0;
2787 }
2788 
2789 /* END spdk_blob_resize */
2790 
2791 
2792 /* START spdk_bs_delete_blob */
2793 
2794 static void
2795 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
2796 {
2797 	spdk_bs_sequence_t *seq = cb_arg;
2798 
2799 	spdk_bs_sequence_finish(seq, bserrno);
2800 }
2801 
2802 static void
2803 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2804 {
2805 	struct spdk_blob *_blob = cb_arg;
2806 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2807 
2808 	if (bserrno != 0) {
2809 		/*
2810 		 * We already removed this blob from the blobstore tailq, so
2811 		 *  we need to free it here since this is the last reference
2812 		 *  to it.
2813 		 */
2814 		_spdk_blob_free(blob);
2815 		_spdk_bs_delete_close_cpl(seq, bserrno);
2816 		return;
2817 	}
2818 
2819 	/*
2820 	 * This will immediately decrement the ref_count and call
2821 	 *  the completion routine since the metadata state is clean.
2822 	 *  By calling spdk_blob_close, we reduce the number of call
2823 	 *  points into code that touches the blob->open_ref count
2824 	 *  and the blobstore's blob list.
2825 	 */
2826 	spdk_blob_close(_blob, _spdk_bs_delete_close_cpl, seq);
2827 }
2828 
2829 static void
2830 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
2831 {
2832 	spdk_bs_sequence_t *seq = cb_arg;
2833 	struct spdk_blob_data *blob = __blob_to_data(_blob);
2834 	uint32_t page_num;
2835 
2836 	if (bserrno != 0) {
2837 		spdk_bs_sequence_finish(seq, bserrno);
2838 		return;
2839 	}
2840 
2841 	if (blob->open_ref > 1) {
2842 		/*
2843 		 * Someone has this blob open (besides this delete context).
2844 		 *  Decrement the ref count directly and return -EBUSY.
2845 		 */
2846 		blob->open_ref--;
2847 		spdk_bs_sequence_finish(seq, -EBUSY);
2848 		return;
2849 	}
2850 
2851 	/*
2852 	 * Remove the blob from the blob_store list now, to ensure it does not
2853 	 *  get returned after this point by _spdk_blob_lookup().
2854 	 */
2855 	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
2856 	page_num = _spdk_bs_blobid_to_page(blob->id);
2857 	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
2858 	blob->state = SPDK_BLOB_STATE_DIRTY;
2859 	blob->active.num_pages = 0;
2860 	_spdk_resize_blob(blob, 0);
2861 
2862 	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, _blob);
2863 }
2864 
2865 void
2866 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2867 		    spdk_blob_op_complete cb_fn, void *cb_arg)
2868 {
2869 	struct spdk_bs_cpl	cpl;
2870 	spdk_bs_sequence_t 	*seq;
2871 
2872 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);
2873 
2874 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2875 	cpl.u.blob_basic.cb_fn = cb_fn;
2876 	cpl.u.blob_basic.cb_arg = cb_arg;
2877 
2878 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2879 	if (!seq) {
2880 		cb_fn(cb_arg, -ENOMEM);
2881 		return;
2882 	}
2883 
2884 	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
2885 }
2886 
2887 /* END spdk_bs_delete_blob */
2888 
2889 /* START spdk_bs_open_blob */
2890 
2891 static void
2892 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2893 {
2894 	struct spdk_blob_data *blob = cb_arg;
2895 
	/* If the blob has a crc error, we just return NULL. */
2897 	if (blob == NULL) {
2898 		seq->cpl.u.blob_handle.blob = NULL;
2899 		spdk_bs_sequence_finish(seq, bserrno);
2900 		return;
2901 	}
2902 
2903 	blob->open_ref++;
2904 
2905 	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);
2906 
2907 	spdk_bs_sequence_finish(seq, bserrno);
2908 }
2909 
2910 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
2911 		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
2912 {
2913 	struct spdk_blob_data		*blob;
2914 	struct spdk_bs_cpl		cpl;
2915 	spdk_bs_sequence_t		*seq;
2916 	uint32_t			page_num;
2917 
2918 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
2919 
2920 	page_num = _spdk_bs_blobid_to_page(blobid);
2921 	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
2922 		/* Invalid blobid */
2923 		cb_fn(cb_arg, NULL, -ENOENT);
2924 		return;
2925 	}
2926 
2927 	blob = _spdk_blob_lookup(bs, blobid);
2928 	if (blob) {
2929 		blob->open_ref++;
2930 		cb_fn(cb_arg, __data_to_blob(blob), 0);
2931 		return;
2932 	}
2933 
2934 	blob = _spdk_blob_alloc(bs, blobid);
2935 	if (!blob) {
2936 		cb_fn(cb_arg, NULL, -ENOMEM);
2937 		return;
2938 	}
2939 
2940 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
2941 	cpl.u.blob_handle.cb_fn = cb_fn;
2942 	cpl.u.blob_handle.cb_arg = cb_arg;
2943 	cpl.u.blob_handle.blob = __data_to_blob(blob);
2944 
2945 	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
2946 	if (!seq) {
2947 		_spdk_blob_free(blob);
2948 		cb_fn(cb_arg, NULL, -ENOMEM);
2949 		return;
2950 	}
2951 
2952 	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
2953 }
2954 
2955 /* END spdk_bs_open_blob */
2956 
2957 /* START spdk_blob_sync_md */
2958 
2959 static void
2960 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
2961 {
2962 	spdk_bs_sequence_finish(seq, bserrno);
2963 }
2964 
2965 void
2966 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
2967 {
2968 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
2969 	struct spdk_bs_cpl	cpl;
2970 	spdk_bs_sequence_t	*seq;
2971 
2972 	assert(blob != NULL);
2973 
2974 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);
2975 
2976 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
2977 	       blob->state != SPDK_BLOB_STATE_SYNCING);
2978 
	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		cb_fn(cb_arg, 0);
		return;
	}
2983 
2984 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
2985 		cb_fn(cb_arg, 0);
2986 		return;
2987 	}
2988 
2989 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
2990 	cpl.u.blob_basic.cb_fn = cb_fn;
2991 	cpl.u.blob_basic.cb_arg = cb_arg;
2992 
2993 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
2994 	if (!seq) {
2995 		cb_fn(cb_arg, -ENOMEM);
2996 		return;
2997 	}
2998 
2999 	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
3000 }
3001 
3002 /* END spdk_blob_sync_md */
3003 
3004 /* START spdk_blob_close */
3005 
3006 static void
3007 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3008 {
3009 	struct spdk_blob_data *blob = cb_arg;
3010 
3011 	if (bserrno == 0) {
3012 		blob->open_ref--;
3013 		if (blob->open_ref == 0) {
3014 			/*
3015 			 * Blobs with active.num_pages == 0 are deleted blobs.
			 *  These blobs are removed from the blob_store list
3017 			 *  when the deletion process starts - so don't try to
3018 			 *  remove them again.
3019 			 */
3020 			if (blob->active.num_pages > 0) {
3021 				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
3022 			}
3023 			_spdk_blob_free(blob);
3024 		}
3025 	}
3026 
3027 	spdk_bs_sequence_finish(seq, bserrno);
3028 }
3029 
3030 void spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg)
3031 {
3032 	struct spdk_bs_cpl	cpl;
3033 	struct spdk_blob_data	*blob;
3034 	spdk_bs_sequence_t	*seq;
3035 
3036 	assert(b != NULL);
3037 	blob = __blob_to_data(b);
3038 	assert(blob != NULL);
3039 
3040 	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);
3041 
3042 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3043 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3044 
3045 	if (blob->open_ref == 0) {
3046 		cb_fn(cb_arg, -EBADF);
3047 		return;
3048 	}
3049 
3050 	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
3051 	cpl.u.blob_basic.cb_fn = cb_fn;
3052 	cpl.u.blob_basic.cb_arg = cb_arg;
3053 
3054 	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
3055 	if (!seq) {
3056 		cb_fn(cb_arg, -ENOMEM);
3057 		return;
3058 	}
3059 
3060 	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
3061 		_spdk_blob_close_cpl(seq, blob, 0);
3062 		return;
3063 	}
3064 
3065 	/* Sync metadata */
3066 	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
3067 }
3068 
3069 /* END spdk_blob_close */
3070 
3071 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
3072 {
3073 	return spdk_get_io_channel(bs);
3074 }
3075 
3076 void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
3077 {
3078 	spdk_put_io_channel(channel);
3079 }
3080 
3081 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3082 			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3083 {
3084 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3085 				     SPDK_BLOB_UNMAP);
3086 }
3087 
3088 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3089 				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
3090 {
3091 	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
3092 				     SPDK_BLOB_WRITE_ZEROES);
3093 }
3094 
3095 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3096 			   void *payload, uint64_t offset, uint64_t length,
3097 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3098 {
3099 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3100 				     SPDK_BLOB_WRITE);
3101 }
3102 
3103 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3104 			  void *payload, uint64_t offset, uint64_t length,
3105 			  spdk_blob_op_complete cb_fn, void *cb_arg)
3106 {
3107 	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
3108 				     SPDK_BLOB_READ);
3109 }
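
/*
 * Editor's sketch: a minimal write through the channel-based I/O helpers
 * above.  offset and length are in pages for this API; the names and the
 * one-page payload are illustrative assumptions.
 */
static void
example_write_done(void *cb_arg, int bserrno)
{
	struct spdk_io_channel *channel = cb_arg;

	spdk_bs_free_io_channel(channel);
}

static void
example_write_first_page(struct spdk_blob_store *bs, struct spdk_blob *blob,
			 void *one_page_payload)
{
	struct spdk_io_channel *channel;

	channel = spdk_bs_alloc_io_channel(bs);
	if (!channel) {
		return;
	}

	/* Write one page at page offset 0; the channel is released in the callback. */
	spdk_bs_io_write_blob(blob, channel, one_page_payload, 0, 1,
			      example_write_done, channel);
}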
3110 
3111 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3112 			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3113 			    spdk_blob_op_complete cb_fn, void *cb_arg)
3114 {
3115 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
3116 }
3117 
3118 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
3119 			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
3120 			   spdk_blob_op_complete cb_fn, void *cb_arg)
3121 {
3122 	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
3123 }
3124 
3125 struct spdk_bs_iter_ctx {
3126 	int64_t page_num;
3127 	struct spdk_blob_store *bs;
3128 
3129 	spdk_blob_op_with_handle_complete cb_fn;
3130 	void *cb_arg;
3131 };
3132 
3133 static void
3134 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
3135 {
3136 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3137 	struct spdk_blob_store *bs = ctx->bs;
3138 	spdk_blob_id id;
3139 
3140 	if (bserrno == 0) {
3141 		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
3142 		free(ctx);
3143 		return;
3144 	}
3145 
3146 	ctx->page_num++;
3147 	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
3148 	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
3149 		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
3150 		free(ctx);
3151 		return;
3152 	}
3153 
3154 	id = _spdk_bs_page_to_blobid(ctx->page_num);
3155 
3156 	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
3157 }
3158 
3159 void
3160 spdk_bs_iter_first(struct spdk_blob_store *bs,
3161 		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3162 {
3163 	struct spdk_bs_iter_ctx *ctx;
3164 
3165 	ctx = calloc(1, sizeof(*ctx));
3166 	if (!ctx) {
3167 		cb_fn(cb_arg, NULL, -ENOMEM);
3168 		return;
3169 	}
3170 
3171 	ctx->page_num = -1;
3172 	ctx->bs = bs;
3173 	ctx->cb_fn = cb_fn;
3174 	ctx->cb_arg = cb_arg;
3175 
3176 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3177 }
3178 
3179 static void
3180 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
3181 {
3182 	struct spdk_bs_iter_ctx *ctx = cb_arg;
3183 
3184 	_spdk_bs_iter_cpl(ctx, NULL, -1);
3185 }
3186 
3187 void
3188 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *b,
3189 		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
3190 {
3191 	struct spdk_bs_iter_ctx *ctx;
3192 	struct spdk_blob_data	*blob;
3193 
3194 	assert(b != NULL);
3195 	blob = __blob_to_data(b);
3196 	assert(blob != NULL);
3197 
3198 	ctx = calloc(1, sizeof(*ctx));
3199 	if (!ctx) {
3200 		cb_fn(cb_arg, NULL, -ENOMEM);
3201 		return;
3202 	}
3203 
3204 	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
3205 	ctx->bs = bs;
3206 	ctx->cb_fn = cb_fn;
3207 	ctx->cb_arg = cb_arg;
3208 
3209 	/* Close the existing blob */
3210 	spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx);
3211 }
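
/*
 * Editor's sketch: typical use of the iterator above.  Each visited blob is
 * handed to the callback open; spdk_bs_iter_next() closes it before opening
 * the next one.  Names are illustrative.
 */
static void
example_iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_blob_store *bs = cb_arg;

	if (bserrno != 0) {
		/* -ENOENT means the iteration completed; anything else is an error. */
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "visited blob %lu\n", spdk_blob_get_id(blob));
	spdk_bs_iter_next(bs, blob, example_iter_cb, bs);
}

static void
example_iterate_all_blobs(struct spdk_blob_store *bs)
{
	spdk_bs_iter_first(bs, example_iter_cb, bs);
}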
3212 
3213 int
3214 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
3215 		    uint16_t value_len)
3216 {
3217 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3218 	struct spdk_xattr 	*xattr;
3219 
3220 	assert(blob != NULL);
3221 
3222 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3223 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3224 
3225 	if (blob->md_ro) {
3226 		return -EPERM;
3227 	}
3228 
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp = malloc(value_len);

			if (!tmp) {
				return -ENOMEM;
			}

			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}
3241 
	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}

	xattr->name = strdup(name);
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
3250 	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
3251 
3252 	blob->state = SPDK_BLOB_STATE_DIRTY;
3253 
3254 	return 0;
3255 }
3256 
3257 int
3258 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
3259 {
3260 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3261 	struct spdk_xattr	*xattr;
3262 
3263 	assert(blob != NULL);
3264 
3265 	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
3266 	       blob->state != SPDK_BLOB_STATE_SYNCING);
3267 
3268 	if (blob->md_ro) {
3269 		return -EPERM;
3270 	}
3271 
3272 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3273 		if (!strcmp(name, xattr->name)) {
3274 			TAILQ_REMOVE(&blob->xattrs, xattr, link);
3275 			free(xattr->value);
3276 			free(xattr->name);
3277 			free(xattr);
3278 
3279 			blob->state = SPDK_BLOB_STATE_DIRTY;
3280 
3281 			return 0;
3282 		}
3283 	}
3284 
3285 	return -ENOENT;
3286 }
3287 
3288 int
3289 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
3290 			  const void **value, size_t *value_len)
3291 {
3292 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3293 	struct spdk_xattr	*xattr;
3294 
3295 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3296 		if (!strcmp(name, xattr->name)) {
3297 			*value = xattr->value;
3298 			*value_len = xattr->value_len;
3299 			return 0;
3300 		}
3301 	}
3302 
3303 	return -ENOENT;
3304 }
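
/*
 * Editor's sketch: round-tripping an xattr with the two calls above.  The
 * key and value are illustrative; spdk_blob_sync_md() would still be needed
 * to persist the change.
 */
static int
example_xattr_roundtrip(struct spdk_blob *blob)
{
	const char hello[] = "world";
	const void *value;
	size_t value_len;
	int rc;

	rc = spdk_blob_set_xattr(blob, "greeting", hello, sizeof(hello));
	if (rc != 0) {
		return rc;
	}

	/* On success, value points at the blob's internal copy. */
	return spdk_blob_get_xattr_value(blob, "greeting", &value, &value_len);
}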
3305 
3306 struct spdk_xattr_names {
3307 	uint32_t	count;
3308 	const char	*names[0];
3309 };
3310 
3311 int
3312 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
3313 {
3314 	struct spdk_blob_data	*blob = __blob_to_data(_blob);
3315 	struct spdk_xattr	*xattr;
3316 	int			count = 0;
3317 
3318 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3319 		count++;
3320 	}
3321 
3322 	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
3323 	if (*names == NULL) {
3324 		return -ENOMEM;
3325 	}
3326 
3327 	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
3328 		(*names)->names[(*names)->count++] = xattr->name;
3329 	}
3330 
3331 	return 0;
3332 }
3333 
3334 uint32_t
3335 spdk_xattr_names_get_count(struct spdk_xattr_names *names)
3336 {
3337 	assert(names != NULL);
3338 
3339 	return names->count;
3340 }
3341 
3342 const char *
3343 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
3344 {
3345 	if (index >= names->count) {
3346 		return NULL;
3347 	}
3348 
3349 	return names->names[index];
3350 }
3351 
3352 void
3353 spdk_xattr_names_free(struct spdk_xattr_names *names)
3354 {
3355 	free(names);
3356 }
3357 
3358 struct spdk_bs_type
3359 spdk_bs_get_bstype(struct spdk_blob_store *bs)
3360 {
3361 	return bs->bstype;
3362 }
3363 
3364 void
3365 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
3366 {
3367 	memcpy(&bs->bstype, &bstype, sizeof(bstype));
3368 }
3369 
3370 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
3371