1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/thread.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 #include "spdk/util.h" 44 #include "spdk/string.h" 45 46 #include "spdk_internal/assert.h" 47 #include "spdk_internal/log.h" 48 49 #include "blobstore.h" 50 51 #define BLOB_CRC32C_INITIAL 0xffffffffUL 52 53 static int bs_register_md_thread(struct spdk_blob_store *bs); 54 static int bs_unregister_md_thread(struct spdk_blob_store *bs); 55 static void blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno); 56 static void blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num, 57 uint64_t cluster, uint32_t extent, spdk_blob_op_complete cb_fn, void *cb_arg); 58 59 static int blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 60 uint16_t value_len, bool internal); 61 static int blob_get_xattr_value(struct spdk_blob *blob, const char *name, 62 const void **value, size_t *value_len, bool internal); 63 static int blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal); 64 65 static void blob_insert_extent(struct spdk_blob *blob, uint32_t extent, uint64_t cluster_num, 66 spdk_blob_op_complete cb_fn, void *cb_arg); 67 68 static void 69 blob_verify_md_op(struct spdk_blob *blob) 70 { 71 assert(blob != NULL); 72 assert(spdk_get_thread() == blob->bs->md_thread); 73 assert(blob->state != SPDK_BLOB_STATE_LOADING); 74 } 75 76 static struct spdk_blob_list * 77 bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid) 78 { 79 struct spdk_blob_list *snapshot_entry = NULL; 80 81 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) { 82 if (snapshot_entry->id == blobid) { 83 break; 84 } 85 } 86 87 return snapshot_entry; 88 } 89 90 static void 91 bs_claim_md_page(struct spdk_blob_store *bs, 
uint32_t page) 92 { 93 assert(page < spdk_bit_array_capacity(bs->used_md_pages)); 94 assert(spdk_bit_array_get(bs->used_md_pages, page) == false); 95 96 spdk_bit_array_set(bs->used_md_pages, page); 97 } 98 99 static void 100 bs_release_md_page(struct spdk_blob_store *bs, uint32_t page) 101 { 102 assert(page < spdk_bit_array_capacity(bs->used_md_pages)); 103 assert(spdk_bit_array_get(bs->used_md_pages, page) == true); 104 105 spdk_bit_array_clear(bs->used_md_pages, page); 106 } 107 108 static void 109 bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 110 { 111 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 112 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 113 assert(bs->num_free_clusters > 0); 114 115 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num); 116 117 spdk_bit_array_set(bs->used_clusters, cluster_num); 118 bs->num_free_clusters--; 119 } 120 121 static int 122 blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster) 123 { 124 uint64_t *cluster_lba = &blob->active.clusters[cluster_num]; 125 126 blob_verify_md_op(blob); 127 128 if (*cluster_lba != 0) { 129 return -EEXIST; 130 } 131 132 *cluster_lba = bs_cluster_to_lba(blob->bs, cluster); 133 return 0; 134 } 135 136 static int 137 bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num, 138 uint64_t *lowest_free_cluster, uint32_t *lowest_free_md_page, bool update_map) 139 { 140 uint32_t *extent_page = 0; 141 142 pthread_mutex_lock(&blob->bs->used_clusters_mutex); 143 *lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters, 144 *lowest_free_cluster); 145 if (*lowest_free_cluster == UINT32_MAX) { 146 /* No more free clusters. Cannot satisfy the request */ 147 pthread_mutex_unlock(&blob->bs->used_clusters_mutex); 148 return -ENOSPC; 149 } 150 151 if (blob->use_extent_table) { 152 extent_page = bs_cluster_to_extent_page(blob, cluster_num); 153 if (*extent_page == 0) { 154 /* No extent_page is allocated for the cluster */ 155 *lowest_free_md_page = spdk_bit_array_find_first_clear(blob->bs->used_md_pages, 156 *lowest_free_md_page); 157 if (*lowest_free_md_page == UINT32_MAX) { 158 /* No more free md pages. 
Cannot satisfy the request */ 159 pthread_mutex_unlock(&blob->bs->used_clusters_mutex); 160 return -ENOSPC; 161 } 162 bs_claim_md_page(blob->bs, *lowest_free_md_page); 163 } 164 } 165 166 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id); 167 bs_claim_cluster(blob->bs, *lowest_free_cluster); 168 169 pthread_mutex_unlock(&blob->bs->used_clusters_mutex); 170 171 if (update_map) { 172 blob_insert_cluster(blob, cluster_num, *lowest_free_cluster); 173 if (blob->use_extent_table && *extent_page == 0) { 174 *extent_page = *lowest_free_md_page; 175 } 176 } 177 178 return 0; 179 } 180 181 static void 182 bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 183 { 184 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 185 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 186 assert(bs->num_free_clusters < bs->total_clusters); 187 188 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num); 189 190 pthread_mutex_lock(&bs->used_clusters_mutex); 191 spdk_bit_array_clear(bs->used_clusters, cluster_num); 192 bs->num_free_clusters++; 193 pthread_mutex_unlock(&bs->used_clusters_mutex); 194 } 195 196 static void 197 blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs) 198 { 199 xattrs->count = 0; 200 xattrs->names = NULL; 201 xattrs->ctx = NULL; 202 xattrs->get_value = NULL; 203 } 204 205 void 206 spdk_blob_opts_init(struct spdk_blob_opts *opts) 207 { 208 opts->num_clusters = 0; 209 opts->thin_provision = false; 210 opts->clear_method = BLOB_CLEAR_WITH_DEFAULT; 211 blob_xattrs_init(&opts->xattrs); 212 opts->use_extent_table = true; 213 } 214 215 void 216 spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts) 217 { 218 opts->clear_method = BLOB_CLEAR_WITH_DEFAULT; 219 } 220 221 static struct spdk_blob * 222 blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 223 { 224 struct spdk_blob *blob; 225 226 blob = calloc(1, sizeof(*blob)); 227 if (!blob) { 228 return NULL; 229 } 230 231 blob->id = id; 232 blob->bs = bs; 233 234 blob->parent_id = SPDK_BLOBID_INVALID; 235 236 blob->state = SPDK_BLOB_STATE_DIRTY; 237 blob->extent_rle_found = false; 238 blob->extent_table_found = false; 239 blob->active.num_pages = 1; 240 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 241 if (!blob->active.pages) { 242 free(blob); 243 return NULL; 244 } 245 246 blob->active.pages[0] = bs_blobid_to_page(id); 247 248 TAILQ_INIT(&blob->xattrs); 249 TAILQ_INIT(&blob->xattrs_internal); 250 TAILQ_INIT(&blob->pending_persists); 251 252 return blob; 253 } 254 255 static void 256 xattrs_free(struct spdk_xattr_tailq *xattrs) 257 { 258 struct spdk_xattr *xattr, *xattr_tmp; 259 260 TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) { 261 TAILQ_REMOVE(xattrs, xattr, link); 262 free(xattr->name); 263 free(xattr->value); 264 free(xattr); 265 } 266 } 267 268 static void 269 blob_free(struct spdk_blob *blob) 270 { 271 assert(blob != NULL); 272 assert(TAILQ_EMPTY(&blob->pending_persists)); 273 274 free(blob->active.extent_pages); 275 free(blob->clean.extent_pages); 276 free(blob->active.clusters); 277 free(blob->clean.clusters); 278 free(blob->active.pages); 279 free(blob->clean.pages); 280 281 xattrs_free(&blob->xattrs); 282 xattrs_free(&blob->xattrs_internal); 283 284 if (blob->back_bs_dev) { 285 blob->back_bs_dev->destroy(blob->back_bs_dev); 286 } 287 288 free(blob); 289 } 290 291 struct freeze_io_ctx { 292 struct spdk_bs_cpl cpl; 293 struct spdk_blob *blob; 294 }; 295 296 static void 297 blob_io_sync(struct 
spdk_io_channel_iter *i) 298 { 299 spdk_for_each_channel_continue(i, 0); 300 } 301 302 static void 303 blob_execute_queued_io(struct spdk_io_channel_iter *i) 304 { 305 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 306 struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch); 307 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 308 struct spdk_bs_request_set *set; 309 struct spdk_bs_user_op_args *args; 310 spdk_bs_user_op_t *op, *tmp; 311 312 TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) { 313 set = (struct spdk_bs_request_set *)op; 314 args = &set->u.user_op; 315 316 if (args->blob == ctx->blob) { 317 TAILQ_REMOVE(&ch->queued_io, op, link); 318 bs_user_op_execute(op); 319 } 320 } 321 322 spdk_for_each_channel_continue(i, 0); 323 } 324 325 static void 326 blob_io_cpl(struct spdk_io_channel_iter *i, int status) 327 { 328 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 329 330 ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0); 331 332 free(ctx); 333 } 334 335 static void 336 blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 337 { 338 struct freeze_io_ctx *ctx; 339 340 ctx = calloc(1, sizeof(*ctx)); 341 if (!ctx) { 342 cb_fn(cb_arg, -ENOMEM); 343 return; 344 } 345 346 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 347 ctx->cpl.u.blob_basic.cb_fn = cb_fn; 348 ctx->cpl.u.blob_basic.cb_arg = cb_arg; 349 ctx->blob = blob; 350 351 /* Freeze I/O on blob */ 352 blob->frozen_refcnt++; 353 354 if (blob->frozen_refcnt == 1) { 355 spdk_for_each_channel(blob->bs, blob_io_sync, ctx, blob_io_cpl); 356 } else { 357 cb_fn(cb_arg, 0); 358 free(ctx); 359 } 360 } 361 362 static void 363 blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 364 { 365 struct freeze_io_ctx *ctx; 366 367 ctx = calloc(1, sizeof(*ctx)); 368 if (!ctx) { 369 cb_fn(cb_arg, -ENOMEM); 370 return; 371 } 372 373 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 374 ctx->cpl.u.blob_basic.cb_fn = cb_fn; 375 ctx->cpl.u.blob_basic.cb_arg = cb_arg; 376 ctx->blob = blob; 377 378 assert(blob->frozen_refcnt > 0); 379 380 blob->frozen_refcnt--; 381 382 if (blob->frozen_refcnt == 0) { 383 spdk_for_each_channel(blob->bs, blob_execute_queued_io, ctx, blob_io_cpl); 384 } else { 385 cb_fn(cb_arg, 0); 386 free(ctx); 387 } 388 } 389 390 static int 391 blob_mark_clean(struct spdk_blob *blob) 392 { 393 uint32_t *extent_pages = NULL; 394 uint64_t *clusters = NULL; 395 uint32_t *pages = NULL; 396 397 assert(blob != NULL); 398 399 if (blob->active.num_extent_pages) { 400 assert(blob->active.extent_pages); 401 extent_pages = calloc(blob->active.num_extent_pages, sizeof(*blob->active.extent_pages)); 402 if (!extent_pages) { 403 return -ENOMEM; 404 } 405 memcpy(extent_pages, blob->active.extent_pages, 406 blob->active.num_extent_pages * sizeof(*extent_pages)); 407 } 408 409 if (blob->active.num_clusters) { 410 assert(blob->active.clusters); 411 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 412 if (!clusters) { 413 free(extent_pages); 414 return -ENOMEM; 415 } 416 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters)); 417 } 418 419 if (blob->active.num_pages) { 420 assert(blob->active.pages); 421 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 422 if (!pages) { 423 free(extent_pages); 424 free(clusters); 425 return -ENOMEM; 426 } 427 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages)); 428 } 429 430 
free(blob->clean.extent_pages); 431 free(blob->clean.clusters); 432 free(blob->clean.pages); 433 434 blob->clean.num_extent_pages = blob->active.num_extent_pages; 435 blob->clean.extent_pages = blob->active.extent_pages; 436 blob->clean.num_clusters = blob->active.num_clusters; 437 blob->clean.clusters = blob->active.clusters; 438 blob->clean.num_pages = blob->active.num_pages; 439 blob->clean.pages = blob->active.pages; 440 441 blob->active.extent_pages = extent_pages; 442 blob->active.clusters = clusters; 443 blob->active.pages = pages; 444 445 /* If the metadata was dirtied again while the metadata was being written to disk, 446 * we do not want to revert the DIRTY state back to CLEAN here. 447 */ 448 if (blob->state == SPDK_BLOB_STATE_LOADING) { 449 blob->state = SPDK_BLOB_STATE_CLEAN; 450 } 451 452 return 0; 453 } 454 455 static int 456 blob_deserialize_xattr(struct spdk_blob *blob, 457 struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal) 458 { 459 struct spdk_xattr *xattr; 460 461 if (desc_xattr->length != sizeof(desc_xattr->name_length) + 462 sizeof(desc_xattr->value_length) + 463 desc_xattr->name_length + desc_xattr->value_length) { 464 return -EINVAL; 465 } 466 467 xattr = calloc(1, sizeof(*xattr)); 468 if (xattr == NULL) { 469 return -ENOMEM; 470 } 471 472 xattr->name = malloc(desc_xattr->name_length + 1); 473 if (xattr->name == NULL) { 474 free(xattr); 475 return -ENOMEM; 476 } 477 memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 478 xattr->name[desc_xattr->name_length] = '\0'; 479 480 xattr->value = malloc(desc_xattr->value_length); 481 if (xattr->value == NULL) { 482 free(xattr->name); 483 free(xattr); 484 return -ENOMEM; 485 } 486 xattr->value_len = desc_xattr->value_length; 487 memcpy(xattr->value, 488 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 489 desc_xattr->value_length); 490 491 TAILQ_INSERT_TAIL(internal ? 
&blob->xattrs_internal : &blob->xattrs, xattr, link); 492 493 return 0; 494 } 495 496 497 static int 498 blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 499 { 500 struct spdk_blob_md_descriptor *desc; 501 size_t cur_desc = 0; 502 void *tmp; 503 504 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 505 while (cur_desc < sizeof(page->descriptors)) { 506 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 507 if (desc->length == 0) { 508 /* If padding and length are 0, this terminates the page */ 509 break; 510 } 511 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 512 struct spdk_blob_md_descriptor_flags *desc_flags; 513 514 desc_flags = (struct spdk_blob_md_descriptor_flags *)desc; 515 516 if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) { 517 return -EINVAL; 518 } 519 520 if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) != 521 SPDK_BLOB_INVALID_FLAGS_MASK) { 522 return -EINVAL; 523 } 524 525 if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) != 526 SPDK_BLOB_DATA_RO_FLAGS_MASK) { 527 blob->data_ro = true; 528 blob->md_ro = true; 529 } 530 531 if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) != 532 SPDK_BLOB_MD_RO_FLAGS_MASK) { 533 blob->md_ro = true; 534 } 535 536 if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) { 537 blob->data_ro = true; 538 blob->md_ro = true; 539 } 540 541 blob->invalid_flags = desc_flags->invalid_flags; 542 blob->data_ro_flags = desc_flags->data_ro_flags; 543 blob->md_ro_flags = desc_flags->md_ro_flags; 544 545 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 546 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 547 unsigned int i, j; 548 unsigned int cluster_count = blob->active.num_clusters; 549 550 if (blob->extent_table_found) { 551 /* Extent Table already present in the md, 552 * both descriptors should never be at the same time. 
*/ 553 return -EINVAL; 554 } 555 blob->extent_rle_found = true; 556 557 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 558 559 if (desc_extent_rle->length == 0 || 560 (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) { 561 return -EINVAL; 562 } 563 564 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 565 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 566 if (desc_extent_rle->extents[i].cluster_idx != 0) { 567 if (!spdk_bit_array_get(blob->bs->used_clusters, 568 desc_extent_rle->extents[i].cluster_idx + j)) { 569 return -EINVAL; 570 } 571 } 572 cluster_count++; 573 } 574 } 575 576 if (cluster_count == 0) { 577 return -EINVAL; 578 } 579 tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters)); 580 if (tmp == NULL) { 581 return -ENOMEM; 582 } 583 blob->active.clusters = tmp; 584 blob->active.cluster_array_size = cluster_count; 585 586 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 587 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 588 if (desc_extent_rle->extents[i].cluster_idx != 0) { 589 blob->active.clusters[blob->active.num_clusters++] = bs_cluster_to_lba(blob->bs, 590 desc_extent_rle->extents[i].cluster_idx + j); 591 } else if (spdk_blob_is_thin_provisioned(blob)) { 592 blob->active.clusters[blob->active.num_clusters++] = 0; 593 } else { 594 return -EINVAL; 595 } 596 } 597 } 598 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_TABLE) { 599 struct spdk_blob_md_descriptor_extent_table *desc_extent_table; 600 uint32_t num_extent_pages = blob->active.num_extent_pages; 601 uint32_t i, j; 602 size_t extent_pages_length; 603 604 desc_extent_table = (struct spdk_blob_md_descriptor_extent_table *)desc; 605 extent_pages_length = desc_extent_table->length - sizeof(desc_extent_table->num_clusters); 606 607 if (blob->extent_rle_found) { 608 /* This means that Extent RLE is present in MD, 609 * both should never be at the same time. */ 610 return -EINVAL; 611 } else if (blob->extent_table_found && 612 desc_extent_table->num_clusters != blob->remaining_clusters_in_et) { 613 /* Number of clusters in this ET does not match number 614 * from previously read EXTENT_TABLE. */ 615 return -EINVAL; 616 } 617 618 blob->extent_table_found = true; 619 620 if (desc_extent_table->length == 0 || 621 (extent_pages_length % sizeof(desc_extent_table->extent_page[0]) != 0)) { 622 return -EINVAL; 623 } 624 625 for (i = 0; i < extent_pages_length / sizeof(desc_extent_table->extent_page[0]); i++) { 626 num_extent_pages += desc_extent_table->extent_page[i].num_pages; 627 } 628 629 tmp = realloc(blob->active.extent_pages, num_extent_pages * sizeof(uint32_t)); 630 if (tmp == NULL) { 631 return -ENOMEM; 632 } 633 blob->active.extent_pages = tmp; 634 blob->active.extent_pages_array_size = num_extent_pages; 635 636 blob->remaining_clusters_in_et = desc_extent_table->num_clusters; 637 638 /* Extent table entries contain md page numbers for extent pages. 639 * Zeroes represent unallocated extent pages, those are run-length-encoded. 
640 */ 641 for (i = 0; i < extent_pages_length / sizeof(desc_extent_table->extent_page[0]); i++) { 642 if (desc_extent_table->extent_page[i].page_idx != 0) { 643 assert(desc_extent_table->extent_page[i].num_pages == 1); 644 blob->active.extent_pages[blob->active.num_extent_pages++] = 645 desc_extent_table->extent_page[i].page_idx; 646 } else if (spdk_blob_is_thin_provisioned(blob)) { 647 for (j = 0; j < desc_extent_table->extent_page[i].num_pages; j++) { 648 blob->active.extent_pages[blob->active.num_extent_pages++] = 0; 649 } 650 } else { 651 return -EINVAL; 652 } 653 } 654 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) { 655 struct spdk_blob_md_descriptor_extent_page *desc_extent; 656 unsigned int i; 657 unsigned int cluster_count = 0; 658 size_t cluster_idx_length; 659 660 if (blob->extent_rle_found) { 661 /* This means that Extent RLE is present in MD, 662 * both should never be at the same time. */ 663 return -EINVAL; 664 } 665 666 desc_extent = (struct spdk_blob_md_descriptor_extent_page *)desc; 667 cluster_idx_length = desc_extent->length - sizeof(desc_extent->start_cluster_idx); 668 669 if (desc_extent->length <= sizeof(desc_extent->start_cluster_idx) || 670 (cluster_idx_length % sizeof(desc_extent->cluster_idx[0]) != 0)) { 671 return -EINVAL; 672 } 673 674 for (i = 0; i < cluster_idx_length / sizeof(desc_extent->cluster_idx[0]); i++) { 675 if (desc_extent->cluster_idx[i] != 0) { 676 if (!spdk_bit_array_get(blob->bs->used_clusters, desc_extent->cluster_idx[i])) { 677 return -EINVAL; 678 } 679 } 680 cluster_count++; 681 } 682 683 if (cluster_count == 0) { 684 return -EINVAL; 685 } 686 687 /* When reading extent pages sequentially starting cluster idx should match 688 * current size of a blob. 689 * If changed to batch reading, this check shall be removed. */ 690 if (desc_extent->start_cluster_idx != blob->active.num_clusters) { 691 return -EINVAL; 692 } 693 694 tmp = realloc(blob->active.clusters, 695 (cluster_count + blob->active.num_clusters) * sizeof(*blob->active.clusters)); 696 if (tmp == NULL) { 697 return -ENOMEM; 698 } 699 blob->active.clusters = tmp; 700 blob->active.cluster_array_size = (cluster_count + blob->active.num_clusters); 701 702 for (i = 0; i < cluster_idx_length / sizeof(desc_extent->cluster_idx[0]); i++) { 703 if (desc_extent->cluster_idx[i] != 0) { 704 blob->active.clusters[blob->active.num_clusters++] = bs_cluster_to_lba(blob->bs, 705 desc_extent->cluster_idx[i]); 706 } else if (spdk_blob_is_thin_provisioned(blob)) { 707 blob->active.clusters[blob->active.num_clusters++] = 0; 708 } else { 709 return -EINVAL; 710 } 711 } 712 assert(desc_extent->start_cluster_idx + cluster_count == blob->active.num_clusters); 713 assert(blob->remaining_clusters_in_et >= cluster_count); 714 blob->remaining_clusters_in_et -= cluster_count; 715 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 716 int rc; 717 718 rc = blob_deserialize_xattr(blob, 719 (struct spdk_blob_md_descriptor_xattr *) desc, false); 720 if (rc != 0) { 721 return rc; 722 } 723 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 724 int rc; 725 726 rc = blob_deserialize_xattr(blob, 727 (struct spdk_blob_md_descriptor_xattr *) desc, true); 728 if (rc != 0) { 729 return rc; 730 } 731 } else { 732 /* Unrecognized descriptor type. Do not fail - just continue to the 733 * next descriptor. 
If this descriptor is associated with some feature 734 * defined in a newer version of blobstore, that version of blobstore 735 * should create and set an associated feature flag to specify if this 736 * blob can be loaded or not. 737 */ 738 } 739 740 /* Advance to the next descriptor */ 741 cur_desc += sizeof(*desc) + desc->length; 742 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 743 break; 744 } 745 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 746 } 747 748 return 0; 749 } 750 751 static bool bs_load_cur_extent_page_valid(struct spdk_blob_md_page *page); 752 753 static int 754 blob_parse_extent_page(struct spdk_blob_md_page *extent_page, struct spdk_blob *blob) 755 { 756 assert(blob != NULL); 757 assert(blob->state == SPDK_BLOB_STATE_LOADING); 758 759 if (bs_load_cur_extent_page_valid(extent_page) == false) { 760 return -ENOENT; 761 } 762 763 return blob_parse_page(extent_page, blob); 764 } 765 766 static int 767 blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 768 struct spdk_blob *blob) 769 { 770 const struct spdk_blob_md_page *page; 771 uint32_t i; 772 int rc; 773 774 assert(page_count > 0); 775 assert(pages[0].sequence_num == 0); 776 assert(blob != NULL); 777 assert(blob->state == SPDK_BLOB_STATE_LOADING); 778 assert(blob->active.clusters == NULL); 779 780 /* The blobid provided doesn't match what's in the MD, this can 781 * happen for example if a bogus blobid is passed in through open. 782 */ 783 if (blob->id != pages[0].id) { 784 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n", 785 blob->id, pages[0].id); 786 return -ENOENT; 787 } 788 789 for (i = 0; i < page_count; i++) { 790 page = &pages[i]; 791 792 assert(page->id == blob->id); 793 assert(page->sequence_num == i); 794 795 rc = blob_parse_page(page, blob); 796 if (rc != 0) { 797 return rc; 798 } 799 } 800 801 return 0; 802 } 803 804 static int 805 blob_serialize_add_page(const struct spdk_blob *blob, 806 struct spdk_blob_md_page **pages, 807 uint32_t *page_count, 808 struct spdk_blob_md_page **last_page) 809 { 810 struct spdk_blob_md_page *page; 811 812 assert(pages != NULL); 813 assert(page_count != NULL); 814 815 if (*page_count == 0) { 816 assert(*pages == NULL); 817 *page_count = 1; 818 *pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 819 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 820 } else { 821 assert(*pages != NULL); 822 (*page_count)++; 823 *pages = spdk_realloc(*pages, 824 SPDK_BS_PAGE_SIZE * (*page_count), 825 SPDK_BS_PAGE_SIZE); 826 } 827 828 if (*pages == NULL) { 829 *page_count = 0; 830 *last_page = NULL; 831 return -ENOMEM; 832 } 833 834 page = &(*pages)[*page_count - 1]; 835 memset(page, 0, sizeof(*page)); 836 page->id = blob->id; 837 page->sequence_num = *page_count - 1; 838 page->next = SPDK_INVALID_MD_PAGE; 839 *last_page = page; 840 841 return 0; 842 } 843 844 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 845 * Update required_sz on both success and failure. 846 * 847 */ 848 static int 849 blob_serialize_xattr(const struct spdk_xattr *xattr, 850 uint8_t *buf, size_t buf_sz, 851 size_t *required_sz, bool internal) 852 { 853 struct spdk_blob_md_descriptor_xattr *desc; 854 855 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 856 strlen(xattr->name) + 857 xattr->value_len; 858 859 if (buf_sz < *required_sz) { 860 return -1; 861 } 862 863 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 864 865 desc->type = internal ? 
SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR; 866 desc->length = sizeof(desc->name_length) + 867 sizeof(desc->value_length) + 868 strlen(xattr->name) + 869 xattr->value_len; 870 desc->name_length = strlen(xattr->name); 871 desc->value_length = xattr->value_len; 872 873 memcpy(desc->name, xattr->name, desc->name_length); 874 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 875 xattr->value, 876 desc->value_length); 877 878 return 0; 879 } 880 881 static void 882 blob_serialize_extent_table_entry(const struct spdk_blob *blob, 883 uint64_t start_ep, uint64_t *next_ep, 884 uint8_t **buf, size_t *remaining_sz) 885 { 886 struct spdk_blob_md_descriptor_extent_table *desc; 887 size_t cur_sz; 888 uint64_t i, et_idx; 889 uint32_t extent_page, ep_len; 890 891 /* The buffer must have room for at least num_clusters entry */ 892 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->num_clusters); 893 if (*remaining_sz < cur_sz) { 894 *next_ep = start_ep; 895 return; 896 } 897 898 desc = (struct spdk_blob_md_descriptor_extent_table *)*buf; 899 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_TABLE; 900 901 desc->num_clusters = blob->active.num_clusters; 902 903 ep_len = 1; 904 et_idx = 0; 905 for (i = start_ep; i < blob->active.num_extent_pages; i++) { 906 if (*remaining_sz < cur_sz + sizeof(desc->extent_page[0])) { 907 /* If we ran out of buffer space, return */ 908 break; 909 } 910 911 extent_page = blob->active.extent_pages[i]; 912 /* Verify that next extent_page is unallocated */ 913 if (extent_page == 0 && 914 (i + 1 < blob->active.num_extent_pages && blob->active.extent_pages[i + 1] == 0)) { 915 ep_len++; 916 continue; 917 } 918 desc->extent_page[et_idx].page_idx = extent_page; 919 desc->extent_page[et_idx].num_pages = ep_len; 920 et_idx++; 921 922 ep_len = 1; 923 cur_sz += sizeof(desc->extent_page[et_idx]); 924 } 925 *next_ep = i; 926 927 desc->length = sizeof(desc->num_clusters) + sizeof(desc->extent_page[0]) * et_idx; 928 *remaining_sz -= sizeof(struct spdk_blob_md_descriptor) + desc->length; 929 *buf += sizeof(struct spdk_blob_md_descriptor) + desc->length; 930 } 931 932 static int 933 blob_serialize_extent_table(const struct spdk_blob *blob, 934 struct spdk_blob_md_page **pages, 935 struct spdk_blob_md_page *cur_page, 936 uint32_t *page_count, uint8_t **buf, 937 size_t *remaining_sz) 938 { 939 uint64_t last_extent_page; 940 int rc; 941 942 last_extent_page = 0; 943 /* At least single extent table entry has to be always persisted. 944 * Such case occurs with num_extent_pages == 0. 
*/ 945 while (last_extent_page <= blob->active.num_extent_pages) { 946 blob_serialize_extent_table_entry(blob, last_extent_page, &last_extent_page, buf, 947 remaining_sz); 948 949 if (last_extent_page == blob->active.num_extent_pages) { 950 break; 951 } 952 953 rc = blob_serialize_add_page(blob, pages, page_count, &cur_page); 954 if (rc < 0) { 955 return rc; 956 } 957 958 *buf = (uint8_t *)cur_page->descriptors; 959 *remaining_sz = sizeof(cur_page->descriptors); 960 } 961 962 return 0; 963 } 964 965 static void 966 blob_serialize_extent_rle(const struct spdk_blob *blob, 967 uint64_t start_cluster, uint64_t *next_cluster, 968 uint8_t **buf, size_t *buf_sz) 969 { 970 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 971 size_t cur_sz; 972 uint64_t i, extent_idx; 973 uint64_t lba, lba_per_cluster, lba_count; 974 975 /* The buffer must have room for at least one extent */ 976 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]); 977 if (*buf_sz < cur_sz) { 978 *next_cluster = start_cluster; 979 return; 980 } 981 982 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf; 983 desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE; 984 985 lba_per_cluster = bs_cluster_to_lba(blob->bs, 1); 986 987 lba = blob->active.clusters[start_cluster]; 988 lba_count = lba_per_cluster; 989 extent_idx = 0; 990 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 991 if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) { 992 /* Run-length encode sequential non-zero LBA */ 993 lba_count += lba_per_cluster; 994 continue; 995 } else if (lba == 0 && blob->active.clusters[i] == 0) { 996 /* Run-length encode unallocated clusters */ 997 lba_count += lba_per_cluster; 998 continue; 999 } 1000 desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 1001 desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster; 1002 extent_idx++; 1003 1004 cur_sz += sizeof(desc_extent_rle->extents[extent_idx]); 1005 1006 if (*buf_sz < cur_sz) { 1007 /* If we ran out of buffer space, return */ 1008 *next_cluster = i; 1009 break; 1010 } 1011 1012 lba = blob->active.clusters[i]; 1013 lba_count = lba_per_cluster; 1014 } 1015 1016 if (*buf_sz >= cur_sz) { 1017 desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 1018 desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster; 1019 extent_idx++; 1020 1021 *next_cluster = blob->active.num_clusters; 1022 } 1023 1024 desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx; 1025 *buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length; 1026 *buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length; 1027 } 1028 1029 static int 1030 blob_serialize_extents_rle(const struct spdk_blob *blob, 1031 struct spdk_blob_md_page **pages, 1032 struct spdk_blob_md_page *cur_page, 1033 uint32_t *page_count, uint8_t **buf, 1034 size_t *remaining_sz) 1035 { 1036 uint64_t last_cluster; 1037 int rc; 1038 1039 last_cluster = 0; 1040 while (last_cluster < blob->active.num_clusters) { 1041 blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz); 1042 1043 if (last_cluster == blob->active.num_clusters) { 1044 break; 1045 } 1046 1047 rc = blob_serialize_add_page(blob, pages, page_count, &cur_page); 1048 if (rc < 0) { 1049 return rc; 1050 } 1051 1052 *buf = (uint8_t *)cur_page->descriptors; 1053 *remaining_sz = sizeof(cur_page->descriptors); 1054 } 1055 1056 return 0; 1057 } 1058 1059 static void 
1060 blob_serialize_extent_page(const struct spdk_blob *blob, 1061 uint64_t cluster, struct spdk_blob_md_page *page) 1062 { 1063 struct spdk_blob_md_descriptor_extent_page *desc_extent; 1064 uint64_t i, extent_idx; 1065 uint64_t lba, lba_per_cluster; 1066 uint64_t start_cluster_idx = (cluster / SPDK_EXTENTS_PER_EP) * SPDK_EXTENTS_PER_EP; 1067 1068 desc_extent = (struct spdk_blob_md_descriptor_extent_page *) page->descriptors; 1069 desc_extent->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE; 1070 1071 lba_per_cluster = bs_cluster_to_lba(blob->bs, 1); 1072 1073 desc_extent->start_cluster_idx = start_cluster_idx; 1074 extent_idx = 0; 1075 for (i = start_cluster_idx; i < blob->active.num_clusters; i++) { 1076 lba = blob->active.clusters[i]; 1077 desc_extent->cluster_idx[extent_idx++] = lba / lba_per_cluster; 1078 if (extent_idx >= SPDK_EXTENTS_PER_EP) { 1079 break; 1080 } 1081 } 1082 desc_extent->length = sizeof(desc_extent->start_cluster_idx) + 1083 sizeof(desc_extent->cluster_idx[0]) * extent_idx; 1084 } 1085 1086 static void 1087 blob_serialize_flags(const struct spdk_blob *blob, 1088 uint8_t *buf, size_t *buf_sz) 1089 { 1090 struct spdk_blob_md_descriptor_flags *desc; 1091 1092 /* 1093 * Flags get serialized first, so we should always have room for the flags 1094 * descriptor. 1095 */ 1096 assert(*buf_sz >= sizeof(*desc)); 1097 1098 desc = (struct spdk_blob_md_descriptor_flags *)buf; 1099 desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS; 1100 desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor); 1101 desc->invalid_flags = blob->invalid_flags; 1102 desc->data_ro_flags = blob->data_ro_flags; 1103 desc->md_ro_flags = blob->md_ro_flags; 1104 1105 *buf_sz -= sizeof(*desc); 1106 } 1107 1108 static int 1109 blob_serialize_xattrs(const struct spdk_blob *blob, 1110 const struct spdk_xattr_tailq *xattrs, bool internal, 1111 struct spdk_blob_md_page **pages, 1112 struct spdk_blob_md_page *cur_page, 1113 uint32_t *page_count, uint8_t **buf, 1114 size_t *remaining_sz) 1115 { 1116 const struct spdk_xattr *xattr; 1117 int rc; 1118 1119 TAILQ_FOREACH(xattr, xattrs, link) { 1120 size_t required_sz = 0; 1121 1122 rc = blob_serialize_xattr(xattr, 1123 *buf, *remaining_sz, 1124 &required_sz, internal); 1125 if (rc < 0) { 1126 /* Need to add a new page to the chain */ 1127 rc = blob_serialize_add_page(blob, pages, page_count, 1128 &cur_page); 1129 if (rc < 0) { 1130 spdk_free(*pages); 1131 *pages = NULL; 1132 *page_count = 0; 1133 return rc; 1134 } 1135 1136 *buf = (uint8_t *)cur_page->descriptors; 1137 *remaining_sz = sizeof(cur_page->descriptors); 1138 1139 /* Try again */ 1140 required_sz = 0; 1141 rc = blob_serialize_xattr(xattr, 1142 *buf, *remaining_sz, 1143 &required_sz, internal); 1144 1145 if (rc < 0) { 1146 spdk_free(*pages); 1147 *pages = NULL; 1148 *page_count = 0; 1149 return rc; 1150 } 1151 } 1152 1153 *remaining_sz -= required_sz; 1154 *buf += required_sz; 1155 } 1156 1157 return 0; 1158 } 1159 1160 static int 1161 blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 1162 uint32_t *page_count) 1163 { 1164 struct spdk_blob_md_page *cur_page; 1165 int rc; 1166 uint8_t *buf; 1167 size_t remaining_sz; 1168 1169 assert(pages != NULL); 1170 assert(page_count != NULL); 1171 assert(blob != NULL); 1172 assert(blob->state == SPDK_BLOB_STATE_DIRTY); 1173 1174 *pages = NULL; 1175 *page_count = 0; 1176 1177 /* A blob always has at least 1 page, even if it has no descriptors */ 1178 rc = blob_serialize_add_page(blob, pages, page_count, &cur_page); 1179 if (rc < 0) { 1180 
return rc; 1181 } 1182 1183 buf = (uint8_t *)cur_page->descriptors; 1184 remaining_sz = sizeof(cur_page->descriptors); 1185 1186 /* Serialize flags */ 1187 blob_serialize_flags(blob, buf, &remaining_sz); 1188 buf += sizeof(struct spdk_blob_md_descriptor_flags); 1189 1190 /* Serialize xattrs */ 1191 rc = blob_serialize_xattrs(blob, &blob->xattrs, false, 1192 pages, cur_page, page_count, &buf, &remaining_sz); 1193 if (rc < 0) { 1194 return rc; 1195 } 1196 1197 /* Serialize internal xattrs */ 1198 rc = blob_serialize_xattrs(blob, &blob->xattrs_internal, true, 1199 pages, cur_page, page_count, &buf, &remaining_sz); 1200 if (rc < 0) { 1201 return rc; 1202 } 1203 1204 if (blob->use_extent_table) { 1205 /* Serialize extent table */ 1206 rc = blob_serialize_extent_table(blob, pages, cur_page, page_count, &buf, &remaining_sz); 1207 } else { 1208 /* Serialize extents */ 1209 rc = blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz); 1210 } 1211 1212 return rc; 1213 } 1214 1215 struct spdk_blob_load_ctx { 1216 struct spdk_blob *blob; 1217 1218 struct spdk_blob_md_page *pages; 1219 uint32_t num_pages; 1220 uint32_t next_extent_page; 1221 spdk_bs_sequence_t *seq; 1222 1223 spdk_bs_sequence_cpl cb_fn; 1224 void *cb_arg; 1225 }; 1226 1227 static uint32_t 1228 blob_md_page_calc_crc(void *page) 1229 { 1230 uint32_t crc; 1231 1232 crc = BLOB_CRC32C_INITIAL; 1233 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 1234 crc ^= BLOB_CRC32C_INITIAL; 1235 1236 return crc; 1237 1238 } 1239 1240 static void 1241 blob_load_final(void *cb_arg, int bserrno) 1242 { 1243 struct spdk_blob_load_ctx *ctx = cb_arg; 1244 struct spdk_blob *blob = ctx->blob; 1245 1246 if (bserrno == 0) { 1247 blob_mark_clean(blob); 1248 } 1249 1250 ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno); 1251 1252 /* Free the memory */ 1253 spdk_free(ctx->pages); 1254 free(ctx); 1255 } 1256 1257 static void 1258 blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno) 1259 { 1260 struct spdk_blob_load_ctx *ctx = cb_arg; 1261 struct spdk_blob *blob = ctx->blob; 1262 1263 if (bserrno == 0) { 1264 blob->back_bs_dev = bs_create_blob_bs_dev(snapshot); 1265 if (blob->back_bs_dev == NULL) { 1266 bserrno = -ENOMEM; 1267 } 1268 } 1269 if (bserrno != 0) { 1270 SPDK_ERRLOG("Snapshot fail\n"); 1271 } 1272 1273 blob_load_final(ctx, bserrno); 1274 } 1275 1276 static void blob_update_clear_method(struct spdk_blob *blob); 1277 1278 static void 1279 blob_load_backing_dev(void *cb_arg) 1280 { 1281 struct spdk_blob_load_ctx *ctx = cb_arg; 1282 struct spdk_blob *blob = ctx->blob; 1283 const void *value; 1284 size_t len; 1285 int rc; 1286 1287 if (spdk_blob_is_thin_provisioned(blob)) { 1288 rc = blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true); 1289 if (rc == 0) { 1290 if (len != sizeof(spdk_blob_id)) { 1291 blob_load_final(ctx, -EINVAL); 1292 return; 1293 } 1294 /* open snapshot blob and continue in the callback function */ 1295 blob->parent_id = *(spdk_blob_id *)value; 1296 spdk_bs_open_blob(blob->bs, blob->parent_id, 1297 blob_load_snapshot_cpl, ctx); 1298 return; 1299 } else { 1300 /* add zeroes_dev for thin provisioned blob */ 1301 blob->back_bs_dev = bs_create_zeroes_dev(); 1302 } 1303 } else { 1304 /* standard blob */ 1305 blob->back_bs_dev = NULL; 1306 } 1307 blob_load_final(ctx, 0); 1308 } 1309 1310 static void 1311 blob_load_cpl_extents_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1312 { 1313 struct spdk_blob_load_ctx *ctx = cb_arg; 1314 struct spdk_blob *blob = ctx->blob; 1315 struct 
spdk_blob_md_page *page; 1316 uint64_t i; 1317 uint32_t crc; 1318 uint64_t lba; 1319 void *tmp; 1320 uint64_t sz; 1321 1322 if (bserrno) { 1323 SPDK_ERRLOG("Extent page read failed: %d\n", bserrno); 1324 blob_load_final(ctx, bserrno); 1325 return; 1326 } 1327 1328 if (ctx->pages == NULL) { 1329 /* First iteration of this function, allocate buffer for single EXTENT_PAGE */ 1330 ctx->pages = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, NULL, SPDK_ENV_SOCKET_ID_ANY, 1331 SPDK_MALLOC_DMA); 1332 if (!ctx->pages) { 1333 blob_load_final(ctx, -ENOMEM); 1334 return; 1335 } 1336 ctx->num_pages = 1; 1337 ctx->next_extent_page = 0; 1338 } else { 1339 page = &ctx->pages[0]; 1340 crc = blob_md_page_calc_crc(page); 1341 if (crc != page->crc) { 1342 blob_load_final(ctx, -EINVAL); 1343 return; 1344 } 1345 1346 if (page->next != SPDK_INVALID_MD_PAGE) { 1347 blob_load_final(ctx, -EINVAL); 1348 return; 1349 } 1350 1351 bserrno = blob_parse_extent_page(page, blob); 1352 if (bserrno) { 1353 blob_load_final(ctx, bserrno); 1354 return; 1355 } 1356 } 1357 1358 for (i = ctx->next_extent_page; i < blob->active.num_extent_pages; i++) { 1359 if (blob->active.extent_pages[i] != 0) { 1360 /* Extent page was allocated, read and parse it. */ 1361 lba = bs_md_page_to_lba(blob->bs, blob->active.extent_pages[i]); 1362 ctx->next_extent_page = i + 1; 1363 1364 bs_sequence_read_dev(seq, &ctx->pages[0], lba, 1365 bs_byte_to_lba(blob->bs, SPDK_BS_PAGE_SIZE), 1366 blob_load_cpl_extents_cpl, ctx); 1367 return; 1368 } else { 1369 /* Thin provisioned blobs can point to unallocated extent pages. 1370 * In this case blob size should be increased by up to the amount left in remaining_clusters_in_et. */ 1371 1372 sz = spdk_min(blob->remaining_clusters_in_et, SPDK_EXTENTS_PER_EP); 1373 blob->active.num_clusters += sz; 1374 blob->remaining_clusters_in_et -= sz; 1375 1376 assert(spdk_blob_is_thin_provisioned(blob)); 1377 assert(i + 1 < blob->active.num_extent_pages || blob->remaining_clusters_in_et == 0); 1378 1379 tmp = realloc(blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters)); 1380 if (tmp == NULL) { 1381 blob_load_final(ctx, -ENOMEM); 1382 return; 1383 } 1384 memset(tmp + sizeof(*blob->active.clusters) * blob->active.cluster_array_size, 0, 1385 sizeof(*blob->active.clusters) * (blob->active.num_clusters - blob->active.cluster_array_size)); 1386 blob->active.clusters = tmp; 1387 blob->active.cluster_array_size = blob->active.num_clusters; 1388 } 1389 } 1390 1391 blob_load_backing_dev(ctx); 1392 } 1393 1394 static void 1395 blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1396 { 1397 struct spdk_blob_load_ctx *ctx = cb_arg; 1398 struct spdk_blob *blob = ctx->blob; 1399 struct spdk_blob_md_page *page; 1400 int rc; 1401 uint32_t crc; 1402 uint32_t current_page; 1403 1404 if (ctx->num_pages == 1) { 1405 current_page = bs_blobid_to_page(blob->id); 1406 } else { 1407 assert(ctx->num_pages != 0); 1408 page = &ctx->pages[ctx->num_pages - 2]; 1409 current_page = page->next; 1410 } 1411 1412 if (bserrno) { 1413 SPDK_ERRLOG("Metadata page %d read failed for blobid %lu: %d\n", 1414 current_page, blob->id, bserrno); 1415 blob_load_final(ctx, bserrno); 1416 return; 1417 } 1418 1419 page = &ctx->pages[ctx->num_pages - 1]; 1420 crc = blob_md_page_calc_crc(page); 1421 if (crc != page->crc) { 1422 SPDK_ERRLOG("Metadata page %d crc mismatch for blobid %lu\n", 1423 current_page, blob->id); 1424 blob_load_final(ctx, -EINVAL); 1425 return; 1426 } 1427 1428 if (page->next != SPDK_INVALID_MD_PAGE) { 1429 
uint32_t next_page = page->next; 1430 uint64_t next_lba = bs_md_page_to_lba(blob->bs, next_page); 1431 1432 /* Read the next page */ 1433 ctx->num_pages++; 1434 ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 1435 sizeof(*page)); 1436 if (ctx->pages == NULL) { 1437 blob_load_final(ctx, -ENOMEM); 1438 return; 1439 } 1440 1441 bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1], 1442 next_lba, 1443 bs_byte_to_lba(blob->bs, sizeof(*page)), 1444 blob_load_cpl, ctx); 1445 return; 1446 } 1447 1448 /* Parse the pages */ 1449 rc = blob_parse(ctx->pages, ctx->num_pages, blob); 1450 if (rc) { 1451 blob_load_final(ctx, rc); 1452 return; 1453 } 1454 1455 if (blob->extent_table_found == true) { 1456 /* If EXTENT_TABLE was found, that means support for it should be enabled. */ 1457 assert(blob->extent_rle_found == false); 1458 blob->use_extent_table = true; 1459 } else { 1460 /* If EXTENT_RLE or no extent_* descriptor was found disable support 1461 * for extent table. No extent_* descriptors means that blob has length of 0 1462 * and no extent_rle descriptors were persisted for it. 1463 * EXTENT_TABLE if used, is always present in metadata regardless of length. */ 1464 blob->use_extent_table = false; 1465 } 1466 1467 /* Check the clear_method stored in metadata vs what may have been passed 1468 * via spdk_bs_open_blob_ext() and update accordingly. 1469 */ 1470 blob_update_clear_method(blob); 1471 1472 spdk_free(ctx->pages); 1473 ctx->pages = NULL; 1474 1475 if (blob->extent_table_found) { 1476 blob_load_cpl_extents_cpl(seq, ctx, 0); 1477 } else { 1478 blob_load_backing_dev(ctx); 1479 } 1480 } 1481 1482 /* Load a blob from disk given a blobid */ 1483 static void 1484 blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 1485 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1486 { 1487 struct spdk_blob_load_ctx *ctx; 1488 struct spdk_blob_store *bs; 1489 uint32_t page_num; 1490 uint64_t lba; 1491 1492 blob_verify_md_op(blob); 1493 1494 bs = blob->bs; 1495 1496 ctx = calloc(1, sizeof(*ctx)); 1497 if (!ctx) { 1498 cb_fn(seq, cb_arg, -ENOMEM); 1499 return; 1500 } 1501 1502 ctx->blob = blob; 1503 ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE); 1504 if (!ctx->pages) { 1505 free(ctx); 1506 cb_fn(seq, cb_arg, -ENOMEM); 1507 return; 1508 } 1509 ctx->num_pages = 1; 1510 ctx->cb_fn = cb_fn; 1511 ctx->cb_arg = cb_arg; 1512 ctx->seq = seq; 1513 1514 page_num = bs_blobid_to_page(blob->id); 1515 lba = bs_md_page_to_lba(blob->bs, page_num); 1516 1517 blob->state = SPDK_BLOB_STATE_LOADING; 1518 1519 bs_sequence_read_dev(seq, &ctx->pages[0], lba, 1520 bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 1521 blob_load_cpl, ctx); 1522 } 1523 1524 struct spdk_blob_persist_ctx { 1525 struct spdk_blob *blob; 1526 1527 struct spdk_bs_super_block *super; 1528 1529 struct spdk_blob_md_page *pages; 1530 uint32_t next_extent_page; 1531 struct spdk_blob_md_page *extent_page; 1532 1533 spdk_bs_sequence_t *seq; 1534 spdk_bs_sequence_cpl cb_fn; 1535 void *cb_arg; 1536 TAILQ_ENTRY(spdk_blob_persist_ctx) link; 1537 }; 1538 1539 static void 1540 bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba, 1541 uint32_t lba_count) 1542 { 1543 switch (ctx->blob->clear_method) { 1544 case BLOB_CLEAR_WITH_DEFAULT: 1545 case BLOB_CLEAR_WITH_UNMAP: 1546 bs_batch_unmap_dev(batch, lba, lba_count); 1547 break; 1548 case BLOB_CLEAR_WITH_WRITE_ZEROES: 1549 bs_batch_write_zeroes_dev(batch, lba, lba_count); 1550 break; 1551 case BLOB_CLEAR_WITH_NONE: 1552 default: 1553 break; 
1554 } 1555 } 1556 1557 static void blob_persist_check_dirty(struct spdk_blob_persist_ctx *ctx); 1558 1559 static void 1560 blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1561 { 1562 struct spdk_blob_persist_ctx *ctx = cb_arg; 1563 struct spdk_blob_persist_ctx *next_persist; 1564 struct spdk_blob *blob = ctx->blob; 1565 1566 if (bserrno == 0) { 1567 blob_mark_clean(blob); 1568 } 1569 1570 assert(ctx == TAILQ_FIRST(&blob->pending_persists)); 1571 TAILQ_REMOVE(&blob->pending_persists, ctx, link); 1572 1573 next_persist = TAILQ_FIRST(&blob->pending_persists); 1574 1575 /* Call user callback */ 1576 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 1577 1578 /* Free the memory */ 1579 spdk_free(ctx->pages); 1580 free(ctx); 1581 1582 if (next_persist != NULL) { 1583 blob_persist_check_dirty(next_persist); 1584 } 1585 } 1586 1587 static void 1588 blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1589 { 1590 struct spdk_blob_persist_ctx *ctx = cb_arg; 1591 struct spdk_blob *blob = ctx->blob; 1592 struct spdk_blob_store *bs = blob->bs; 1593 size_t i; 1594 1595 if (bserrno != 0) { 1596 blob_persist_complete(seq, ctx, bserrno); 1597 return; 1598 } 1599 1600 /* Release all clusters that were truncated */ 1601 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 1602 uint32_t cluster_num = bs_lba_to_cluster(bs, blob->active.clusters[i]); 1603 1604 /* Nothing to release if it was not allocated */ 1605 if (blob->active.clusters[i] != 0) { 1606 bs_release_cluster(bs, cluster_num); 1607 } 1608 } 1609 1610 if (blob->active.num_clusters == 0) { 1611 free(blob->active.clusters); 1612 blob->active.clusters = NULL; 1613 blob->active.cluster_array_size = 0; 1614 } else if (blob->active.num_clusters != blob->active.cluster_array_size) { 1615 #ifndef __clang_analyzer__ 1616 void *tmp; 1617 1618 /* scan-build really can't figure reallocs, workaround it */ 1619 tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters); 1620 assert(tmp != NULL); 1621 blob->active.clusters = tmp; 1622 1623 tmp = realloc(blob->active.extent_pages, sizeof(uint32_t) * blob->active.num_extent_pages); 1624 assert(tmp != NULL); 1625 blob->active.extent_pages = tmp; 1626 #endif 1627 blob->active.extent_pages_array_size = blob->active.num_extent_pages; 1628 blob->active.cluster_array_size = blob->active.num_clusters; 1629 } 1630 1631 /* TODO: Add path to persist clear extent pages. */ 1632 blob_persist_complete(seq, ctx, bserrno); 1633 } 1634 1635 static void 1636 blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1637 { 1638 struct spdk_blob_persist_ctx *ctx = cb_arg; 1639 struct spdk_blob *blob = ctx->blob; 1640 struct spdk_blob_store *bs = blob->bs; 1641 spdk_bs_batch_t *batch; 1642 size_t i; 1643 uint64_t lba; 1644 uint32_t lba_count; 1645 1646 if (bserrno != 0) { 1647 blob_persist_complete(seq, ctx, bserrno); 1648 return; 1649 } 1650 1651 /* Clusters don't move around in blobs. The list shrinks or grows 1652 * at the end, but no changes ever occur in the middle of the list. 
1653 */ 1654 1655 batch = bs_sequence_to_batch(seq, blob_persist_clear_clusters_cpl, ctx); 1656 1657 /* Clear all clusters that were truncated */ 1658 lba = 0; 1659 lba_count = 0; 1660 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 1661 uint64_t next_lba = blob->active.clusters[i]; 1662 uint32_t next_lba_count = bs_cluster_to_lba(bs, 1); 1663 1664 if (next_lba > 0 && (lba + lba_count) == next_lba) { 1665 /* This cluster is contiguous with the previous one. */ 1666 lba_count += next_lba_count; 1667 continue; 1668 } 1669 1670 /* This cluster is not contiguous with the previous one. */ 1671 1672 /* If a run of LBAs previously existing, clear them now */ 1673 if (lba_count > 0) { 1674 bs_batch_clear_dev(ctx, batch, lba, lba_count); 1675 } 1676 1677 /* Start building the next batch */ 1678 lba = next_lba; 1679 if (next_lba > 0) { 1680 lba_count = next_lba_count; 1681 } else { 1682 lba_count = 0; 1683 } 1684 } 1685 1686 /* If we ended with a contiguous set of LBAs, clear them now */ 1687 if (lba_count > 0) { 1688 bs_batch_clear_dev(ctx, batch, lba, lba_count); 1689 } 1690 1691 bs_batch_close(batch); 1692 } 1693 1694 static void 1695 blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1696 { 1697 struct spdk_blob_persist_ctx *ctx = cb_arg; 1698 struct spdk_blob *blob = ctx->blob; 1699 struct spdk_blob_store *bs = blob->bs; 1700 size_t i; 1701 1702 if (bserrno != 0) { 1703 blob_persist_complete(seq, ctx, bserrno); 1704 return; 1705 } 1706 1707 /* This loop starts at 1 because the first page is special and handled 1708 * below. The pages (except the first) are never written in place, 1709 * so any pages in the clean list must be zeroed. 1710 */ 1711 for (i = 1; i < blob->clean.num_pages; i++) { 1712 bs_release_md_page(bs, blob->clean.pages[i]); 1713 } 1714 1715 if (blob->active.num_pages == 0) { 1716 uint32_t page_num; 1717 1718 page_num = bs_blobid_to_page(blob->id); 1719 bs_release_md_page(bs, page_num); 1720 } 1721 1722 /* Move on to clearing clusters */ 1723 blob_persist_clear_clusters(seq, ctx, 0); 1724 } 1725 1726 static void 1727 blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1728 { 1729 struct spdk_blob_persist_ctx *ctx = cb_arg; 1730 struct spdk_blob *blob = ctx->blob; 1731 struct spdk_blob_store *bs = blob->bs; 1732 uint64_t lba; 1733 uint32_t lba_count; 1734 spdk_bs_batch_t *batch; 1735 size_t i; 1736 1737 if (bserrno != 0) { 1738 blob_persist_complete(seq, ctx, bserrno); 1739 return; 1740 } 1741 1742 batch = bs_sequence_to_batch(seq, blob_persist_zero_pages_cpl, ctx); 1743 1744 lba_count = bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 1745 1746 /* This loop starts at 1 because the first page is special and handled 1747 * below. The pages (except the first) are never written in place, 1748 * so any pages in the clean list must be zeroed. 1749 */ 1750 for (i = 1; i < blob->clean.num_pages; i++) { 1751 lba = bs_md_page_to_lba(bs, blob->clean.pages[i]); 1752 1753 bs_batch_write_zeroes_dev(batch, lba, lba_count); 1754 } 1755 1756 /* The first page will only be zeroed if this is a delete. 
*/ 1757 if (blob->active.num_pages == 0) { 1758 uint32_t page_num; 1759 1760 /* The first page in the metadata goes where the blobid indicates */ 1761 page_num = bs_blobid_to_page(blob->id); 1762 lba = bs_md_page_to_lba(bs, page_num); 1763 1764 bs_batch_write_zeroes_dev(batch, lba, lba_count); 1765 } 1766 1767 bs_batch_close(batch); 1768 } 1769 1770 static void 1771 blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1772 { 1773 struct spdk_blob_persist_ctx *ctx = cb_arg; 1774 struct spdk_blob *blob = ctx->blob; 1775 struct spdk_blob_store *bs = blob->bs; 1776 uint64_t lba; 1777 uint32_t lba_count; 1778 struct spdk_blob_md_page *page; 1779 1780 if (bserrno != 0) { 1781 blob_persist_complete(seq, ctx, bserrno); 1782 return; 1783 } 1784 1785 if (blob->active.num_pages == 0) { 1786 /* Move on to the next step */ 1787 blob_persist_zero_pages(seq, ctx, 0); 1788 return; 1789 } 1790 1791 lba_count = bs_byte_to_lba(bs, sizeof(*page)); 1792 1793 page = &ctx->pages[0]; 1794 /* The first page in the metadata goes where the blobid indicates */ 1795 lba = bs_md_page_to_lba(bs, bs_blobid_to_page(blob->id)); 1796 1797 bs_sequence_write_dev(seq, page, lba, lba_count, 1798 blob_persist_zero_pages, ctx); 1799 } 1800 1801 static void 1802 blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1803 { 1804 struct spdk_blob_persist_ctx *ctx = cb_arg; 1805 struct spdk_blob *blob = ctx->blob; 1806 struct spdk_blob_store *bs = blob->bs; 1807 uint64_t lba; 1808 uint32_t lba_count; 1809 struct spdk_blob_md_page *page; 1810 spdk_bs_batch_t *batch; 1811 size_t i; 1812 1813 if (bserrno != 0) { 1814 blob_persist_complete(seq, ctx, bserrno); 1815 return; 1816 } 1817 1818 /* Clusters don't move around in blobs. The list shrinks or grows 1819 * at the end, but no changes ever occur in the middle of the list. 1820 */ 1821 1822 lba_count = bs_byte_to_lba(bs, sizeof(*page)); 1823 1824 batch = bs_sequence_to_batch(seq, blob_persist_write_page_root, ctx); 1825 1826 /* This starts at 1. The root page is not written until 1827 * all of the others are finished 1828 */ 1829 for (i = 1; i < blob->active.num_pages; i++) { 1830 page = &ctx->pages[i]; 1831 assert(page->sequence_num == i); 1832 1833 lba = bs_md_page_to_lba(bs, blob->active.pages[i]); 1834 1835 bs_batch_write_dev(batch, page, lba, lba_count); 1836 } 1837 1838 bs_batch_close(batch); 1839 } 1840 1841 static int 1842 blob_resize(struct spdk_blob *blob, uint64_t sz) 1843 { 1844 uint64_t i; 1845 uint64_t *tmp; 1846 uint64_t lfc; /* lowest free cluster */ 1847 uint32_t lfmd; /* lowest free md page */ 1848 uint64_t num_clusters; 1849 uint32_t *ep_tmp; 1850 uint64_t new_num_ep = 0, current_num_ep = 0; 1851 struct spdk_blob_store *bs; 1852 1853 bs = blob->bs; 1854 1855 blob_verify_md_op(blob); 1856 1857 if (blob->active.num_clusters == sz) { 1858 return 0; 1859 } 1860 1861 if (blob->active.num_clusters < blob->active.cluster_array_size) { 1862 /* If this blob was resized to be larger, then smaller, then 1863 * larger without syncing, then the cluster array already 1864 * contains spare assigned clusters we can use. 1865 */ 1866 num_clusters = spdk_min(blob->active.cluster_array_size, 1867 sz); 1868 } else { 1869 num_clusters = blob->active.num_clusters; 1870 } 1871 1872 if (blob->use_extent_table) { 1873 /* Round up since every cluster beyond current Extent Table size, 1874 * requires new extent page. 
*/ 1875 new_num_ep = spdk_divide_round_up(sz, SPDK_EXTENTS_PER_EP); 1876 current_num_ep = spdk_divide_round_up(num_clusters, SPDK_EXTENTS_PER_EP); 1877 } 1878 1879 /* Do two passes - one to verify that we can obtain enough clusters 1880 * and md pages, another to actually claim them. 1881 */ 1882 1883 if (spdk_blob_is_thin_provisioned(blob) == false) { 1884 lfc = 0; 1885 for (i = num_clusters; i < sz; i++) { 1886 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1887 if (lfc == UINT32_MAX) { 1888 /* No more free clusters. Cannot satisfy the request */ 1889 return -ENOSPC; 1890 } 1891 lfc++; 1892 } 1893 lfmd = 0; 1894 for (i = current_num_ep; i < new_num_ep ; i++) { 1895 lfmd = spdk_bit_array_find_first_clear(blob->bs->used_md_pages, lfmd); 1896 if (lfmd == UINT32_MAX) { 1897 /* No more free md pages. Cannot satisfy the request */ 1898 return -ENOSPC; 1899 } 1900 } 1901 } 1902 1903 if (sz > num_clusters) { 1904 /* Expand the cluster array if necessary. 1905 * We only shrink the array when persisting. 1906 */ 1907 tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz); 1908 if (sz > 0 && tmp == NULL) { 1909 return -ENOMEM; 1910 } 1911 memset(tmp + blob->active.cluster_array_size, 0, 1912 sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size)); 1913 blob->active.clusters = tmp; 1914 blob->active.cluster_array_size = sz; 1915 1916 /* Expand the extents table, only if enough clusters were added */ 1917 if (new_num_ep > current_num_ep && blob->use_extent_table) { 1918 ep_tmp = realloc(blob->active.extent_pages, sizeof(*blob->active.extent_pages) * new_num_ep); 1919 if (new_num_ep > 0 && ep_tmp == NULL) { 1920 return -ENOMEM; 1921 } 1922 memset(ep_tmp + blob->active.extent_pages_array_size, 0, 1923 sizeof(*blob->active.extent_pages) * (new_num_ep - blob->active.extent_pages_array_size)); 1924 blob->active.extent_pages = ep_tmp; 1925 blob->active.extent_pages_array_size = new_num_ep; 1926 } 1927 } 1928 1929 blob->state = SPDK_BLOB_STATE_DIRTY; 1930 1931 if (spdk_blob_is_thin_provisioned(blob) == false) { 1932 lfc = 0; 1933 lfmd = 0; 1934 for (i = num_clusters; i < sz; i++) { 1935 bs_allocate_cluster(blob, i, &lfc, &lfmd, true); 1936 lfc++; 1937 lfmd++; 1938 } 1939 } 1940 1941 blob->active.num_clusters = sz; 1942 blob->active.num_extent_pages = new_num_ep; 1943 1944 return 0; 1945 } 1946 1947 static void 1948 blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx) 1949 { 1950 spdk_bs_sequence_t *seq = ctx->seq; 1951 struct spdk_blob *blob = ctx->blob; 1952 struct spdk_blob_store *bs = blob->bs; 1953 uint64_t i; 1954 uint32_t page_num; 1955 void *tmp; 1956 int rc; 1957 1958 /* Generate the new metadata */ 1959 rc = blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 1960 if (rc < 0) { 1961 blob_persist_complete(seq, ctx, rc); 1962 return; 1963 } 1964 1965 assert(blob->active.num_pages >= 1); 1966 1967 /* Resize the cache of page indices */ 1968 tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages)); 1969 if (!tmp) { 1970 blob_persist_complete(seq, ctx, -ENOMEM); 1971 return; 1972 } 1973 blob->active.pages = tmp; 1974 1975 /* Assign this metadata to pages. This requires two passes - 1976 * one to verify that there are enough pages and a second 1977 * to actually claim them. */ 1978 page_num = 0; 1979 /* Note that this loop starts at one. The first page location is fixed by the blobid. 
*/ 1980 for (i = 1; i < blob->active.num_pages; i++) { 1981 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1982 if (page_num == UINT32_MAX) { 1983 blob_persist_complete(seq, ctx, -ENOMEM); 1984 return; 1985 } 1986 page_num++; 1987 } 1988 1989 page_num = 0; 1990 blob->active.pages[0] = bs_blobid_to_page(blob->id); 1991 for (i = 1; i < blob->active.num_pages; i++) { 1992 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1993 ctx->pages[i - 1].next = page_num; 1994 /* Now that previous metadata page is complete, calculate the crc for it. */ 1995 ctx->pages[i - 1].crc = blob_md_page_calc_crc(&ctx->pages[i - 1]); 1996 blob->active.pages[i] = page_num; 1997 bs_claim_md_page(bs, page_num); 1998 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1999 page_num++; 2000 } 2001 ctx->pages[i - 1].crc = blob_md_page_calc_crc(&ctx->pages[i - 1]); 2002 /* Start writing the metadata from last page to first */ 2003 blob->state = SPDK_BLOB_STATE_CLEAN; 2004 blob_persist_write_page_chain(seq, ctx, 0); 2005 } 2006 2007 static void 2008 blob_persist_write_extent_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2009 { 2010 struct spdk_blob_persist_ctx *ctx = cb_arg; 2011 struct spdk_blob *blob = ctx->blob; 2012 size_t i; 2013 uint32_t extent_page_id; 2014 uint32_t page_count = 0; 2015 int rc; 2016 2017 if (ctx->extent_page != NULL) { 2018 spdk_free(ctx->extent_page); 2019 ctx->extent_page = NULL; 2020 } 2021 2022 if (bserrno != 0) { 2023 blob_persist_complete(seq, ctx, bserrno); 2024 return; 2025 } 2026 2027 /* Only write out changed extent pages */ 2028 for (i = ctx->next_extent_page; i < blob->active.num_extent_pages; i++) { 2029 extent_page_id = blob->active.extent_pages[i]; 2030 if (extent_page_id == 0) { 2031 /* No Extent Page to persist */ 2032 assert(spdk_blob_is_thin_provisioned(blob)); 2033 continue; 2034 } 2035 /* Writing out new extent page for the first time. Either active extent pages is larger 2036 * than clean extent pages or there was no extent page assigned due to thin provisioning. */ 2037 if (i >= blob->clean.extent_pages_array_size || blob->clean.extent_pages[i] == 0) { 2038 blob->state = SPDK_BLOB_STATE_DIRTY; 2039 assert(spdk_bit_array_get(blob->bs->used_md_pages, extent_page_id)); 2040 ctx->next_extent_page = i + 1; 2041 rc = blob_serialize_add_page(ctx->blob, &ctx->extent_page, &page_count, &ctx->extent_page); 2042 if (rc < 0) { 2043 blob_persist_complete(seq, ctx, rc); 2044 return; 2045 } 2046 2047 blob_serialize_extent_page(blob, i * SPDK_EXTENTS_PER_EP, ctx->extent_page); 2048 2049 ctx->extent_page->crc = blob_md_page_calc_crc(ctx->extent_page); 2050 2051 bs_sequence_write_dev(seq, ctx->extent_page, bs_md_page_to_lba(blob->bs, extent_page_id), 2052 bs_byte_to_lba(blob->bs, SPDK_BS_PAGE_SIZE), 2053 blob_persist_write_extent_pages, ctx); 2054 return; 2055 } 2056 assert(blob->clean.extent_pages[i] != 0); 2057 } 2058 2059 blob_persist_generate_new_md(ctx); 2060 } 2061 2062 static void 2063 blob_persist_start(struct spdk_blob_persist_ctx *ctx) 2064 { 2065 spdk_bs_sequence_t *seq = ctx->seq; 2066 struct spdk_blob *blob = ctx->blob; 2067 2068 if (blob->active.num_pages == 0) { 2069 /* This is the signal that the blob should be deleted. 2070 * Immediately jump to the clean up routine. 
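 * Deletion is expressed as persisting a blob with zero metadata pages, so the previously written metadata pages are zeroed on disk instead of new metadata being written.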
*/ 2071 assert(blob->clean.num_pages > 0); 2072 blob->state = SPDK_BLOB_STATE_CLEAN; 2073 blob_persist_zero_pages(seq, ctx, 0); 2074 return; 2075 2076 } 2077 2078 blob_persist_write_extent_pages(seq, ctx, 0); 2079 } 2080 2081 static void 2082 blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2083 { 2084 struct spdk_blob_persist_ctx *ctx = cb_arg; 2085 2086 spdk_free(ctx->super); 2087 2088 if (bserrno != 0) { 2089 blob_persist_complete(seq, ctx, bserrno); 2090 return; 2091 } 2092 2093 ctx->blob->bs->clean = 0; 2094 2095 blob_persist_start(ctx); 2096 } 2097 2098 static void 2099 bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 2100 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg); 2101 2102 2103 static void 2104 blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2105 { 2106 struct spdk_blob_persist_ctx *ctx = cb_arg; 2107 2108 if (bserrno != 0) { 2109 spdk_free(ctx->super); 2110 blob_persist_complete(seq, ctx, bserrno); 2111 return; 2112 } 2113 2114 ctx->super->clean = 0; 2115 if (ctx->super->size == 0) { 2116 ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen; 2117 } 2118 2119 bs_write_super(seq, ctx->blob->bs, ctx->super, blob_persist_dirty_cpl, ctx); 2120 } 2121 2122 static void 2123 blob_persist_check_dirty(struct spdk_blob_persist_ctx *ctx) 2124 { 2125 if (ctx->blob->bs->clean) { 2126 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 2127 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2128 if (!ctx->super) { 2129 blob_persist_complete(ctx->seq, ctx, -ENOMEM); 2130 return; 2131 } 2132 2133 bs_sequence_read_dev(ctx->seq, ctx->super, bs_page_to_lba(ctx->blob->bs, 0), 2134 bs_byte_to_lba(ctx->blob->bs, sizeof(*ctx->super)), 2135 blob_persist_dirty, ctx); 2136 } else { 2137 blob_persist_start(ctx); 2138 } 2139 } 2140 2141 /* Write a blob to disk */ 2142 static void 2143 blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 2144 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 2145 { 2146 struct spdk_blob_persist_ctx *ctx; 2147 2148 blob_verify_md_op(blob); 2149 2150 if (blob->state == SPDK_BLOB_STATE_CLEAN && TAILQ_EMPTY(&blob->pending_persists)) { 2151 cb_fn(seq, cb_arg, 0); 2152 return; 2153 } 2154 2155 ctx = calloc(1, sizeof(*ctx)); 2156 if (!ctx) { 2157 cb_fn(seq, cb_arg, -ENOMEM); 2158 return; 2159 } 2160 ctx->blob = blob; 2161 ctx->seq = seq; 2162 ctx->cb_fn = cb_fn; 2163 ctx->cb_arg = cb_arg; 2164 ctx->next_extent_page = 0; 2165 2166 /* Multiple blob persists can affect one another, via blob->state or 2167 * blob mutable data changes. To prevent it, queue up the persists. 
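 * Only the persist at the head of pending_persists is actually started; a persist issued while one is outstanding is appended to the tail and started later, once the in-flight persist completes.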
*/ 2168 if (!TAILQ_EMPTY(&blob->pending_persists)) { 2169 TAILQ_INSERT_TAIL(&blob->pending_persists, ctx, link); 2170 return; 2171 } 2172 TAILQ_INSERT_HEAD(&blob->pending_persists, ctx, link); 2173 2174 blob_persist_check_dirty(ctx); 2175 } 2176 2177 struct spdk_blob_copy_cluster_ctx { 2178 struct spdk_blob *blob; 2179 uint8_t *buf; 2180 uint64_t page; 2181 uint64_t new_cluster; 2182 uint32_t new_extent_page; 2183 spdk_bs_sequence_t *seq; 2184 }; 2185 2186 static void 2187 blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno) 2188 { 2189 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 2190 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq; 2191 TAILQ_HEAD(, spdk_bs_request_set) requests; 2192 spdk_bs_user_op_t *op; 2193 2194 TAILQ_INIT(&requests); 2195 TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link); 2196 2197 while (!TAILQ_EMPTY(&requests)) { 2198 op = TAILQ_FIRST(&requests); 2199 TAILQ_REMOVE(&requests, op, link); 2200 if (bserrno == 0) { 2201 bs_user_op_execute(op); 2202 } else { 2203 bs_user_op_abort(op); 2204 } 2205 } 2206 2207 spdk_free(ctx->buf); 2208 free(ctx); 2209 } 2210 2211 static void 2212 blob_insert_cluster_cpl(void *cb_arg, int bserrno) 2213 { 2214 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 2215 2216 if (bserrno) { 2217 if (bserrno == -EEXIST) { 2218 /* The metadata insert failed because another thread 2219 * allocated the cluster first. Free our cluster 2220 * but continue without error. */ 2221 bserrno = 0; 2222 } 2223 bs_release_cluster(ctx->blob->bs, ctx->new_cluster); 2224 if (ctx->new_extent_page != 0) { 2225 bs_release_md_page(ctx->blob->bs, ctx->new_extent_page); 2226 } 2227 } 2228 2229 bs_sequence_finish(ctx->seq, bserrno); 2230 } 2231 2232 static void 2233 blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2234 { 2235 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 2236 uint32_t cluster_number; 2237 2238 if (bserrno) { 2239 /* The write failed, so jump to the final completion handler */ 2240 bs_sequence_finish(seq, bserrno); 2241 return; 2242 } 2243 2244 cluster_number = bs_page_to_cluster(ctx->blob->bs, ctx->page); 2245 2246 blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster, 2247 ctx->new_extent_page, blob_insert_cluster_cpl, ctx); 2248 } 2249 2250 static void 2251 blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2252 { 2253 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 2254 2255 if (bserrno != 0) { 2256 /* The read failed, so jump to the final completion handler */ 2257 bs_sequence_finish(seq, bserrno); 2258 return; 2259 } 2260 2261 /* Write whole cluster */ 2262 bs_sequence_write_dev(seq, ctx->buf, 2263 bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster), 2264 bs_cluster_to_lba(ctx->blob->bs, 1), 2265 blob_write_copy_cpl, ctx); 2266 } 2267 2268 static void 2269 bs_allocate_and_copy_cluster(struct spdk_blob *blob, 2270 struct spdk_io_channel *_ch, 2271 uint64_t io_unit, spdk_bs_user_op_t *op) 2272 { 2273 struct spdk_bs_cpl cpl; 2274 struct spdk_bs_channel *ch; 2275 struct spdk_blob_copy_cluster_ctx *ctx; 2276 uint32_t cluster_start_page; 2277 uint32_t cluster_number; 2278 int rc; 2279 2280 ch = spdk_io_channel_get_ctx(_ch); 2281 2282 if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) { 2283 /* There are already operations pending. Queue this user op 2284 * and return because it will be re-executed when the outstanding 2285 * cluster allocation completes. 
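 * See blob_allocate_and_copy_cluster_cpl() above, which drains need_cluster_alloc and either executes or aborts every queued op once the allocation finishes.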
*/ 2286 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link); 2287 return; 2288 } 2289 2290 /* Round the io_unit offset down to the first page in the cluster */ 2291 cluster_start_page = bs_io_unit_to_cluster_start(blob, io_unit); 2292 2293 /* Calculate which index in the metadata cluster array the corresponding 2294 * cluster is supposed to be at. */ 2295 cluster_number = bs_io_unit_to_cluster_number(blob, io_unit); 2296 2297 ctx = calloc(1, sizeof(*ctx)); 2298 if (!ctx) { 2299 bs_user_op_abort(op); 2300 return; 2301 } 2302 2303 assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0); 2304 2305 ctx->blob = blob; 2306 ctx->page = cluster_start_page; 2307 2308 if (blob->parent_id != SPDK_BLOBID_INVALID) { 2309 ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen, 2310 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2311 if (!ctx->buf) { 2312 SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n", 2313 blob->bs->cluster_sz); 2314 free(ctx); 2315 bs_user_op_abort(op); 2316 return; 2317 } 2318 } 2319 2320 rc = bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, &ctx->new_extent_page, 2321 false); 2322 if (rc != 0) { 2323 spdk_free(ctx->buf); 2324 free(ctx); 2325 bs_user_op_abort(op); 2326 return; 2327 } 2328 2329 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2330 cpl.u.blob_basic.cb_fn = blob_allocate_and_copy_cluster_cpl; 2331 cpl.u.blob_basic.cb_arg = ctx; 2332 2333 ctx->seq = bs_sequence_start(_ch, &cpl); 2334 if (!ctx->seq) { 2335 bs_release_cluster(blob->bs, ctx->new_cluster); 2336 spdk_free(ctx->buf); 2337 free(ctx); 2338 bs_user_op_abort(op); 2339 return; 2340 } 2341 2342 /* Queue the user op to block other incoming operations */ 2343 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link); 2344 2345 if (blob->parent_id != SPDK_BLOBID_INVALID) { 2346 /* Read cluster from backing device */ 2347 bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf, 2348 bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page), 2349 bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz), 2350 blob_write_copy, ctx); 2351 } else { 2352 blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster, 2353 ctx->new_extent_page, blob_insert_cluster_cpl, ctx); 2354 } 2355 } 2356 2357 static inline void 2358 blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length, 2359 uint64_t *lba, uint32_t *lba_count) 2360 { 2361 *lba_count = length; 2362 2363 if (!bs_io_unit_is_allocated(blob, io_unit)) { 2364 assert(blob->back_bs_dev != NULL); 2365 *lba = bs_io_unit_to_back_dev_lba(blob, io_unit); 2366 *lba_count = bs_io_unit_to_back_dev_lba(blob, *lba_count); 2367 } else { 2368 *lba = bs_blob_io_unit_to_lba(blob, io_unit); 2369 } 2370 } 2371 2372 struct op_split_ctx { 2373 struct spdk_blob *blob; 2374 struct spdk_io_channel *channel; 2375 uint64_t io_unit_offset; 2376 uint64_t io_units_remaining; 2377 void *curr_payload; 2378 enum spdk_blob_op_type op_type; 2379 spdk_bs_sequence_t *seq; 2380 }; 2381 2382 static void 2383 blob_request_submit_op_split_next(void *cb_arg, int bserrno) 2384 { 2385 struct op_split_ctx *ctx = cb_arg; 2386 struct spdk_blob *blob = ctx->blob; 2387 struct spdk_io_channel *ch = ctx->channel; 2388 enum spdk_blob_op_type op_type = ctx->op_type; 2389 uint8_t *buf = ctx->curr_payload; 2390 uint64_t offset = ctx->io_unit_offset; 2391 uint64_t length = ctx->io_units_remaining; 2392 uint64_t op_length; 2393 2394 if (bserrno != 0 || ctx->io_units_remaining == 0) { 2395 bs_sequence_finish(ctx->seq, 
bserrno); 2396 free(ctx); 2397 return; 2398 } 2399 2400 op_length = spdk_min(length, bs_num_io_units_to_cluster_boundary(blob, 2401 offset)); 2402 2403 /* Update length and payload for next operation */ 2404 ctx->io_units_remaining -= op_length; 2405 ctx->io_unit_offset += op_length; 2406 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { 2407 ctx->curr_payload += op_length * blob->bs->io_unit_size; 2408 } 2409 2410 switch (op_type) { 2411 case SPDK_BLOB_READ: 2412 spdk_blob_io_read(blob, ch, buf, offset, op_length, 2413 blob_request_submit_op_split_next, ctx); 2414 break; 2415 case SPDK_BLOB_WRITE: 2416 spdk_blob_io_write(blob, ch, buf, offset, op_length, 2417 blob_request_submit_op_split_next, ctx); 2418 break; 2419 case SPDK_BLOB_UNMAP: 2420 spdk_blob_io_unmap(blob, ch, offset, op_length, 2421 blob_request_submit_op_split_next, ctx); 2422 break; 2423 case SPDK_BLOB_WRITE_ZEROES: 2424 spdk_blob_io_write_zeroes(blob, ch, offset, op_length, 2425 blob_request_submit_op_split_next, ctx); 2426 break; 2427 case SPDK_BLOB_READV: 2428 case SPDK_BLOB_WRITEV: 2429 SPDK_ERRLOG("readv/write not valid\n"); 2430 bs_sequence_finish(ctx->seq, -EINVAL); 2431 free(ctx); 2432 break; 2433 } 2434 } 2435 2436 static void 2437 blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob, 2438 void *payload, uint64_t offset, uint64_t length, 2439 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 2440 { 2441 struct op_split_ctx *ctx; 2442 spdk_bs_sequence_t *seq; 2443 struct spdk_bs_cpl cpl; 2444 2445 assert(blob != NULL); 2446 2447 ctx = calloc(1, sizeof(struct op_split_ctx)); 2448 if (ctx == NULL) { 2449 cb_fn(cb_arg, -ENOMEM); 2450 return; 2451 } 2452 2453 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2454 cpl.u.blob_basic.cb_fn = cb_fn; 2455 cpl.u.blob_basic.cb_arg = cb_arg; 2456 2457 seq = bs_sequence_start(ch, &cpl); 2458 if (!seq) { 2459 free(ctx); 2460 cb_fn(cb_arg, -ENOMEM); 2461 return; 2462 } 2463 2464 ctx->blob = blob; 2465 ctx->channel = ch; 2466 ctx->curr_payload = payload; 2467 ctx->io_unit_offset = offset; 2468 ctx->io_units_remaining = length; 2469 ctx->op_type = op_type; 2470 ctx->seq = seq; 2471 2472 blob_request_submit_op_split_next(ctx, 0); 2473 } 2474 2475 static void 2476 blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob, 2477 void *payload, uint64_t offset, uint64_t length, 2478 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 2479 { 2480 struct spdk_bs_cpl cpl; 2481 uint64_t lba; 2482 uint32_t lba_count; 2483 2484 assert(blob != NULL); 2485 2486 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2487 cpl.u.blob_basic.cb_fn = cb_fn; 2488 cpl.u.blob_basic.cb_arg = cb_arg; 2489 2490 blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 2491 2492 if (blob->frozen_refcnt) { 2493 /* This blob I/O is frozen */ 2494 spdk_bs_user_op_t *op; 2495 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch); 2496 2497 op = bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 2498 if (!op) { 2499 cb_fn(cb_arg, -ENOMEM); 2500 return; 2501 } 2502 2503 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 2504 2505 return; 2506 } 2507 2508 switch (op_type) { 2509 case SPDK_BLOB_READ: { 2510 spdk_bs_batch_t *batch; 2511 2512 batch = bs_batch_open(_ch, &cpl); 2513 if (!batch) { 2514 cb_fn(cb_arg, -ENOMEM); 2515 return; 2516 } 2517 2518 if (bs_io_unit_is_allocated(blob, offset)) { 2519 /* Read from the blob */ 2520 bs_batch_read_dev(batch, payload, lba, lba_count); 
2521 } else { 2522 /* Read from the backing block device */ 2523 bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count); 2524 } 2525 2526 bs_batch_close(batch); 2527 break; 2528 } 2529 case SPDK_BLOB_WRITE: 2530 case SPDK_BLOB_WRITE_ZEROES: { 2531 if (bs_io_unit_is_allocated(blob, offset)) { 2532 /* Write to the blob */ 2533 spdk_bs_batch_t *batch; 2534 2535 if (lba_count == 0) { 2536 cb_fn(cb_arg, 0); 2537 return; 2538 } 2539 2540 batch = bs_batch_open(_ch, &cpl); 2541 if (!batch) { 2542 cb_fn(cb_arg, -ENOMEM); 2543 return; 2544 } 2545 2546 if (op_type == SPDK_BLOB_WRITE) { 2547 bs_batch_write_dev(batch, payload, lba, lba_count); 2548 } else { 2549 bs_batch_write_zeroes_dev(batch, lba, lba_count); 2550 } 2551 2552 bs_batch_close(batch); 2553 } else { 2554 /* Queue this operation and allocate the cluster */ 2555 spdk_bs_user_op_t *op; 2556 2557 op = bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 2558 if (!op) { 2559 cb_fn(cb_arg, -ENOMEM); 2560 return; 2561 } 2562 2563 bs_allocate_and_copy_cluster(blob, _ch, offset, op); 2564 } 2565 break; 2566 } 2567 case SPDK_BLOB_UNMAP: { 2568 spdk_bs_batch_t *batch; 2569 2570 batch = bs_batch_open(_ch, &cpl); 2571 if (!batch) { 2572 cb_fn(cb_arg, -ENOMEM); 2573 return; 2574 } 2575 2576 if (bs_io_unit_is_allocated(blob, offset)) { 2577 bs_batch_unmap_dev(batch, lba, lba_count); 2578 } 2579 2580 bs_batch_close(batch); 2581 break; 2582 } 2583 case SPDK_BLOB_READV: 2584 case SPDK_BLOB_WRITEV: 2585 SPDK_ERRLOG("readv/write not valid\n"); 2586 cb_fn(cb_arg, -EINVAL); 2587 break; 2588 } 2589 } 2590 2591 static void 2592 blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel, 2593 void *payload, uint64_t offset, uint64_t length, 2594 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 2595 { 2596 assert(blob != NULL); 2597 2598 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 2599 cb_fn(cb_arg, -EPERM); 2600 return; 2601 } 2602 2603 if (offset + length > bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2604 cb_fn(cb_arg, -EINVAL); 2605 return; 2606 } 2607 if (length <= bs_num_io_units_to_cluster_boundary(blob, offset)) { 2608 blob_request_submit_op_single(_channel, blob, payload, offset, length, 2609 cb_fn, cb_arg, op_type); 2610 } else { 2611 blob_request_submit_op_split(_channel, blob, payload, offset, length, 2612 cb_fn, cb_arg, op_type); 2613 } 2614 } 2615 2616 struct rw_iov_ctx { 2617 struct spdk_blob *blob; 2618 struct spdk_io_channel *channel; 2619 spdk_blob_op_complete cb_fn; 2620 void *cb_arg; 2621 bool read; 2622 int iovcnt; 2623 struct iovec *orig_iov; 2624 uint64_t io_unit_offset; 2625 uint64_t io_units_remaining; 2626 uint64_t io_units_done; 2627 struct iovec iov[0]; 2628 }; 2629 2630 static void 2631 rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2632 { 2633 assert(cb_arg == NULL); 2634 bs_sequence_finish(seq, bserrno); 2635 } 2636 2637 static void 2638 rw_iov_split_next(void *cb_arg, int bserrno) 2639 { 2640 struct rw_iov_ctx *ctx = cb_arg; 2641 struct spdk_blob *blob = ctx->blob; 2642 struct iovec *iov, *orig_iov; 2643 int iovcnt; 2644 size_t orig_iovoff; 2645 uint64_t io_units_count, io_units_to_boundary, io_unit_offset; 2646 uint64_t byte_count; 2647 2648 if (bserrno != 0 || ctx->io_units_remaining == 0) { 2649 ctx->cb_fn(ctx->cb_arg, bserrno); 2650 free(ctx); 2651 return; 2652 } 2653 2654 io_unit_offset = ctx->io_unit_offset; 2655 io_units_to_boundary = bs_num_io_units_to_cluster_boundary(blob, io_unit_offset); 2656 
io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary); 2657 /* 2658 * Get index and offset into the original iov array for our current position in the I/O sequence. 2659 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 2660 * point to the current position in the I/O sequence. 2661 */ 2662 byte_count = ctx->io_units_done * blob->bs->io_unit_size; 2663 orig_iov = &ctx->orig_iov[0]; 2664 orig_iovoff = 0; 2665 while (byte_count > 0) { 2666 if (byte_count >= orig_iov->iov_len) { 2667 byte_count -= orig_iov->iov_len; 2668 orig_iov++; 2669 } else { 2670 orig_iovoff = byte_count; 2671 byte_count = 0; 2672 } 2673 } 2674 2675 /* 2676 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 2677 * bytes of this next I/O remain to be accounted for in the new iov array. 2678 */ 2679 byte_count = io_units_count * blob->bs->io_unit_size; 2680 iov = &ctx->iov[0]; 2681 iovcnt = 0; 2682 while (byte_count > 0) { 2683 assert(iovcnt < ctx->iovcnt); 2684 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 2685 iov->iov_base = orig_iov->iov_base + orig_iovoff; 2686 byte_count -= iov->iov_len; 2687 orig_iovoff = 0; 2688 orig_iov++; 2689 iov++; 2690 iovcnt++; 2691 } 2692 2693 ctx->io_unit_offset += io_units_count; 2694 ctx->io_units_remaining -= io_units_count; 2695 ctx->io_units_done += io_units_count; 2696 iov = &ctx->iov[0]; 2697 2698 if (ctx->read) { 2699 spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2700 io_units_count, rw_iov_split_next, ctx); 2701 } else { 2702 spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2703 io_units_count, rw_iov_split_next, ctx); 2704 } 2705 } 2706 2707 static void 2708 blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 2709 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2710 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 2711 { 2712 struct spdk_bs_cpl cpl; 2713 2714 assert(blob != NULL); 2715 2716 if (!read && blob->data_ro) { 2717 cb_fn(cb_arg, -EPERM); 2718 return; 2719 } 2720 2721 if (length == 0) { 2722 cb_fn(cb_arg, 0); 2723 return; 2724 } 2725 2726 if (offset + length > bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2727 cb_fn(cb_arg, -EINVAL); 2728 return; 2729 } 2730 2731 /* 2732 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 2733 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 2734 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 2735 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 2736 * to allocate a separate iov array and split the I/O such that none of the resulting 2737 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 2738 * but since this case happens very infrequently, any performance impact will be negligible. 2739 * 2740 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 2741 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 2742 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 2743 * when the batch was completed, to allow for freeing the memory for the iov arrays. 
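 * As an example (hypothetical sizes): with a 4 KiB io_unit and 256 io_units per cluster, a 16-io_unit writev that starts 2 io_units before a cluster boundary is split into a 2-io_unit I/O followed by a 14-io_unit I/O, each with its own iov array built by rw_iov_split_next().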
2744 */ 2745 if (spdk_likely(length <= bs_num_io_units_to_cluster_boundary(blob, offset))) { 2746 uint32_t lba_count; 2747 uint64_t lba; 2748 2749 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2750 cpl.u.blob_basic.cb_fn = cb_fn; 2751 cpl.u.blob_basic.cb_arg = cb_arg; 2752 2753 if (blob->frozen_refcnt) { 2754 /* This blob I/O is frozen */ 2755 enum spdk_blob_op_type op_type; 2756 spdk_bs_user_op_t *op; 2757 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel); 2758 2759 op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV; 2760 op = bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length); 2761 if (!op) { 2762 cb_fn(cb_arg, -ENOMEM); 2763 return; 2764 } 2765 2766 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 2767 2768 return; 2769 } 2770 2771 blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 2772 2773 if (read) { 2774 spdk_bs_sequence_t *seq; 2775 2776 seq = bs_sequence_start(_channel, &cpl); 2777 if (!seq) { 2778 cb_fn(cb_arg, -ENOMEM); 2779 return; 2780 } 2781 2782 if (bs_io_unit_is_allocated(blob, offset)) { 2783 bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, rw_iov_done, NULL); 2784 } else { 2785 bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count, 2786 rw_iov_done, NULL); 2787 } 2788 } else { 2789 if (bs_io_unit_is_allocated(blob, offset)) { 2790 spdk_bs_sequence_t *seq; 2791 2792 seq = bs_sequence_start(_channel, &cpl); 2793 if (!seq) { 2794 cb_fn(cb_arg, -ENOMEM); 2795 return; 2796 } 2797 2798 bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, rw_iov_done, NULL); 2799 } else { 2800 /* Queue this operation and allocate the cluster */ 2801 spdk_bs_user_op_t *op; 2802 2803 op = bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, 2804 length); 2805 if (!op) { 2806 cb_fn(cb_arg, -ENOMEM); 2807 return; 2808 } 2809 2810 bs_allocate_and_copy_cluster(blob, _channel, offset, op); 2811 } 2812 } 2813 } else { 2814 struct rw_iov_ctx *ctx; 2815 2816 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 2817 if (ctx == NULL) { 2818 cb_fn(cb_arg, -ENOMEM); 2819 return; 2820 } 2821 2822 ctx->blob = blob; 2823 ctx->channel = _channel; 2824 ctx->cb_fn = cb_fn; 2825 ctx->cb_arg = cb_arg; 2826 ctx->read = read; 2827 ctx->orig_iov = iov; 2828 ctx->iovcnt = iovcnt; 2829 ctx->io_unit_offset = offset; 2830 ctx->io_units_remaining = length; 2831 ctx->io_units_done = 0; 2832 2833 rw_iov_split_next(ctx, 0); 2834 } 2835 } 2836 2837 static struct spdk_blob * 2838 blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 2839 { 2840 struct spdk_blob *blob; 2841 2842 TAILQ_FOREACH(blob, &bs->blobs, link) { 2843 if (blob->id == blobid) { 2844 return blob; 2845 } 2846 } 2847 2848 return NULL; 2849 } 2850 2851 static void 2852 blob_get_snapshot_and_clone_entries(struct spdk_blob *blob, 2853 struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry) 2854 { 2855 assert(blob != NULL); 2856 *snapshot_entry = NULL; 2857 *clone_entry = NULL; 2858 2859 if (blob->parent_id == SPDK_BLOBID_INVALID) { 2860 return; 2861 } 2862 2863 TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) { 2864 if ((*snapshot_entry)->id == blob->parent_id) { 2865 break; 2866 } 2867 } 2868 2869 if (*snapshot_entry != NULL) { 2870 TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) { 2871 if ((*clone_entry)->id == blob->id) { 2872 break; 2873 } 2874 } 2875 2876 assert(clone_entry != NULL); 2877 } 2878 } 2879 2880 static int 2881 bs_channel_create(void *io_device, void 
*ctx_buf) 2882 { 2883 struct spdk_blob_store *bs = io_device; 2884 struct spdk_bs_channel *channel = ctx_buf; 2885 struct spdk_bs_dev *dev; 2886 uint32_t max_ops = bs->max_channel_ops; 2887 uint32_t i; 2888 2889 dev = bs->dev; 2890 2891 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 2892 if (!channel->req_mem) { 2893 return -1; 2894 } 2895 2896 TAILQ_INIT(&channel->reqs); 2897 2898 for (i = 0; i < max_ops; i++) { 2899 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 2900 } 2901 2902 channel->bs = bs; 2903 channel->dev = dev; 2904 channel->dev_channel = dev->create_channel(dev); 2905 2906 if (!channel->dev_channel) { 2907 SPDK_ERRLOG("Failed to create device channel.\n"); 2908 free(channel->req_mem); 2909 return -1; 2910 } 2911 2912 TAILQ_INIT(&channel->need_cluster_alloc); 2913 TAILQ_INIT(&channel->queued_io); 2914 2915 return 0; 2916 } 2917 2918 static void 2919 bs_channel_destroy(void *io_device, void *ctx_buf) 2920 { 2921 struct spdk_bs_channel *channel = ctx_buf; 2922 spdk_bs_user_op_t *op; 2923 2924 while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) { 2925 op = TAILQ_FIRST(&channel->need_cluster_alloc); 2926 TAILQ_REMOVE(&channel->need_cluster_alloc, op, link); 2927 bs_user_op_abort(op); 2928 } 2929 2930 while (!TAILQ_EMPTY(&channel->queued_io)) { 2931 op = TAILQ_FIRST(&channel->queued_io); 2932 TAILQ_REMOVE(&channel->queued_io, op, link); 2933 bs_user_op_abort(op); 2934 } 2935 2936 free(channel->req_mem); 2937 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 2938 } 2939 2940 static void 2941 bs_dev_destroy(void *io_device) 2942 { 2943 struct spdk_blob_store *bs = io_device; 2944 struct spdk_blob *blob, *blob_tmp; 2945 2946 bs->dev->destroy(bs->dev); 2947 2948 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 2949 TAILQ_REMOVE(&bs->blobs, blob, link); 2950 blob_free(blob); 2951 } 2952 2953 pthread_mutex_destroy(&bs->used_clusters_mutex); 2954 2955 spdk_bit_array_free(&bs->used_blobids); 2956 spdk_bit_array_free(&bs->used_md_pages); 2957 spdk_bit_array_free(&bs->used_clusters); 2958 /* 2959 * If this function is called for any reason except a successful unload, 2960 * the unload_cpl type will be NONE and this will be a nop. 
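 * bs_dev_destroy() runs as the io_device unregister callback (see bs_free() below), so it is reached both on a normal unload and when a failed load tears the blobstore down via bs_load_ctx_fail().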
2961 */ 2962 bs_call_cpl(&bs->unload_cpl, bs->unload_err); 2963 2964 free(bs); 2965 } 2966 2967 static int 2968 bs_blob_list_add(struct spdk_blob *blob) 2969 { 2970 spdk_blob_id snapshot_id; 2971 struct spdk_blob_list *snapshot_entry = NULL; 2972 struct spdk_blob_list *clone_entry = NULL; 2973 2974 assert(blob != NULL); 2975 2976 snapshot_id = blob->parent_id; 2977 if (snapshot_id == SPDK_BLOBID_INVALID) { 2978 return 0; 2979 } 2980 2981 snapshot_entry = bs_get_snapshot_entry(blob->bs, snapshot_id); 2982 if (snapshot_entry == NULL) { 2983 /* Snapshot not found */ 2984 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list)); 2985 if (snapshot_entry == NULL) { 2986 return -ENOMEM; 2987 } 2988 snapshot_entry->id = snapshot_id; 2989 TAILQ_INIT(&snapshot_entry->clones); 2990 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link); 2991 } else { 2992 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 2993 if (clone_entry->id == blob->id) { 2994 break; 2995 } 2996 } 2997 } 2998 2999 if (clone_entry == NULL) { 3000 /* Clone not found */ 3001 clone_entry = calloc(1, sizeof(struct spdk_blob_list)); 3002 if (clone_entry == NULL) { 3003 return -ENOMEM; 3004 } 3005 clone_entry->id = blob->id; 3006 TAILQ_INIT(&clone_entry->clones); 3007 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link); 3008 snapshot_entry->clone_count++; 3009 } 3010 3011 return 0; 3012 } 3013 3014 static void 3015 bs_blob_list_remove(struct spdk_blob *blob) 3016 { 3017 struct spdk_blob_list *snapshot_entry = NULL; 3018 struct spdk_blob_list *clone_entry = NULL; 3019 3020 blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry); 3021 3022 if (snapshot_entry == NULL) { 3023 return; 3024 } 3025 3026 blob->parent_id = SPDK_BLOBID_INVALID; 3027 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 3028 free(clone_entry); 3029 3030 snapshot_entry->clone_count--; 3031 } 3032 3033 static int 3034 bs_blob_list_free(struct spdk_blob_store *bs) 3035 { 3036 struct spdk_blob_list *snapshot_entry; 3037 struct spdk_blob_list *snapshot_entry_tmp; 3038 struct spdk_blob_list *clone_entry; 3039 struct spdk_blob_list *clone_entry_tmp; 3040 3041 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) { 3042 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) { 3043 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 3044 free(clone_entry); 3045 } 3046 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link); 3047 free(snapshot_entry); 3048 } 3049 3050 return 0; 3051 } 3052 3053 static void 3054 bs_free(struct spdk_blob_store *bs) 3055 { 3056 bs_blob_list_free(bs); 3057 3058 bs_unregister_md_thread(bs); 3059 spdk_io_device_unregister(bs, bs_dev_destroy); 3060 } 3061 3062 void 3063 spdk_bs_opts_init(struct spdk_bs_opts *opts) 3064 { 3065 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 3066 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 3067 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 3068 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS; 3069 opts->clear_method = BS_CLEAR_WITH_UNMAP; 3070 memset(&opts->bstype, 0, sizeof(opts->bstype)); 3071 opts->iter_cb_fn = NULL; 3072 opts->iter_cb_arg = NULL; 3073 } 3074 3075 static int 3076 bs_opts_verify(struct spdk_bs_opts *opts) 3077 { 3078 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 3079 opts->max_channel_ops == 0) { 3080 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 3081 return -1; 3082 } 3083 3084 return 0; 3085 } 3086 3087 static int 3088 bs_alloc(struct 
spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs) 3089 { 3090 struct spdk_blob_store *bs; 3091 uint64_t dev_size; 3092 int rc; 3093 3094 dev_size = dev->blocklen * dev->blockcnt; 3095 if (dev_size < opts->cluster_sz) { 3096 /* Device size cannot be smaller than cluster size of blobstore */ 3097 SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n", 3098 dev_size, opts->cluster_sz); 3099 return -ENOSPC; 3100 } 3101 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 3102 /* Cluster size cannot be smaller than page size */ 3103 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n", 3104 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 3105 return -EINVAL; 3106 } 3107 bs = calloc(1, sizeof(struct spdk_blob_store)); 3108 if (!bs) { 3109 return -ENOMEM; 3110 } 3111 3112 TAILQ_INIT(&bs->blobs); 3113 TAILQ_INIT(&bs->snapshots); 3114 bs->dev = dev; 3115 bs->md_thread = spdk_get_thread(); 3116 assert(bs->md_thread != NULL); 3117 3118 /* 3119 * Do not use bs_lba_to_cluster() here since blockcnt may not be an 3120 * even multiple of the cluster size. 3121 */ 3122 bs->cluster_sz = opts->cluster_sz; 3123 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 3124 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 3125 if (spdk_u32_is_pow2(bs->pages_per_cluster)) { 3126 bs->pages_per_cluster_shift = spdk_u32log2(bs->pages_per_cluster); 3127 } 3128 bs->num_free_clusters = bs->total_clusters; 3129 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 3130 bs->io_unit_size = dev->blocklen; 3131 if (bs->used_clusters == NULL) { 3132 free(bs); 3133 return -ENOMEM; 3134 } 3135 3136 bs->max_channel_ops = opts->max_channel_ops; 3137 bs->super_blob = SPDK_BLOBID_INVALID; 3138 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 3139 3140 /* The metadata is assumed to be at least 1 page */ 3141 bs->used_md_pages = spdk_bit_array_create(1); 3142 bs->used_blobids = spdk_bit_array_create(0); 3143 3144 pthread_mutex_init(&bs->used_clusters_mutex, NULL); 3145 3146 spdk_io_device_register(bs, bs_channel_create, bs_channel_destroy, 3147 sizeof(struct spdk_bs_channel), "blobstore"); 3148 rc = bs_register_md_thread(bs); 3149 if (rc == -1) { 3150 spdk_io_device_unregister(bs, NULL); 3151 pthread_mutex_destroy(&bs->used_clusters_mutex); 3152 spdk_bit_array_free(&bs->used_blobids); 3153 spdk_bit_array_free(&bs->used_md_pages); 3154 spdk_bit_array_free(&bs->used_clusters); 3155 free(bs); 3156 /* FIXME: this is a lie but don't know how to get a proper error code here */ 3157 return -ENOMEM; 3158 } 3159 3160 *_bs = bs; 3161 return 0; 3162 } 3163 3164 /* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload.
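 * The same context also drives the recovery path (bs_recover()/bs_load_replay_md()), which rebuilds the used_md_pages, used_clusters and used_blobids bitmaps by replaying the on-disk metadata.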
*/ 3165 3166 struct spdk_bs_load_ctx { 3167 struct spdk_blob_store *bs; 3168 struct spdk_bs_super_block *super; 3169 3170 struct spdk_bs_md_mask *mask; 3171 bool in_page_chain; 3172 uint32_t page_index; 3173 uint32_t cur_page; 3174 struct spdk_blob_md_page *page; 3175 3176 uint64_t num_extent_pages; 3177 uint32_t *extent_page_num; 3178 struct spdk_blob_md_page *extent_pages; 3179 3180 spdk_bs_sequence_t *seq; 3181 spdk_blob_op_with_handle_complete iter_cb_fn; 3182 void *iter_cb_arg; 3183 struct spdk_blob *blob; 3184 spdk_blob_id blobid; 3185 }; 3186 3187 static void 3188 bs_load_ctx_fail(struct spdk_bs_load_ctx *ctx, int bserrno) 3189 { 3190 assert(bserrno != 0); 3191 3192 spdk_free(ctx->super); 3193 bs_sequence_finish(ctx->seq, bserrno); 3194 bs_free(ctx->bs); 3195 free(ctx); 3196 } 3197 3198 static void 3199 bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 3200 { 3201 uint32_t i = 0; 3202 3203 while (true) { 3204 i = spdk_bit_array_find_first_set(array, i); 3205 if (i >= mask->length) { 3206 break; 3207 } 3208 mask->mask[i / 8] |= 1U << (i % 8); 3209 i++; 3210 } 3211 } 3212 3213 static int 3214 bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask) 3215 { 3216 struct spdk_bit_array *array; 3217 uint32_t i; 3218 3219 if (spdk_bit_array_resize(array_ptr, mask->length) < 0) { 3220 return -ENOMEM; 3221 } 3222 3223 array = *array_ptr; 3224 for (i = 0; i < mask->length; i++) { 3225 if (mask->mask[i / 8] & (1U << (i % 8))) { 3226 spdk_bit_array_set(array, i); 3227 } 3228 } 3229 3230 return 0; 3231 } 3232 3233 static void 3234 bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 3235 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 3236 { 3237 /* Update the values in the super block */ 3238 super->super_blob = bs->super_blob; 3239 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 3240 super->crc = blob_md_page_calc_crc(super); 3241 bs_sequence_write_dev(seq, super, bs_page_to_lba(bs, 0), 3242 bs_byte_to_lba(bs, sizeof(*super)), 3243 cb_fn, cb_arg); 3244 } 3245 3246 static void 3247 bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 3248 { 3249 struct spdk_bs_load_ctx *ctx = arg; 3250 uint64_t mask_size, lba, lba_count; 3251 3252 /* Write out the used clusters mask */ 3253 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 3254 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3255 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3256 if (!ctx->mask) { 3257 bs_load_ctx_fail(ctx, -ENOMEM); 3258 return; 3259 } 3260 3261 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 3262 ctx->mask->length = ctx->bs->total_clusters; 3263 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 3264 3265 bs_set_mask(ctx->bs->used_clusters, ctx->mask); 3266 lba = bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 3267 lba_count = bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 3268 bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 3269 } 3270 3271 static void 3272 bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 3273 { 3274 struct spdk_bs_load_ctx *ctx = arg; 3275 uint64_t mask_size, lba, lba_count; 3276 3277 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 3278 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3279 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3280 if (!ctx->mask) { 3281 bs_load_ctx_fail(ctx, -ENOMEM); 3282 return; 3283 } 3284 3285 ctx->mask->type = 
SPDK_MD_MASK_TYPE_USED_PAGES; 3286 ctx->mask->length = ctx->super->md_len; 3287 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 3288 3289 bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 3290 lba = bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 3291 lba_count = bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 3292 bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 3293 } 3294 3295 static void 3296 bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 3297 { 3298 struct spdk_bs_load_ctx *ctx = arg; 3299 uint64_t mask_size, lba, lba_count; 3300 3301 if (ctx->super->used_blobid_mask_len == 0) { 3302 /* 3303 * This is a pre-v3 on-disk format where the blobid mask does not get 3304 * written to disk. 3305 */ 3306 cb_fn(seq, arg, 0); 3307 return; 3308 } 3309 3310 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 3311 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3312 SPDK_MALLOC_DMA); 3313 if (!ctx->mask) { 3314 bs_load_ctx_fail(ctx, -ENOMEM); 3315 return; 3316 } 3317 3318 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS; 3319 ctx->mask->length = ctx->super->md_len; 3320 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids)); 3321 3322 bs_set_mask(ctx->bs->used_blobids, ctx->mask); 3323 lba = bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 3324 lba_count = bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 3325 bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 3326 } 3327 3328 static void 3329 blob_set_thin_provision(struct spdk_blob *blob) 3330 { 3331 blob_verify_md_op(blob); 3332 blob->invalid_flags |= SPDK_BLOB_THIN_PROV; 3333 blob->state = SPDK_BLOB_STATE_DIRTY; 3334 } 3335 3336 static void 3337 blob_set_clear_method(struct spdk_blob *blob, enum blob_clear_method clear_method) 3338 { 3339 blob_verify_md_op(blob); 3340 blob->clear_method = clear_method; 3341 blob->md_ro_flags |= (clear_method << SPDK_BLOB_CLEAR_METHOD_SHIFT); 3342 blob->state = SPDK_BLOB_STATE_DIRTY; 3343 } 3344 3345 static void bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno); 3346 3347 static void 3348 bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno) 3349 { 3350 struct spdk_bs_load_ctx *ctx = cb_arg; 3351 spdk_blob_id id; 3352 int64_t page_num; 3353 3354 /* Iterate to next blob (we can't use spdk_bs_iter_next function as our 3355 * last blob has been removed */ 3356 page_num = bs_blobid_to_page(ctx->blobid); 3357 page_num++; 3358 page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num); 3359 if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) { 3360 bs_load_iter(ctx, NULL, -ENOENT); 3361 return; 3362 } 3363 3364 id = bs_page_to_blobid(page_num); 3365 3366 spdk_bs_open_blob(ctx->bs, id, bs_load_iter, ctx); 3367 } 3368 3369 static void 3370 bs_delete_corrupted_close_cb(void *cb_arg, int bserrno) 3371 { 3372 struct spdk_bs_load_ctx *ctx = cb_arg; 3373 3374 if (bserrno != 0) { 3375 SPDK_ERRLOG("Failed to close corrupted blob\n"); 3376 spdk_bs_iter_next(ctx->bs, ctx->blob, bs_load_iter, ctx); 3377 return; 3378 } 3379 3380 spdk_bs_delete_blob(ctx->bs, ctx->blobid, bs_delete_corrupted_blob_cpl, ctx); 3381 } 3382 3383 static void 3384 bs_delete_corrupted_blob(void *cb_arg, int bserrno) 3385 { 3386 struct spdk_bs_load_ctx *ctx = cb_arg; 3387 uint64_t i; 3388 3389 if (bserrno != 0) { 3390 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 3391 spdk_bs_iter_next(ctx->bs, ctx->blob, 
bs_load_iter, ctx); 3392 return; 3393 } 3394 3395 /* Snapshot and clone have the same copy of cluster map and extent pages 3396 * at this point. Let's clear both for the snapshot now, 3397 * so that they won't be cleared for the clone later when we remove the snapshot. 3398 * Also set thin provision to pass data corruption check */ 3399 for (i = 0; i < ctx->blob->active.num_clusters; i++) { 3400 ctx->blob->active.clusters[i] = 0; 3401 } 3402 for (i = 0; i < ctx->blob->active.num_extent_pages; i++) { 3403 ctx->blob->active.extent_pages[i] = 0; 3404 } 3405 3406 ctx->blob->md_ro = false; 3407 3408 blob_set_thin_provision(ctx->blob); 3409 3410 ctx->blobid = ctx->blob->id; 3411 3412 spdk_blob_close(ctx->blob, bs_delete_corrupted_close_cb, ctx); 3413 } 3414 3415 static void 3416 bs_update_corrupted_blob(void *cb_arg, int bserrno) 3417 { 3418 struct spdk_bs_load_ctx *ctx = cb_arg; 3419 3420 if (bserrno != 0) { 3421 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 3422 spdk_bs_iter_next(ctx->bs, ctx->blob, bs_load_iter, ctx); 3423 return; 3424 } 3425 3426 ctx->blob->md_ro = false; 3427 blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true); 3428 blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true); 3429 spdk_blob_set_read_only(ctx->blob); 3430 3431 if (ctx->iter_cb_fn) { 3432 ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0); 3433 } 3434 bs_blob_list_add(ctx->blob); 3435 3436 spdk_bs_iter_next(ctx->bs, ctx->blob, bs_load_iter, ctx); 3437 } 3438 3439 static void 3440 bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno) 3441 { 3442 struct spdk_bs_load_ctx *ctx = cb_arg; 3443 3444 if (bserrno != 0) { 3445 SPDK_ERRLOG("Failed to open clone of a corrupted blob\n"); 3446 spdk_bs_iter_next(ctx->bs, ctx->blob, bs_load_iter, ctx); 3447 return; 3448 } 3449 3450 if (blob->parent_id == ctx->blob->id) { 3451 /* Power failure occurred before updating clone (snapshot delete case) 3452 * or after updating clone (creating snapshot case) - keep snapshot */ 3453 spdk_blob_close(blob, bs_update_corrupted_blob, ctx); 3454 } else { 3455 /* Power failure occurred after updating clone (snapshot delete case) 3456 * or before updating clone (creating snapshot case) - remove snapshot */ 3457 spdk_blob_close(blob, bs_delete_corrupted_blob, ctx); 3458 } 3459 } 3460 3461 static void 3462 bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno) 3463 { 3464 struct spdk_bs_load_ctx *ctx = arg; 3465 const void *value; 3466 size_t len; 3467 int rc = 0; 3468 3469 if (bserrno == 0) { 3470 /* Examine blob if it is corrupted after power failure. Fix 3471 * the ones that can be fixed and remove any other corrupted 3472 * ones.
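 * A blob counts as corrupted here when it still carries the internal SNAPSHOT_PENDING_REMOVAL or SNAPSHOT_IN_PROGRESS xattr left over from an interrupted snapshot operation.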
If it is not corrupted just process it */ 3473 rc = blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true); 3474 if (rc != 0) { 3475 rc = blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true); 3476 if (rc != 0) { 3477 /* Not corrupted - process it and continue with iterating through blobs */ 3478 if (ctx->iter_cb_fn) { 3479 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0); 3480 } 3481 bs_blob_list_add(blob); 3482 spdk_bs_iter_next(ctx->bs, blob, bs_load_iter, ctx); 3483 return; 3484 } 3485 3486 } 3487 3488 assert(len == sizeof(spdk_blob_id)); 3489 3490 ctx->blob = blob; 3491 3492 /* Open clone to check if we are able to fix this blob or should we remove it */ 3493 spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, bs_examine_clone, ctx); 3494 return; 3495 } else if (bserrno == -ENOENT) { 3496 bserrno = 0; 3497 } else { 3498 /* 3499 * This case needs to be looked at further. Same problem 3500 * exists with applications that rely on explicit blob 3501 * iteration. We should just skip the blob that failed 3502 * to load and continue on to the next one. 3503 */ 3504 SPDK_ERRLOG("Error in iterating blobs\n"); 3505 } 3506 3507 ctx->iter_cb_fn = NULL; 3508 3509 spdk_free(ctx->super); 3510 spdk_free(ctx->mask); 3511 bs_sequence_finish(ctx->seq, bserrno); 3512 free(ctx); 3513 } 3514 3515 static void 3516 bs_load_complete(struct spdk_bs_load_ctx *ctx) 3517 { 3518 spdk_bs_iter_first(ctx->bs, bs_load_iter, ctx); 3519 } 3520 3521 static void 3522 bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3523 { 3524 struct spdk_bs_load_ctx *ctx = cb_arg; 3525 int rc; 3526 3527 /* The type must be correct */ 3528 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS); 3529 3530 /* The length of the mask (in bits) must not be greater than 3531 * the length of the buffer (converted to bits) */ 3532 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8)); 3533 3534 /* The length of the mask must be exactly equal to the size 3535 * (in pages) of the metadata region */ 3536 assert(ctx->mask->length == ctx->super->md_len); 3537 3538 rc = bs_load_mask(&ctx->bs->used_blobids, ctx->mask); 3539 if (rc < 0) { 3540 spdk_free(ctx->mask); 3541 bs_load_ctx_fail(ctx, rc); 3542 return; 3543 } 3544 3545 bs_load_complete(ctx); 3546 } 3547 3548 static void 3549 bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3550 { 3551 struct spdk_bs_load_ctx *ctx = cb_arg; 3552 uint64_t lba, lba_count, mask_size; 3553 int rc; 3554 3555 if (bserrno != 0) { 3556 bs_load_ctx_fail(ctx, bserrno); 3557 return; 3558 } 3559 3560 /* The type must be correct */ 3561 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 3562 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 3563 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 3564 struct spdk_blob_md_page) * 8)); 3565 /* The length of the mask must be exactly equal to the total number of clusters */ 3566 assert(ctx->mask->length == ctx->bs->total_clusters); 3567 3568 rc = bs_load_mask(&ctx->bs->used_clusters, ctx->mask); 3569 if (rc < 0) { 3570 spdk_free(ctx->mask); 3571 bs_load_ctx_fail(ctx, rc); 3572 return; 3573 } 3574 3575 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters); 3576 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters); 3577 3578 spdk_free(ctx->mask); 3579 3580 /* Read the used blobids mask */ 3581 mask_size = ctx->super->used_blobid_mask_len * 
SPDK_BS_PAGE_SIZE; 3582 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3583 SPDK_MALLOC_DMA); 3584 if (!ctx->mask) { 3585 bs_load_ctx_fail(ctx, -ENOMEM); 3586 return; 3587 } 3588 lba = bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 3589 lba_count = bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 3590 bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3591 bs_load_used_blobids_cpl, ctx); 3592 } 3593 3594 static void 3595 bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3596 { 3597 struct spdk_bs_load_ctx *ctx = cb_arg; 3598 uint64_t lba, lba_count, mask_size; 3599 int rc; 3600 3601 if (bserrno != 0) { 3602 bs_load_ctx_fail(ctx, bserrno); 3603 return; 3604 } 3605 3606 /* The type must be correct */ 3607 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 3608 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 3609 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 3610 8)); 3611 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 3612 assert(ctx->mask->length == ctx->super->md_len); 3613 3614 rc = bs_load_mask(&ctx->bs->used_md_pages, ctx->mask); 3615 if (rc < 0) { 3616 spdk_free(ctx->mask); 3617 bs_load_ctx_fail(ctx, rc); 3618 return; 3619 } 3620 3621 spdk_free(ctx->mask); 3622 3623 /* Read the used clusters mask */ 3624 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 3625 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3626 SPDK_MALLOC_DMA); 3627 if (!ctx->mask) { 3628 bs_load_ctx_fail(ctx, -ENOMEM); 3629 return; 3630 } 3631 lba = bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 3632 lba_count = bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 3633 bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3634 bs_load_used_clusters_cpl, ctx); 3635 } 3636 3637 static void 3638 bs_load_read_used_pages(struct spdk_bs_load_ctx *ctx) 3639 { 3640 uint64_t lba, lba_count, mask_size; 3641 3642 /* Read the used pages mask */ 3643 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 3644 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3645 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3646 if (!ctx->mask) { 3647 bs_load_ctx_fail(ctx, -ENOMEM); 3648 return; 3649 } 3650 3651 lba = bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 3652 lba_count = bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 3653 bs_sequence_read_dev(ctx->seq, ctx->mask, lba, lba_count, 3654 bs_load_used_pages_cpl, ctx); 3655 } 3656 3657 static int 3658 bs_load_replay_md_parse_page(struct spdk_bs_load_ctx *ctx, struct spdk_blob_md_page *page) 3659 { 3660 struct spdk_blob_store *bs = ctx->bs; 3661 struct spdk_blob_md_descriptor *desc; 3662 size_t cur_desc = 0; 3663 3664 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3665 while (cur_desc < sizeof(page->descriptors)) { 3666 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3667 if (desc->length == 0) { 3668 /* If padding and length are 0, this terminates the page */ 3669 break; 3670 } 3671 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3672 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3673 unsigned int i, j; 3674 unsigned int cluster_count = 0; 3675 uint32_t cluster_idx; 3676 3677 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3678 3679 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); 
i++) { 3680 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 3681 cluster_idx = desc_extent_rle->extents[i].cluster_idx; 3682 /* 3683 * cluster_idx = 0 means an unallocated cluster - don't mark that 3684 * in the used cluster map. 3685 */ 3686 if (cluster_idx != 0) { 3687 spdk_bit_array_set(bs->used_clusters, cluster_idx + j); 3688 if (bs->num_free_clusters == 0) { 3689 return -ENOSPC; 3690 } 3691 bs->num_free_clusters--; 3692 } 3693 cluster_count++; 3694 } 3695 } 3696 if (cluster_count == 0) { 3697 return -EINVAL; 3698 } 3699 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) { 3700 struct spdk_blob_md_descriptor_extent_page *desc_extent; 3701 uint32_t i; 3702 uint32_t cluster_count = 0; 3703 uint32_t cluster_idx; 3704 size_t cluster_idx_length; 3705 3706 desc_extent = (struct spdk_blob_md_descriptor_extent_page *)desc; 3707 cluster_idx_length = desc_extent->length - sizeof(desc_extent->start_cluster_idx); 3708 3709 if (desc_extent->length <= sizeof(desc_extent->start_cluster_idx) || 3710 (cluster_idx_length % sizeof(desc_extent->cluster_idx[0]) != 0)) { 3711 return -EINVAL; 3712 } 3713 3714 for (i = 0; i < cluster_idx_length / sizeof(desc_extent->cluster_idx[0]); i++) { 3715 cluster_idx = desc_extent->cluster_idx[i]; 3716 /* 3717 * cluster_idx = 0 means an unallocated cluster - don't mark that 3718 * in the used cluster map. 3719 */ 3720 if (cluster_idx != 0) { 3721 if (cluster_idx < desc_extent->start_cluster_idx && 3722 cluster_idx >= desc_extent->start_cluster_idx + cluster_count) { 3723 return -EINVAL; 3724 } 3725 spdk_bit_array_set(bs->used_clusters, cluster_idx); 3726 if (bs->num_free_clusters == 0) { 3727 return -ENOSPC; 3728 } 3729 bs->num_free_clusters--; 3730 } 3731 cluster_count++; 3732 } 3733 3734 if (cluster_count == 0) { 3735 return -EINVAL; 3736 } 3737 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3738 /* Skip this item */ 3739 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3740 /* Skip this item */ 3741 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3742 /* Skip this item */ 3743 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_TABLE) { 3744 struct spdk_blob_md_descriptor_extent_table *desc_extent_table; 3745 uint32_t num_extent_pages = ctx->num_extent_pages; 3746 uint32_t i; 3747 size_t extent_pages_length; 3748 void *tmp; 3749 3750 desc_extent_table = (struct spdk_blob_md_descriptor_extent_table *)desc; 3751 extent_pages_length = desc_extent_table->length - sizeof(desc_extent_table->num_clusters); 3752 3753 if (desc_extent_table->length == 0 || 3754 (extent_pages_length % sizeof(desc_extent_table->extent_page[0]) != 0)) { 3755 return -EINVAL; 3756 } 3757 3758 for (i = 0; i < extent_pages_length / sizeof(desc_extent_table->extent_page[0]); i++) { 3759 if (desc_extent_table->extent_page[i].page_idx != 0) { 3760 if (desc_extent_table->extent_page[i].num_pages != 1) { 3761 return -EINVAL; 3762 } 3763 num_extent_pages += 1; 3764 } 3765 } 3766 3767 if (num_extent_pages > 0) { 3768 tmp = realloc(ctx->extent_page_num, num_extent_pages * sizeof(uint32_t)); 3769 if (tmp == NULL) { 3770 return -ENOMEM; 3771 } 3772 ctx->extent_page_num = tmp; 3773 3774 /* Extent table entries contain md page numbers for extent pages. 3775 * Zeroes represent unallocated extent pages, those are run-length-encoded. 
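 * Only the non-zero page_idx entries are collected into ctx->extent_page_num here; the referenced pages themselves are read and parsed later by bs_load_replay_extent_pages().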
3776 */ 3777 for (i = 0; i < extent_pages_length / sizeof(desc_extent_table->extent_page[0]); i++) { 3778 if (desc_extent_table->extent_page[i].page_idx != 0) { 3779 ctx->extent_page_num[ctx->num_extent_pages] = desc_extent_table->extent_page[i].page_idx; 3780 ctx->num_extent_pages += 1; 3781 } 3782 } 3783 } 3784 } else { 3785 /* Error */ 3786 return -EINVAL; 3787 } 3788 /* Advance to the next descriptor */ 3789 cur_desc += sizeof(*desc) + desc->length; 3790 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3791 break; 3792 } 3793 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3794 } 3795 return 0; 3796 } 3797 3798 static bool bs_load_cur_extent_page_valid(struct spdk_blob_md_page *page) 3799 { 3800 uint32_t crc; 3801 struct spdk_blob_md_descriptor *desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3802 size_t desc_len; 3803 3804 crc = blob_md_page_calc_crc(page); 3805 if (crc != page->crc) { 3806 return false; 3807 } 3808 3809 /* Extent page should always be of sequence num 0. */ 3810 if (page->sequence_num != 0) { 3811 return false; 3812 } 3813 3814 /* Descriptor type must be EXTENT_PAGE. */ 3815 if (desc->type != SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) { 3816 return false; 3817 } 3818 3819 /* Descriptor length cannot exceed the page. */ 3820 desc_len = sizeof(*desc) + desc->length; 3821 if (desc_len > sizeof(page->descriptors)) { 3822 return false; 3823 } 3824 3825 /* It has to be the only descriptor in the page. */ 3826 if (desc_len + sizeof(*desc) <= sizeof(page->descriptors)) { 3827 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + desc_len); 3828 if (desc->length != 0) { 3829 return false; 3830 } 3831 } 3832 3833 return true; 3834 } 3835 3836 static bool bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 3837 { 3838 uint32_t crc; 3839 struct spdk_blob_md_page *page = ctx->page; 3840 3841 crc = blob_md_page_calc_crc(page); 3842 if (crc != page->crc) { 3843 return false; 3844 } 3845 3846 /* First page of a sequence should match the blobid. 
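 * A page that starts a metadata chain must sit at the md page index encoded in its blob id; the assert below additionally relies on a metadata chain page never also parsing as a valid extent page.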
*/
3847 if (page->sequence_num == 0 &&
3848 bs_page_to_blobid(ctx->cur_page) != page->id) {
3849 return false;
3850 }
3851 assert(bs_load_cur_extent_page_valid(page) == false);
3852
3853 return true;
3854 }
3855
3856 static void
3857 bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx);
3858
3859 static void
3860 bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3861 {
3862 struct spdk_bs_load_ctx *ctx = cb_arg;
3863
3864 if (bserrno != 0) {
3865 bs_load_ctx_fail(ctx, bserrno);
3866 return;
3867 }
3868
3869 bs_load_complete(ctx);
3870 }
3871
3872 static void
3873 bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3874 {
3875 struct spdk_bs_load_ctx *ctx = cb_arg;
3876
3877 spdk_free(ctx->mask);
3878 ctx->mask = NULL;
3879
3880 if (bserrno != 0) {
3881 bs_load_ctx_fail(ctx, bserrno);
3882 return;
3883 }
3884
3885 bs_write_used_clusters(seq, ctx, bs_load_write_used_clusters_cpl);
3886 }
3887
3888 static void
3889 bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3890 {
3891 struct spdk_bs_load_ctx *ctx = cb_arg;
3892
3893 spdk_free(ctx->mask);
3894 ctx->mask = NULL;
3895
3896 if (bserrno != 0) {
3897 bs_load_ctx_fail(ctx, bserrno);
3898 return;
3899 }
3900
3901 bs_write_used_blobids(seq, ctx, bs_load_write_used_blobids_cpl);
3902 }
3903
3904 static void
3905 bs_load_write_used_md(struct spdk_bs_load_ctx *ctx)
3906 {
3907 bs_write_used_md(ctx->seq, ctx, bs_load_write_used_pages_cpl);
3908 }
3909
3910 static void
3911 bs_load_replay_md_chain_cpl(struct spdk_bs_load_ctx *ctx)
3912 {
3913 uint64_t num_md_clusters;
3914 uint64_t i;
3915
3916 ctx->in_page_chain = false;
3917
3918 do {
3919 ctx->page_index++;
3920 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);
3921
3922 if (ctx->page_index < ctx->super->md_len) {
3923 ctx->cur_page = ctx->page_index;
3924 bs_load_replay_cur_md_page(ctx);
3925 } else {
3926 /* Claim all of the clusters used by the metadata */
3927 num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster);
3928 for (i = 0; i < num_md_clusters; i++) {
3929 bs_claim_cluster(ctx->bs, i);
3930 }
3931 spdk_free(ctx->page);
3932 bs_load_write_used_md(ctx);
3933 }
3934 }
3935
3936 static void
3937 bs_load_replay_extent_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
3938 {
3939 struct spdk_bs_load_ctx *ctx = cb_arg;
3940 uint32_t page_num;
3941 uint64_t i;
3942
3943 if (bserrno != 0) {
3944 spdk_free(ctx->extent_pages);
3945 bs_load_ctx_fail(ctx, bserrno);
3946 return;
3947 }
3948
3949 for (i = 0; i < ctx->num_extent_pages; i++) {
3950 /* Extent pages are only read when referenced from within the chain of md pages.
3951 * The metadata is not consistent if such a page is not a valid extent page.
*/ 3952 if (bs_load_cur_extent_page_valid(&ctx->extent_pages[i]) != true) { 3953 spdk_free(ctx->extent_pages); 3954 bs_load_ctx_fail(ctx, -EILSEQ); 3955 return; 3956 } 3957 3958 page_num = ctx->extent_page_num[i]; 3959 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 3960 if (bs_load_replay_md_parse_page(ctx, &ctx->extent_pages[i])) { 3961 spdk_free(ctx->extent_pages); 3962 bs_load_ctx_fail(ctx, -EILSEQ); 3963 return; 3964 } 3965 } 3966 3967 spdk_free(ctx->extent_pages); 3968 free(ctx->extent_page_num); 3969 ctx->extent_page_num = NULL; 3970 ctx->num_extent_pages = 0; 3971 3972 bs_load_replay_md_chain_cpl(ctx); 3973 } 3974 3975 static void 3976 bs_load_replay_extent_pages(struct spdk_bs_load_ctx *ctx) 3977 { 3978 spdk_bs_batch_t *batch; 3979 uint32_t page; 3980 uint64_t lba; 3981 uint64_t i; 3982 3983 ctx->extent_pages = spdk_zmalloc(SPDK_BS_PAGE_SIZE * ctx->num_extent_pages, SPDK_BS_PAGE_SIZE, 3984 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3985 if (!ctx->extent_pages) { 3986 bs_load_ctx_fail(ctx, -ENOMEM); 3987 return; 3988 } 3989 3990 batch = bs_sequence_to_batch(ctx->seq, bs_load_replay_extent_page_cpl, ctx); 3991 3992 for (i = 0; i < ctx->num_extent_pages; i++) { 3993 page = ctx->extent_page_num[i]; 3994 assert(page < ctx->super->md_len); 3995 lba = bs_md_page_to_lba(ctx->bs, page); 3996 bs_batch_read_dev(batch, &ctx->extent_pages[i], lba, 3997 bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE)); 3998 } 3999 4000 bs_batch_close(batch); 4001 } 4002 4003 static void 4004 bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4005 { 4006 struct spdk_bs_load_ctx *ctx = cb_arg; 4007 uint32_t page_num; 4008 struct spdk_blob_md_page *page; 4009 4010 if (bserrno != 0) { 4011 bs_load_ctx_fail(ctx, bserrno); 4012 return; 4013 } 4014 4015 page_num = ctx->cur_page; 4016 page = ctx->page; 4017 if (bs_load_cur_md_page_valid(ctx) == true) { 4018 if (page->sequence_num == 0 || ctx->in_page_chain == true) { 4019 bs_claim_md_page(ctx->bs, page_num); 4020 if (page->sequence_num == 0) { 4021 spdk_bit_array_set(ctx->bs->used_blobids, page_num); 4022 } 4023 if (bs_load_replay_md_parse_page(ctx, page)) { 4024 bs_load_ctx_fail(ctx, -EILSEQ); 4025 return; 4026 } 4027 if (page->next != SPDK_INVALID_MD_PAGE) { 4028 ctx->in_page_chain = true; 4029 ctx->cur_page = page->next; 4030 bs_load_replay_cur_md_page(ctx); 4031 return; 4032 } 4033 if (ctx->num_extent_pages != 0) { 4034 bs_load_replay_extent_pages(ctx); 4035 return; 4036 } 4037 } 4038 } 4039 bs_load_replay_md_chain_cpl(ctx); 4040 } 4041 4042 static void 4043 bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx) 4044 { 4045 uint64_t lba; 4046 4047 assert(ctx->cur_page < ctx->super->md_len); 4048 lba = bs_md_page_to_lba(ctx->bs, ctx->cur_page); 4049 bs_sequence_read_dev(ctx->seq, ctx->page, lba, 4050 bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 4051 bs_load_replay_md_cpl, ctx); 4052 } 4053 4054 static void 4055 bs_load_replay_md(struct spdk_bs_load_ctx *ctx) 4056 { 4057 ctx->page_index = 0; 4058 ctx->cur_page = 0; 4059 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 4060 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4061 if (!ctx->page) { 4062 bs_load_ctx_fail(ctx, -ENOMEM); 4063 return; 4064 } 4065 bs_load_replay_cur_md_page(ctx); 4066 } 4067 4068 static void 4069 bs_recover(struct spdk_bs_load_ctx *ctx) 4070 { 4071 int rc; 4072 4073 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 4074 if (rc < 0) { 4075 bs_load_ctx_fail(ctx, -ENOMEM); 4076 return; 4077 } 4078 4079 rc = 
spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len); 4080 if (rc < 0) { 4081 bs_load_ctx_fail(ctx, -ENOMEM); 4082 return; 4083 } 4084 4085 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 4086 if (rc < 0) { 4087 bs_load_ctx_fail(ctx, -ENOMEM); 4088 return; 4089 } 4090 4091 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 4092 bs_load_replay_md(ctx); 4093 } 4094 4095 static void 4096 bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4097 { 4098 struct spdk_bs_load_ctx *ctx = cb_arg; 4099 uint32_t crc; 4100 int rc; 4101 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 4102 4103 if (ctx->super->version > SPDK_BS_VERSION || 4104 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 4105 bs_load_ctx_fail(ctx, -EILSEQ); 4106 return; 4107 } 4108 4109 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 4110 sizeof(ctx->super->signature)) != 0) { 4111 bs_load_ctx_fail(ctx, -EILSEQ); 4112 return; 4113 } 4114 4115 crc = blob_md_page_calc_crc(ctx->super); 4116 if (crc != ctx->super->crc) { 4117 bs_load_ctx_fail(ctx, -EILSEQ); 4118 return; 4119 } 4120 4121 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 4122 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 4123 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 4124 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 4125 } else { 4126 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 4127 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 4128 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 4129 bs_load_ctx_fail(ctx, -ENXIO); 4130 return; 4131 } 4132 4133 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) { 4134 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n", 4135 ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size); 4136 bs_load_ctx_fail(ctx, -EILSEQ); 4137 return; 4138 } 4139 4140 if (ctx->super->size == 0) { 4141 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen; 4142 } 4143 4144 if (ctx->super->io_unit_size == 0) { 4145 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE; 4146 } 4147 4148 /* Parse the super block */ 4149 ctx->bs->clean = 1; 4150 ctx->bs->cluster_sz = ctx->super->cluster_size; 4151 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size; 4152 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 4153 if (spdk_u32_is_pow2(ctx->bs->pages_per_cluster)) { 4154 ctx->bs->pages_per_cluster_shift = spdk_u32log2(ctx->bs->pages_per_cluster); 4155 } 4156 ctx->bs->io_unit_size = ctx->super->io_unit_size; 4157 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 4158 if (rc < 0) { 4159 bs_load_ctx_fail(ctx, -ENOMEM); 4160 return; 4161 } 4162 ctx->bs->md_start = ctx->super->md_start; 4163 ctx->bs->md_len = ctx->super->md_len; 4164 ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up( 4165 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 4166 ctx->bs->super_blob = ctx->super->super_blob; 4167 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 4168 4169 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) { 4170 bs_recover(ctx); 4171 } else { 4172 bs_load_read_used_pages(ctx); 4173 } 4174 } 4175 4176 void 4177 spdk_bs_load(struct spdk_bs_dev *dev, struct 
spdk_bs_opts *o, 4178 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 4179 { 4180 struct spdk_blob_store *bs; 4181 struct spdk_bs_cpl cpl; 4182 struct spdk_bs_load_ctx *ctx; 4183 struct spdk_bs_opts opts = {}; 4184 int err; 4185 4186 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 4187 4188 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 4189 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen); 4190 dev->destroy(dev); 4191 cb_fn(cb_arg, NULL, -EINVAL); 4192 return; 4193 } 4194 4195 if (o) { 4196 opts = *o; 4197 } else { 4198 spdk_bs_opts_init(&opts); 4199 } 4200 4201 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 4202 dev->destroy(dev); 4203 cb_fn(cb_arg, NULL, -EINVAL); 4204 return; 4205 } 4206 4207 err = bs_alloc(dev, &opts, &bs); 4208 if (err) { 4209 dev->destroy(dev); 4210 cb_fn(cb_arg, NULL, err); 4211 return; 4212 } 4213 4214 ctx = calloc(1, sizeof(*ctx)); 4215 if (!ctx) { 4216 bs_free(bs); 4217 cb_fn(cb_arg, NULL, -ENOMEM); 4218 return; 4219 } 4220 4221 ctx->bs = bs; 4222 ctx->iter_cb_fn = opts.iter_cb_fn; 4223 ctx->iter_cb_arg = opts.iter_cb_arg; 4224 4225 /* Allocate memory for the super block */ 4226 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4227 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4228 if (!ctx->super) { 4229 free(ctx); 4230 bs_free(bs); 4231 cb_fn(cb_arg, NULL, -ENOMEM); 4232 return; 4233 } 4234 4235 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 4236 cpl.u.bs_handle.cb_fn = cb_fn; 4237 cpl.u.bs_handle.cb_arg = cb_arg; 4238 cpl.u.bs_handle.bs = bs; 4239 4240 ctx->seq = bs_sequence_start(bs->md_channel, &cpl); 4241 if (!ctx->seq) { 4242 spdk_free(ctx->super); 4243 free(ctx); 4244 bs_free(bs); 4245 cb_fn(cb_arg, NULL, -ENOMEM); 4246 return; 4247 } 4248 4249 /* Read the super block */ 4250 bs_sequence_read_dev(ctx->seq, ctx->super, bs_page_to_lba(bs, 0), 4251 bs_byte_to_lba(bs, sizeof(*ctx->super)), 4252 bs_load_super_cpl, ctx); 4253 } 4254 4255 /* END spdk_bs_load */ 4256 4257 /* START spdk_bs_dump */ 4258 4259 struct spdk_bs_dump_ctx { 4260 struct spdk_blob_store *bs; 4261 struct spdk_bs_super_block *super; 4262 uint32_t cur_page; 4263 struct spdk_blob_md_page *page; 4264 spdk_bs_sequence_t *seq; 4265 FILE *fp; 4266 spdk_bs_dump_print_xattr print_xattr_fn; 4267 char xattr_name[4096]; 4268 }; 4269 4270 static void 4271 bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno) 4272 { 4273 spdk_free(ctx->super); 4274 4275 /* 4276 * We need to defer calling bs_call_cpl() until after 4277 * dev destruction, so tuck these away for later use. 4278 */ 4279 ctx->bs->unload_err = bserrno; 4280 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 4281 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 4282 4283 bs_sequence_finish(seq, 0); 4284 bs_free(ctx->bs); 4285 free(ctx); 4286 } 4287 4288 static void bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 4289 4290 static void 4291 bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx) 4292 { 4293 uint32_t page_idx = ctx->cur_page; 4294 struct spdk_blob_md_page *page = ctx->page; 4295 struct spdk_blob_md_descriptor *desc; 4296 size_t cur_desc = 0; 4297 uint32_t crc; 4298 4299 fprintf(ctx->fp, "=========\n"); 4300 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx); 4301 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id); 4302 4303 crc = blob_md_page_calc_crc(page); 4304 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? 
"OK" : "Mismatch"); 4305 4306 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 4307 while (cur_desc < sizeof(page->descriptors)) { 4308 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 4309 if (desc->length == 0) { 4310 /* If padding and length are 0, this terminates the page */ 4311 break; 4312 } 4313 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 4314 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 4315 unsigned int i; 4316 4317 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 4318 4319 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 4320 if (desc_extent_rle->extents[i].cluster_idx != 0) { 4321 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32, 4322 desc_extent_rle->extents[i].cluster_idx); 4323 } else { 4324 fprintf(ctx->fp, "Unallocated Extent - "); 4325 } 4326 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length); 4327 fprintf(ctx->fp, "\n"); 4328 } 4329 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_PAGE) { 4330 struct spdk_blob_md_descriptor_extent_page *desc_extent; 4331 unsigned int i; 4332 4333 desc_extent = (struct spdk_blob_md_descriptor_extent_page *)desc; 4334 4335 for (i = 0; i < desc_extent->length / sizeof(desc_extent->cluster_idx[0]); i++) { 4336 if (desc_extent->cluster_idx[i] != 0) { 4337 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32, 4338 desc_extent->cluster_idx[i]); 4339 } else { 4340 fprintf(ctx->fp, "Unallocated Extent"); 4341 } 4342 fprintf(ctx->fp, "\n"); 4343 } 4344 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 4345 struct spdk_blob_md_descriptor_xattr *desc_xattr; 4346 uint32_t i; 4347 4348 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 4349 4350 if (desc_xattr->length != 4351 sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) + 4352 desc_xattr->name_length + desc_xattr->value_length) { 4353 } 4354 4355 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length); 4356 ctx->xattr_name[desc_xattr->name_length] = '\0'; 4357 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name); 4358 fprintf(ctx->fp, " value = \""); 4359 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name, 4360 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 4361 desc_xattr->value_length); 4362 fprintf(ctx->fp, "\"\n"); 4363 for (i = 0; i < desc_xattr->value_length; i++) { 4364 if (i % 16 == 0) { 4365 fprintf(ctx->fp, " "); 4366 } 4367 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i)); 4368 if ((i + 1) % 16 == 0) { 4369 fprintf(ctx->fp, "\n"); 4370 } 4371 } 4372 if (i % 16 != 0) { 4373 fprintf(ctx->fp, "\n"); 4374 } 4375 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 4376 /* TODO */ 4377 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 4378 /* TODO */ 4379 } else { 4380 /* Error */ 4381 } 4382 /* Advance to the next descriptor */ 4383 cur_desc += sizeof(*desc) + desc->length; 4384 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 4385 break; 4386 } 4387 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 4388 } 4389 } 4390 4391 static void 4392 bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4393 { 4394 struct spdk_bs_dump_ctx *ctx = cb_arg; 4395 4396 if (bserrno != 0) { 4397 bs_dump_finish(seq, ctx, bserrno); 4398 return; 4399 } 4400 4401 if (ctx->page->id != 0) { 4402 bs_dump_print_md_page(ctx); 4403 } 4404 4405 
ctx->cur_page++; 4406 4407 if (ctx->cur_page < ctx->super->md_len) { 4408 bs_dump_read_md_page(seq, ctx); 4409 } else { 4410 spdk_free(ctx->page); 4411 bs_dump_finish(seq, ctx, 0); 4412 } 4413 } 4414 4415 static void 4416 bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 4417 { 4418 struct spdk_bs_dump_ctx *ctx = cb_arg; 4419 uint64_t lba; 4420 4421 assert(ctx->cur_page < ctx->super->md_len); 4422 lba = bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 4423 bs_sequence_read_dev(seq, ctx->page, lba, 4424 bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 4425 bs_dump_read_md_page_cpl, ctx); 4426 } 4427 4428 static void 4429 bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4430 { 4431 struct spdk_bs_dump_ctx *ctx = cb_arg; 4432 4433 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature); 4434 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 4435 sizeof(ctx->super->signature)) != 0) { 4436 fprintf(ctx->fp, "(Mismatch)\n"); 4437 bs_dump_finish(seq, ctx, bserrno); 4438 return; 4439 } else { 4440 fprintf(ctx->fp, "(OK)\n"); 4441 } 4442 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version); 4443 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc, 4444 (ctx->super->crc == blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch"); 4445 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype); 4446 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size); 4447 fprintf(ctx->fp, "Super Blob ID: "); 4448 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) { 4449 fprintf(ctx->fp, "(None)\n"); 4450 } else { 4451 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob); 4452 } 4453 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean); 4454 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start); 4455 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len); 4456 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start); 4457 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len); 4458 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start); 4459 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len); 4460 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start); 4461 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len); 4462 4463 ctx->cur_page = 0; 4464 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 4465 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4466 if (!ctx->page) { 4467 bs_dump_finish(seq, ctx, -ENOMEM); 4468 return; 4469 } 4470 bs_dump_read_md_page(seq, ctx); 4471 } 4472 4473 void 4474 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn, 4475 spdk_bs_op_complete cb_fn, void *cb_arg) 4476 { 4477 struct spdk_blob_store *bs; 4478 struct spdk_bs_cpl cpl; 4479 spdk_bs_sequence_t *seq; 4480 struct spdk_bs_dump_ctx *ctx; 4481 struct spdk_bs_opts opts = {}; 4482 int err; 4483 4484 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev); 4485 4486 spdk_bs_opts_init(&opts); 4487 4488 err = bs_alloc(dev, &opts, &bs); 4489 if (err) { 4490 dev->destroy(dev); 4491 cb_fn(cb_arg, err); 4492 return; 4493 } 4494 4495 ctx = calloc(1, sizeof(*ctx)); 4496 if (!ctx) { 4497 bs_free(bs); 4498 cb_fn(cb_arg, -ENOMEM); 4499 return; 4500 } 4501 4502 ctx->bs = bs; 
4503 ctx->fp = fp; 4504 ctx->print_xattr_fn = print_xattr_fn; 4505 4506 /* Allocate memory for the super block */ 4507 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4508 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4509 if (!ctx->super) { 4510 free(ctx); 4511 bs_free(bs); 4512 cb_fn(cb_arg, -ENOMEM); 4513 return; 4514 } 4515 4516 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4517 cpl.u.bs_basic.cb_fn = cb_fn; 4518 cpl.u.bs_basic.cb_arg = cb_arg; 4519 4520 seq = bs_sequence_start(bs->md_channel, &cpl); 4521 if (!seq) { 4522 spdk_free(ctx->super); 4523 free(ctx); 4524 bs_free(bs); 4525 cb_fn(cb_arg, -ENOMEM); 4526 return; 4527 } 4528 4529 /* Read the super block */ 4530 bs_sequence_read_dev(seq, ctx->super, bs_page_to_lba(bs, 0), 4531 bs_byte_to_lba(bs, sizeof(*ctx->super)), 4532 bs_dump_super_cpl, ctx); 4533 } 4534 4535 /* END spdk_bs_dump */ 4536 4537 /* START spdk_bs_init */ 4538 4539 struct spdk_bs_init_ctx { 4540 struct spdk_blob_store *bs; 4541 struct spdk_bs_super_block *super; 4542 }; 4543 4544 static void 4545 bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4546 { 4547 struct spdk_bs_init_ctx *ctx = cb_arg; 4548 4549 spdk_free(ctx->super); 4550 free(ctx); 4551 4552 bs_sequence_finish(seq, bserrno); 4553 } 4554 4555 static void 4556 bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4557 { 4558 struct spdk_bs_init_ctx *ctx = cb_arg; 4559 4560 /* Write super block */ 4561 bs_sequence_write_dev(seq, ctx->super, bs_page_to_lba(ctx->bs, 0), 4562 bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 4563 bs_init_persist_super_cpl, ctx); 4564 } 4565 4566 void 4567 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 4568 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 4569 { 4570 struct spdk_bs_init_ctx *ctx; 4571 struct spdk_blob_store *bs; 4572 struct spdk_bs_cpl cpl; 4573 spdk_bs_sequence_t *seq; 4574 spdk_bs_batch_t *batch; 4575 uint64_t num_md_lba; 4576 uint64_t num_md_pages; 4577 uint64_t num_md_clusters; 4578 uint32_t i; 4579 struct spdk_bs_opts opts = {}; 4580 int rc; 4581 4582 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 4583 4584 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 4585 SPDK_ERRLOG("unsupported dev block length of %d\n", 4586 dev->blocklen); 4587 dev->destroy(dev); 4588 cb_fn(cb_arg, NULL, -EINVAL); 4589 return; 4590 } 4591 4592 if (o) { 4593 opts = *o; 4594 } else { 4595 spdk_bs_opts_init(&opts); 4596 } 4597 4598 if (bs_opts_verify(&opts) != 0) { 4599 dev->destroy(dev); 4600 cb_fn(cb_arg, NULL, -EINVAL); 4601 return; 4602 } 4603 4604 rc = bs_alloc(dev, &opts, &bs); 4605 if (rc) { 4606 dev->destroy(dev); 4607 cb_fn(cb_arg, NULL, rc); 4608 return; 4609 } 4610 4611 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 4612 /* By default, allocate 1 page per cluster. 4613 * Technically, this over-allocates metadata 4614 * because more metadata will reduce the number 4615 * of usable clusters. This can be addressed with 4616 * more complex math in the future. 
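 * For example, a blobstore with 1024 clusters reserves md_len = 1024 metadata pages here, even though some of those clusters will themselves hold metadata and are never usable for blob data.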
4617 */ 4618 bs->md_len = bs->total_clusters; 4619 } else { 4620 bs->md_len = opts.num_md_pages; 4621 } 4622 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 4623 if (rc < 0) { 4624 bs_free(bs); 4625 cb_fn(cb_arg, NULL, -ENOMEM); 4626 return; 4627 } 4628 4629 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len); 4630 if (rc < 0) { 4631 bs_free(bs); 4632 cb_fn(cb_arg, NULL, -ENOMEM); 4633 return; 4634 } 4635 4636 ctx = calloc(1, sizeof(*ctx)); 4637 if (!ctx) { 4638 bs_free(bs); 4639 cb_fn(cb_arg, NULL, -ENOMEM); 4640 return; 4641 } 4642 4643 ctx->bs = bs; 4644 4645 /* Allocate memory for the super block */ 4646 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4647 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4648 if (!ctx->super) { 4649 free(ctx); 4650 bs_free(bs); 4651 cb_fn(cb_arg, NULL, -ENOMEM); 4652 return; 4653 } 4654 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 4655 sizeof(ctx->super->signature)); 4656 ctx->super->version = SPDK_BS_VERSION; 4657 ctx->super->length = sizeof(*ctx->super); 4658 ctx->super->super_blob = bs->super_blob; 4659 ctx->super->clean = 0; 4660 ctx->super->cluster_size = bs->cluster_sz; 4661 ctx->super->io_unit_size = bs->io_unit_size; 4662 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 4663 4664 /* Calculate how many pages the metadata consumes at the front 4665 * of the disk. 4666 */ 4667 4668 /* The super block uses 1 page */ 4669 num_md_pages = 1; 4670 4671 /* The used_md_pages mask requires 1 bit per metadata page, rounded 4672 * up to the nearest page, plus a header. 4673 */ 4674 ctx->super->used_page_mask_start = num_md_pages; 4675 ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 4676 spdk_divide_round_up(bs->md_len, 8), 4677 SPDK_BS_PAGE_SIZE); 4678 num_md_pages += ctx->super->used_page_mask_len; 4679 4680 /* The used_clusters mask requires 1 bit per cluster, rounded 4681 * up to the nearest page, plus a header. 4682 */ 4683 ctx->super->used_cluster_mask_start = num_md_pages; 4684 ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 4685 spdk_divide_round_up(bs->total_clusters, 8), 4686 SPDK_BS_PAGE_SIZE); 4687 num_md_pages += ctx->super->used_cluster_mask_len; 4688 4689 /* The used_blobids mask requires 1 bit per metadata page, rounded 4690 * up to the nearest page, plus a header. 
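 * For example, with md_len = 1024 the mask body is 1024 / 8 = 128 bytes; adding the spdk_bs_md_mask header and rounding up to SPDK_BS_PAGE_SIZE still fits in a single page.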
4691 */ 4692 ctx->super->used_blobid_mask_start = num_md_pages; 4693 ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 4694 spdk_divide_round_up(bs->md_len, 8), 4695 SPDK_BS_PAGE_SIZE); 4696 num_md_pages += ctx->super->used_blobid_mask_len; 4697 4698 /* The metadata region size was chosen above */ 4699 ctx->super->md_start = bs->md_start = num_md_pages; 4700 ctx->super->md_len = bs->md_len; 4701 num_md_pages += bs->md_len; 4702 4703 num_md_lba = bs_page_to_lba(bs, num_md_pages); 4704 4705 ctx->super->size = dev->blockcnt * dev->blocklen; 4706 4707 ctx->super->crc = blob_md_page_calc_crc(ctx->super); 4708 4709 num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster); 4710 if (num_md_clusters > bs->total_clusters) { 4711 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 4712 "please decrease number of pages reserved for metadata " 4713 "or increase cluster size.\n"); 4714 spdk_free(ctx->super); 4715 free(ctx); 4716 bs_free(bs); 4717 cb_fn(cb_arg, NULL, -ENOMEM); 4718 return; 4719 } 4720 /* Claim all of the clusters used by the metadata */ 4721 for (i = 0; i < num_md_clusters; i++) { 4722 bs_claim_cluster(bs, i); 4723 } 4724 4725 bs->total_data_clusters = bs->num_free_clusters; 4726 4727 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 4728 cpl.u.bs_handle.cb_fn = cb_fn; 4729 cpl.u.bs_handle.cb_arg = cb_arg; 4730 cpl.u.bs_handle.bs = bs; 4731 4732 seq = bs_sequence_start(bs->md_channel, &cpl); 4733 if (!seq) { 4734 spdk_free(ctx->super); 4735 free(ctx); 4736 bs_free(bs); 4737 cb_fn(cb_arg, NULL, -ENOMEM); 4738 return; 4739 } 4740 4741 batch = bs_sequence_to_batch(seq, bs_init_trim_cpl, ctx); 4742 4743 /* Clear metadata space */ 4744 bs_batch_write_zeroes_dev(batch, 0, num_md_lba); 4745 4746 switch (opts.clear_method) { 4747 case BS_CLEAR_WITH_UNMAP: 4748 /* Trim data clusters */ 4749 bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 4750 break; 4751 case BS_CLEAR_WITH_WRITE_ZEROES: 4752 /* Write_zeroes to data clusters */ 4753 bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 4754 break; 4755 case BS_CLEAR_WITH_NONE: 4756 default: 4757 break; 4758 } 4759 4760 bs_batch_close(batch); 4761 } 4762 4763 /* END spdk_bs_init */ 4764 4765 /* START spdk_bs_destroy */ 4766 4767 static void 4768 bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4769 { 4770 struct spdk_bs_init_ctx *ctx = cb_arg; 4771 struct spdk_blob_store *bs = ctx->bs; 4772 4773 /* 4774 * We need to defer calling bs_call_cpl() until after 4775 * dev destruction, so tuck these away for later use. 
4776 */ 4777 bs->unload_err = bserrno; 4778 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 4779 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 4780 4781 bs_sequence_finish(seq, bserrno); 4782 4783 bs_free(bs); 4784 free(ctx); 4785 } 4786 4787 void 4788 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 4789 void *cb_arg) 4790 { 4791 struct spdk_bs_cpl cpl; 4792 spdk_bs_sequence_t *seq; 4793 struct spdk_bs_init_ctx *ctx; 4794 4795 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 4796 4797 if (!TAILQ_EMPTY(&bs->blobs)) { 4798 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4799 cb_fn(cb_arg, -EBUSY); 4800 return; 4801 } 4802 4803 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4804 cpl.u.bs_basic.cb_fn = cb_fn; 4805 cpl.u.bs_basic.cb_arg = cb_arg; 4806 4807 ctx = calloc(1, sizeof(*ctx)); 4808 if (!ctx) { 4809 cb_fn(cb_arg, -ENOMEM); 4810 return; 4811 } 4812 4813 ctx->bs = bs; 4814 4815 seq = bs_sequence_start(bs->md_channel, &cpl); 4816 if (!seq) { 4817 free(ctx); 4818 cb_fn(cb_arg, -ENOMEM); 4819 return; 4820 } 4821 4822 /* Write zeroes to the super block */ 4823 bs_sequence_write_zeroes_dev(seq, 4824 bs_page_to_lba(bs, 0), 4825 bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 4826 bs_destroy_trim_cpl, ctx); 4827 } 4828 4829 /* END spdk_bs_destroy */ 4830 4831 /* START spdk_bs_unload */ 4832 4833 static void 4834 bs_unload_finish(struct spdk_bs_load_ctx *ctx, int bserrno) 4835 { 4836 spdk_bs_sequence_t *seq = ctx->seq; 4837 4838 spdk_free(ctx->super); 4839 4840 /* 4841 * We need to defer calling bs_call_cpl() until after 4842 * dev destruction, so tuck these away for later use. 4843 */ 4844 ctx->bs->unload_err = bserrno; 4845 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 4846 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 4847 4848 bs_sequence_finish(seq, bserrno); 4849 4850 bs_free(ctx->bs); 4851 free(ctx); 4852 } 4853 4854 static void 4855 bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4856 { 4857 struct spdk_bs_load_ctx *ctx = cb_arg; 4858 4859 bs_unload_finish(ctx, bserrno); 4860 } 4861 4862 static void 4863 bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4864 { 4865 struct spdk_bs_load_ctx *ctx = cb_arg; 4866 4867 spdk_free(ctx->mask); 4868 4869 if (bserrno != 0) { 4870 bs_unload_finish(ctx, bserrno); 4871 return; 4872 } 4873 4874 ctx->super->clean = 1; 4875 4876 bs_write_super(seq, ctx->bs, ctx->super, bs_unload_write_super_cpl, ctx); 4877 } 4878 4879 static void 4880 bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4881 { 4882 struct spdk_bs_load_ctx *ctx = cb_arg; 4883 4884 spdk_free(ctx->mask); 4885 ctx->mask = NULL; 4886 4887 if (bserrno != 0) { 4888 bs_unload_finish(ctx, bserrno); 4889 return; 4890 } 4891 4892 bs_write_used_clusters(seq, ctx, bs_unload_write_used_clusters_cpl); 4893 } 4894 4895 static void 4896 bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4897 { 4898 struct spdk_bs_load_ctx *ctx = cb_arg; 4899 4900 spdk_free(ctx->mask); 4901 ctx->mask = NULL; 4902 4903 if (bserrno != 0) { 4904 bs_unload_finish(ctx, bserrno); 4905 return; 4906 } 4907 4908 bs_write_used_blobids(seq, ctx, bs_unload_write_used_blobids_cpl); 4909 } 4910 4911 static void 4912 bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4913 { 4914 struct spdk_bs_load_ctx *ctx = cb_arg; 4915 4916 if (bserrno != 0) { 4917 bs_unload_finish(ctx, bserrno); 4918 return; 4919 } 4920 4921 
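/* The super block has been re-read into ctx->super. Persist the used_md_pages, used_blobids and used_clusters masks in turn, then write the super block back with clean set to 1 (see the completion callbacks above). */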
bs_write_used_md(seq, cb_arg, bs_unload_write_used_pages_cpl); 4922 } 4923 4924 void 4925 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 4926 { 4927 struct spdk_bs_cpl cpl; 4928 struct spdk_bs_load_ctx *ctx; 4929 4930 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 4931 4932 if (!TAILQ_EMPTY(&bs->blobs)) { 4933 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4934 cb_fn(cb_arg, -EBUSY); 4935 return; 4936 } 4937 4938 ctx = calloc(1, sizeof(*ctx)); 4939 if (!ctx) { 4940 cb_fn(cb_arg, -ENOMEM); 4941 return; 4942 } 4943 4944 ctx->bs = bs; 4945 4946 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4947 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4948 if (!ctx->super) { 4949 free(ctx); 4950 cb_fn(cb_arg, -ENOMEM); 4951 return; 4952 } 4953 4954 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4955 cpl.u.bs_basic.cb_fn = cb_fn; 4956 cpl.u.bs_basic.cb_arg = cb_arg; 4957 4958 ctx->seq = bs_sequence_start(bs->md_channel, &cpl); 4959 if (!ctx->seq) { 4960 spdk_free(ctx->super); 4961 free(ctx); 4962 cb_fn(cb_arg, -ENOMEM); 4963 return; 4964 } 4965 4966 /* Read super block */ 4967 bs_sequence_read_dev(ctx->seq, ctx->super, bs_page_to_lba(bs, 0), 4968 bs_byte_to_lba(bs, sizeof(*ctx->super)), 4969 bs_unload_read_super_cpl, ctx); 4970 } 4971 4972 /* END spdk_bs_unload */ 4973 4974 /* START spdk_bs_set_super */ 4975 4976 struct spdk_bs_set_super_ctx { 4977 struct spdk_blob_store *bs; 4978 struct spdk_bs_super_block *super; 4979 }; 4980 4981 static void 4982 bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4983 { 4984 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4985 4986 if (bserrno != 0) { 4987 SPDK_ERRLOG("Unable to write to super block of blobstore\n"); 4988 } 4989 4990 spdk_free(ctx->super); 4991 4992 bs_sequence_finish(seq, bserrno); 4993 4994 free(ctx); 4995 } 4996 4997 static void 4998 bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4999 { 5000 struct spdk_bs_set_super_ctx *ctx = cb_arg; 5001 5002 if (bserrno != 0) { 5003 SPDK_ERRLOG("Unable to read super block of blobstore\n"); 5004 spdk_free(ctx->super); 5005 bs_sequence_finish(seq, bserrno); 5006 free(ctx); 5007 return; 5008 } 5009 5010 bs_write_super(seq, ctx->bs, ctx->super, bs_set_super_write_cpl, ctx); 5011 } 5012 5013 void 5014 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 5015 spdk_bs_op_complete cb_fn, void *cb_arg) 5016 { 5017 struct spdk_bs_cpl cpl; 5018 spdk_bs_sequence_t *seq; 5019 struct spdk_bs_set_super_ctx *ctx; 5020 5021 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n"); 5022 5023 ctx = calloc(1, sizeof(*ctx)); 5024 if (!ctx) { 5025 cb_fn(cb_arg, -ENOMEM); 5026 return; 5027 } 5028 5029 ctx->bs = bs; 5030 5031 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 5032 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 5033 if (!ctx->super) { 5034 free(ctx); 5035 cb_fn(cb_arg, -ENOMEM); 5036 return; 5037 } 5038 5039 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 5040 cpl.u.bs_basic.cb_fn = cb_fn; 5041 cpl.u.bs_basic.cb_arg = cb_arg; 5042 5043 seq = bs_sequence_start(bs->md_channel, &cpl); 5044 if (!seq) { 5045 spdk_free(ctx->super); 5046 free(ctx); 5047 cb_fn(cb_arg, -ENOMEM); 5048 return; 5049 } 5050 5051 bs->super_blob = blobid; 5052 5053 /* Read super block */ 5054 bs_sequence_read_dev(seq, ctx->super, bs_page_to_lba(bs, 0), 5055 bs_byte_to_lba(bs, sizeof(*ctx->super)), 5056 bs_set_super_read_cpl, ctx); 5057 } 5058 5059 /* END spdk_bs_set_super */ 5060 5061 void 5062 spdk_bs_get_super(struct spdk_blob_store *bs, 
5063 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 5064 { 5065 if (bs->super_blob == SPDK_BLOBID_INVALID) { 5066 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 5067 } else { 5068 cb_fn(cb_arg, bs->super_blob, 0); 5069 } 5070 } 5071 5072 uint64_t 5073 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 5074 { 5075 return bs->cluster_sz; 5076 } 5077 5078 uint64_t 5079 spdk_bs_get_page_size(struct spdk_blob_store *bs) 5080 { 5081 return SPDK_BS_PAGE_SIZE; 5082 } 5083 5084 uint64_t 5085 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs) 5086 { 5087 return bs->io_unit_size; 5088 } 5089 5090 uint64_t 5091 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 5092 { 5093 return bs->num_free_clusters; 5094 } 5095 5096 uint64_t 5097 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 5098 { 5099 return bs->total_data_clusters; 5100 } 5101 5102 static int 5103 bs_register_md_thread(struct spdk_blob_store *bs) 5104 { 5105 bs->md_channel = spdk_get_io_channel(bs); 5106 if (!bs->md_channel) { 5107 SPDK_ERRLOG("Failed to get IO channel.\n"); 5108 return -1; 5109 } 5110 5111 return 0; 5112 } 5113 5114 static int 5115 bs_unregister_md_thread(struct spdk_blob_store *bs) 5116 { 5117 spdk_put_io_channel(bs->md_channel); 5118 5119 return 0; 5120 } 5121 5122 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 5123 { 5124 assert(blob != NULL); 5125 5126 return blob->id; 5127 } 5128 5129 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 5130 { 5131 assert(blob != NULL); 5132 5133 return bs_cluster_to_page(blob->bs, blob->active.num_clusters); 5134 } 5135 5136 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob) 5137 { 5138 assert(blob != NULL); 5139 5140 return spdk_blob_get_num_pages(blob) * bs_io_unit_per_page(blob->bs); 5141 } 5142 5143 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 5144 { 5145 assert(blob != NULL); 5146 5147 return blob->active.num_clusters; 5148 } 5149 5150 /* START spdk_bs_create_blob */ 5151 5152 static void 5153 bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5154 { 5155 struct spdk_blob *blob = cb_arg; 5156 uint32_t page_idx = bs_blobid_to_page(blob->id); 5157 5158 if (bserrno != 0) { 5159 spdk_bit_array_clear(blob->bs->used_blobids, page_idx); 5160 bs_release_md_page(blob->bs, page_idx); 5161 } 5162 5163 blob_free(blob); 5164 5165 bs_sequence_finish(seq, bserrno); 5166 } 5167 5168 static int 5169 blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs, 5170 bool internal) 5171 { 5172 uint64_t i; 5173 size_t value_len = 0; 5174 int rc; 5175 const void *value = NULL; 5176 if (xattrs->count > 0 && xattrs->get_value == NULL) { 5177 return -EINVAL; 5178 } 5179 for (i = 0; i < xattrs->count; i++) { 5180 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len); 5181 if (value == NULL || value_len == 0) { 5182 return -EINVAL; 5183 } 5184 rc = blob_set_xattr(blob, xattrs->names[i], value, value_len, internal); 5185 if (rc < 0) { 5186 return rc; 5187 } 5188 } 5189 return 0; 5190 } 5191 5192 static void 5193 bs_create_blob(struct spdk_blob_store *bs, 5194 const struct spdk_blob_opts *opts, 5195 const struct spdk_blob_xattr_opts *internal_xattrs, 5196 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 5197 { 5198 struct spdk_blob *blob; 5199 uint32_t page_idx; 5200 struct spdk_bs_cpl cpl; 5201 struct spdk_blob_opts opts_default; 5202 struct spdk_blob_xattr_opts internal_xattrs_default; 5203 spdk_bs_sequence_t *seq; 5204 spdk_blob_id id; 5205 int rc; 5206 5207 assert(spdk_get_thread() == 
bs->md_thread); 5208 5209 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 5210 if (page_idx == UINT32_MAX) { 5211 cb_fn(cb_arg, 0, -ENOMEM); 5212 return; 5213 } 5214 spdk_bit_array_set(bs->used_blobids, page_idx); 5215 bs_claim_md_page(bs, page_idx); 5216 5217 id = bs_page_to_blobid(page_idx); 5218 5219 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 5220 5221 blob = blob_alloc(bs, id); 5222 if (!blob) { 5223 spdk_bit_array_clear(bs->used_blobids, page_idx); 5224 bs_release_md_page(bs, page_idx); 5225 cb_fn(cb_arg, 0, -ENOMEM); 5226 return; 5227 } 5228 5229 if (!opts) { 5230 spdk_blob_opts_init(&opts_default); 5231 opts = &opts_default; 5232 } 5233 5234 blob->use_extent_table = opts->use_extent_table; 5235 if (blob->use_extent_table) { 5236 blob->invalid_flags |= SPDK_BLOB_EXTENT_TABLE; 5237 } 5238 5239 if (!internal_xattrs) { 5240 blob_xattrs_init(&internal_xattrs_default); 5241 internal_xattrs = &internal_xattrs_default; 5242 } 5243 5244 rc = blob_set_xattrs(blob, &opts->xattrs, false); 5245 if (rc < 0) { 5246 blob_free(blob); 5247 spdk_bit_array_clear(bs->used_blobids, page_idx); 5248 bs_release_md_page(bs, page_idx); 5249 cb_fn(cb_arg, 0, rc); 5250 return; 5251 } 5252 5253 rc = blob_set_xattrs(blob, internal_xattrs, true); 5254 if (rc < 0) { 5255 blob_free(blob); 5256 spdk_bit_array_clear(bs->used_blobids, page_idx); 5257 bs_release_md_page(bs, page_idx); 5258 cb_fn(cb_arg, 0, rc); 5259 return; 5260 } 5261 5262 if (opts->thin_provision) { 5263 blob_set_thin_provision(blob); 5264 } 5265 5266 blob_set_clear_method(blob, opts->clear_method); 5267 5268 rc = blob_resize(blob, opts->num_clusters); 5269 if (rc < 0) { 5270 blob_free(blob); 5271 spdk_bit_array_clear(bs->used_blobids, page_idx); 5272 bs_release_md_page(bs, page_idx); 5273 cb_fn(cb_arg, 0, rc); 5274 return; 5275 } 5276 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 5277 cpl.u.blobid.cb_fn = cb_fn; 5278 cpl.u.blobid.cb_arg = cb_arg; 5279 cpl.u.blobid.blobid = blob->id; 5280 5281 seq = bs_sequence_start(bs->md_channel, &cpl); 5282 if (!seq) { 5283 blob_free(blob); 5284 spdk_bit_array_clear(bs->used_blobids, page_idx); 5285 bs_release_md_page(bs, page_idx); 5286 cb_fn(cb_arg, 0, -ENOMEM); 5287 return; 5288 } 5289 5290 blob_persist(seq, blob, bs_create_blob_cpl, blob); 5291 } 5292 5293 void spdk_bs_create_blob(struct spdk_blob_store *bs, 5294 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 5295 { 5296 bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg); 5297 } 5298 5299 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts, 5300 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 5301 { 5302 bs_create_blob(bs, opts, NULL, cb_fn, cb_arg); 5303 } 5304 5305 /* END spdk_bs_create_blob */ 5306 5307 /* START blob_cleanup */ 5308 5309 struct spdk_clone_snapshot_ctx { 5310 struct spdk_bs_cpl cpl; 5311 int bserrno; 5312 bool frozen; 5313 5314 struct spdk_io_channel *channel; 5315 5316 /* Current cluster for inflate operation */ 5317 uint64_t cluster; 5318 5319 /* For inflation force allocation of all unallocated clusters and remove 5320 * thin-provisioning. Otherwise only decouple parent and keep clone thin. */ 5321 bool allocate_all; 5322 5323 struct { 5324 spdk_blob_id id; 5325 struct spdk_blob *blob; 5326 } original; 5327 struct { 5328 spdk_blob_id id; 5329 struct spdk_blob *blob; 5330 } new; 5331 5332 /* xattrs specified for snapshot/clones only. They have no impact on 5333 * the original blobs xattrs. 
*/ 5334 const struct spdk_blob_xattr_opts *xattrs; 5335 }; 5336 5337 static void 5338 bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno) 5339 { 5340 struct spdk_clone_snapshot_ctx *ctx = cb_arg; 5341 struct spdk_bs_cpl *cpl = &ctx->cpl; 5342 5343 if (bserrno != 0) { 5344 if (ctx->bserrno != 0) { 5345 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 5346 } else { 5347 ctx->bserrno = bserrno; 5348 } 5349 } 5350 5351 switch (cpl->type) { 5352 case SPDK_BS_CPL_TYPE_BLOBID: 5353 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno); 5354 break; 5355 case SPDK_BS_CPL_TYPE_BLOB_BASIC: 5356 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno); 5357 break; 5358 default: 5359 SPDK_UNREACHABLE(); 5360 break; 5361 } 5362 5363 free(ctx); 5364 } 5365 5366 static void 5367 bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 5368 { 5369 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5370 struct spdk_blob *origblob = ctx->original.blob; 5371 5372 if (bserrno != 0) { 5373 if (ctx->bserrno != 0) { 5374 SPDK_ERRLOG("Unfreeze error %d\n", bserrno); 5375 } else { 5376 ctx->bserrno = bserrno; 5377 } 5378 } 5379 5380 ctx->original.id = origblob->id; 5381 origblob->locked_operation_in_progress = false; 5382 5383 spdk_blob_close(origblob, bs_clone_snapshot_cleanup_finish, ctx); 5384 } 5385 5386 static void 5387 bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno) 5388 { 5389 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5390 struct spdk_blob *origblob = ctx->original.blob; 5391 5392 if (bserrno != 0) { 5393 if (ctx->bserrno != 0) { 5394 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 5395 } else { 5396 ctx->bserrno = bserrno; 5397 } 5398 } 5399 5400 if (ctx->frozen) { 5401 /* Unfreeze any outstanding I/O */ 5402 blob_unfreeze_io(origblob, bs_snapshot_unfreeze_cpl, ctx); 5403 } else { 5404 bs_snapshot_unfreeze_cpl(ctx, 0); 5405 } 5406 5407 } 5408 5409 static void 5410 bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno) 5411 { 5412 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5413 struct spdk_blob *newblob = ctx->new.blob; 5414 5415 if (bserrno != 0) { 5416 if (ctx->bserrno != 0) { 5417 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 5418 } else { 5419 ctx->bserrno = bserrno; 5420 } 5421 } 5422 5423 ctx->new.id = newblob->id; 5424 spdk_blob_close(newblob, bs_clone_snapshot_origblob_cleanup, ctx); 5425 } 5426 5427 /* END blob_cleanup */ 5428 5429 /* START spdk_bs_create_snapshot */ 5430 5431 static void 5432 bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2) 5433 { 5434 uint64_t *cluster_temp; 5435 uint32_t *extent_page_temp; 5436 5437 cluster_temp = blob1->active.clusters; 5438 blob1->active.clusters = blob2->active.clusters; 5439 blob2->active.clusters = cluster_temp; 5440 5441 extent_page_temp = blob1->active.extent_pages; 5442 blob1->active.extent_pages = blob2->active.extent_pages; 5443 blob2->active.extent_pages = extent_page_temp; 5444 } 5445 5446 static void 5447 bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno) 5448 { 5449 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5450 struct spdk_blob *origblob = ctx->original.blob; 5451 struct spdk_blob *newblob = ctx->new.blob; 5452 5453 if (bserrno != 0) { 5454 bs_snapshot_swap_cluster_maps(newblob, origblob); 5455 bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5456 return; 5457 } 5458 5459 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */ 5460 
bserrno = blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true);
5461 if (bserrno != 0) {
5462 bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
5463 return;
5464 }
5465
5466 bs_blob_list_add(ctx->original.blob);
5467
5468 spdk_blob_set_read_only(newblob);
5469
5470 /* sync snapshot metadata */
5471 spdk_blob_sync_md(newblob, bs_clone_snapshot_origblob_cleanup, ctx);
5472 }
5473
5474 static void
5475 bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno)
5476 {
5477 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5478 struct spdk_blob *origblob = ctx->original.blob;
5479 struct spdk_blob *newblob = ctx->new.blob;
5480
5481 if (bserrno != 0) {
5482 /* return cluster map back to original */
5483 bs_snapshot_swap_cluster_maps(newblob, origblob);
5484
5485 /* Newblob md sync failed. Valid clusters are only present in origblob.
5486 * Since I/O is frozen on origblob, no changes to the zeroed out cluster map should have occurred.
5487 * Newblob needs to be reverted to its thin_provisioned state at creation to close properly. */
5488 blob_set_thin_provision(newblob);
5489 assert(spdk_mem_all_zero(newblob->active.clusters,
5490 newblob->active.num_clusters * sizeof(*newblob->active.clusters)));
5491 assert(spdk_mem_all_zero(newblob->active.extent_pages,
5492 newblob->active.num_extent_pages * sizeof(*newblob->active.extent_pages)));
5493
5494 bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
5495 return;
5496 }
5497
5498 /* Set internal xattr for snapshot id */
5499 bserrno = blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true);
5500 if (bserrno != 0) {
5501 /* return cluster map back to original */
5502 bs_snapshot_swap_cluster_maps(newblob, origblob);
5503 bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
5504 return;
5505 }
5506
5507 bs_blob_list_remove(origblob);
5508 origblob->parent_id = newblob->id;
5509
5510 /* Create new back_bs_dev for snapshot */
5511 origblob->back_bs_dev = bs_create_blob_bs_dev(newblob);
5512 if (origblob->back_bs_dev == NULL) {
5513 /* return cluster map back to original */
5514 bs_snapshot_swap_cluster_maps(newblob, origblob);
5515 bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL);
5516 return;
5517 }
5518
5519 /* set clone blob as thin provisioned */
5520 blob_set_thin_provision(origblob);
5521
5522 bs_blob_list_add(newblob);
5523
5524 /* sync clone metadata */
5525 spdk_blob_sync_md(origblob, bs_snapshot_origblob_sync_cpl, ctx);
5526 }
5527
5528 static void
5529 bs_snapshot_freeze_cpl(void *cb_arg, int rc)
5530 {
5531 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
5532 struct spdk_blob *origblob = ctx->original.blob;
5533 struct spdk_blob *newblob = ctx->new.blob;
5534 int bserrno;
5535
5536 if (rc != 0) {
5537 bs_clone_snapshot_newblob_cleanup(ctx, rc);
5538 return;
5539 }
5540
5541 ctx->frozen = true;
5542
5543 /* set new back_bs_dev for snapshot */
5544 newblob->back_bs_dev = origblob->back_bs_dev;
5545 /* Set invalid flags from origblob */
5546 newblob->invalid_flags = origblob->invalid_flags;
5547
5548 /* inherit parent from original blob if set */
5549 newblob->parent_id = origblob->parent_id;
5550 if (origblob->parent_id != SPDK_BLOBID_INVALID) {
5551 /* Set internal xattr for snapshot id */
5552 bserrno = blob_set_xattr(newblob, BLOB_SNAPSHOT,
5553 &origblob->parent_id, sizeof(spdk_blob_id), true);
5554 if (bserrno != 0) {
5555 bs_clone_snapshot_newblob_cleanup(ctx, bserrno);
5556 return;
5557 }
5558 }
5559
5560 /* swap cluster maps */
5561
bs_snapshot_swap_cluster_maps(newblob, origblob); 5562 5563 /* Set the clear method on the new blob to match the original. */ 5564 blob_set_clear_method(newblob, origblob->clear_method); 5565 5566 /* sync snapshot metadata */ 5567 spdk_blob_sync_md(newblob, bs_snapshot_newblob_sync_cpl, ctx); 5568 } 5569 5570 static void 5571 bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5572 { 5573 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5574 struct spdk_blob *origblob = ctx->original.blob; 5575 struct spdk_blob *newblob = _blob; 5576 5577 if (bserrno != 0) { 5578 bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5579 return; 5580 } 5581 5582 ctx->new.blob = newblob; 5583 assert(spdk_blob_is_thin_provisioned(newblob)); 5584 assert(spdk_mem_all_zero(newblob->active.clusters, 5585 newblob->active.num_clusters * sizeof(*newblob->active.clusters))); 5586 assert(spdk_mem_all_zero(newblob->active.extent_pages, 5587 newblob->active.num_extent_pages * sizeof(*newblob->active.extent_pages))); 5588 5589 blob_freeze_io(origblob, bs_snapshot_freeze_cpl, ctx); 5590 } 5591 5592 static void 5593 bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 5594 { 5595 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5596 struct spdk_blob *origblob = ctx->original.blob; 5597 5598 if (bserrno != 0) { 5599 bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5600 return; 5601 } 5602 5603 ctx->new.id = blobid; 5604 ctx->cpl.u.blobid.blobid = blobid; 5605 5606 spdk_bs_open_blob(origblob->bs, ctx->new.id, bs_snapshot_newblob_open_cpl, ctx); 5607 } 5608 5609 5610 static void 5611 bs_xattr_snapshot(void *arg, const char *name, 5612 const void **value, size_t *value_len) 5613 { 5614 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0); 5615 5616 struct spdk_blob *blob = (struct spdk_blob *)arg; 5617 *value = &blob->id; 5618 *value_len = sizeof(blob->id); 5619 } 5620 5621 static void 5622 bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5623 { 5624 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5625 struct spdk_blob_opts opts; 5626 struct spdk_blob_xattr_opts internal_xattrs; 5627 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS }; 5628 5629 if (bserrno != 0) { 5630 bs_clone_snapshot_cleanup_finish(ctx, bserrno); 5631 return; 5632 } 5633 5634 ctx->original.blob = _blob; 5635 5636 if (_blob->data_ro || _blob->md_ro) { 5637 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n", 5638 _blob->id); 5639 ctx->bserrno = -EINVAL; 5640 spdk_blob_close(_blob, bs_clone_snapshot_cleanup_finish, ctx); 5641 return; 5642 } 5643 5644 if (_blob->locked_operation_in_progress) { 5645 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n"); 5646 ctx->bserrno = -EBUSY; 5647 spdk_blob_close(_blob, bs_clone_snapshot_cleanup_finish, ctx); 5648 return; 5649 } 5650 5651 _blob->locked_operation_in_progress = true; 5652 5653 spdk_blob_opts_init(&opts); 5654 blob_xattrs_init(&internal_xattrs); 5655 5656 /* Change the size of new blob to the same as in original blob, 5657 * but do not allocate clusters */ 5658 opts.thin_provision = true; 5659 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 5660 opts.use_extent_table = _blob->use_extent_table; 5661 5662 /* If there are any xattrs specified for snapshot, set them now */ 5663 if (ctx->xattrs) { 5664 memcpy(&opts.xattrs, ctx->xattrs, 
sizeof(*ctx->xattrs)); 5665 } 5666 /* Set internal xattr SNAPSHOT_IN_PROGRESS */ 5667 internal_xattrs.count = 1; 5668 internal_xattrs.ctx = _blob; 5669 internal_xattrs.names = xattrs_names; 5670 internal_xattrs.get_value = bs_xattr_snapshot; 5671 5672 bs_create_blob(_blob->bs, &opts, &internal_xattrs, 5673 bs_snapshot_newblob_create_cpl, ctx); 5674 } 5675 5676 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid, 5677 const struct spdk_blob_xattr_opts *snapshot_xattrs, 5678 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 5679 { 5680 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5681 5682 if (!ctx) { 5683 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 5684 return; 5685 } 5686 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 5687 ctx->cpl.u.blobid.cb_fn = cb_fn; 5688 ctx->cpl.u.blobid.cb_arg = cb_arg; 5689 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 5690 ctx->bserrno = 0; 5691 ctx->frozen = false; 5692 ctx->original.id = blobid; 5693 ctx->xattrs = snapshot_xattrs; 5694 5695 spdk_bs_open_blob(bs, ctx->original.id, bs_snapshot_origblob_open_cpl, ctx); 5696 } 5697 /* END spdk_bs_create_snapshot */ 5698 5699 /* START spdk_bs_create_clone */ 5700 5701 static void 5702 bs_xattr_clone(void *arg, const char *name, 5703 const void **value, size_t *value_len) 5704 { 5705 assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0); 5706 5707 struct spdk_blob *blob = (struct spdk_blob *)arg; 5708 *value = &blob->id; 5709 *value_len = sizeof(blob->id); 5710 } 5711 5712 static void 5713 bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5714 { 5715 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5716 struct spdk_blob *clone = _blob; 5717 5718 ctx->new.blob = clone; 5719 bs_blob_list_add(clone); 5720 5721 spdk_blob_close(clone, bs_clone_snapshot_origblob_cleanup, ctx); 5722 } 5723 5724 static void 5725 bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 5726 { 5727 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5728 5729 ctx->cpl.u.blobid.blobid = blobid; 5730 spdk_bs_open_blob(ctx->original.blob->bs, blobid, bs_clone_newblob_open_cpl, ctx); 5731 } 5732 5733 static void 5734 bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5735 { 5736 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5737 struct spdk_blob_opts opts; 5738 struct spdk_blob_xattr_opts internal_xattrs; 5739 char *xattr_names[] = { BLOB_SNAPSHOT }; 5740 5741 if (bserrno != 0) { 5742 bs_clone_snapshot_cleanup_finish(ctx, bserrno); 5743 return; 5744 } 5745 5746 ctx->original.blob = _blob; 5747 5748 if (!_blob->data_ro || !_blob->md_ro) { 5749 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone not from read-only blob\n"); 5750 ctx->bserrno = -EINVAL; 5751 spdk_blob_close(_blob, bs_clone_snapshot_cleanup_finish, ctx); 5752 return; 5753 } 5754 5755 if (_blob->locked_operation_in_progress) { 5756 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n"); 5757 ctx->bserrno = -EBUSY; 5758 spdk_blob_close(_blob, bs_clone_snapshot_cleanup_finish, ctx); 5759 return; 5760 } 5761 5762 _blob->locked_operation_in_progress = true; 5763 5764 spdk_blob_opts_init(&opts); 5765 blob_xattrs_init(&internal_xattrs); 5766 5767 opts.thin_provision = true; 5768 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 5769 opts.use_extent_table = _blob->use_extent_table; 5770 if (ctx->xattrs) { 5771 memcpy(&opts.xattrs, ctx->xattrs, 
sizeof(*ctx->xattrs)); 5772 } 5773 5774 /* Set internal xattr BLOB_SNAPSHOT */ 5775 internal_xattrs.count = 1; 5776 internal_xattrs.ctx = _blob; 5777 internal_xattrs.names = xattr_names; 5778 internal_xattrs.get_value = bs_xattr_clone; 5779 5780 bs_create_blob(_blob->bs, &opts, &internal_xattrs, 5781 bs_clone_newblob_create_cpl, ctx); 5782 } 5783 5784 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid, 5785 const struct spdk_blob_xattr_opts *clone_xattrs, 5786 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 5787 { 5788 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5789 5790 if (!ctx) { 5791 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 5792 return; 5793 } 5794 5795 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 5796 ctx->cpl.u.blobid.cb_fn = cb_fn; 5797 ctx->cpl.u.blobid.cb_arg = cb_arg; 5798 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 5799 ctx->bserrno = 0; 5800 ctx->xattrs = clone_xattrs; 5801 ctx->original.id = blobid; 5802 5803 spdk_bs_open_blob(bs, ctx->original.id, bs_clone_origblob_open_cpl, ctx); 5804 } 5805 5806 /* END spdk_bs_create_clone */ 5807 5808 /* START spdk_bs_inflate_blob */ 5809 5810 static void 5811 bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno) 5812 { 5813 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5814 struct spdk_blob *_blob = ctx->original.blob; 5815 5816 if (bserrno != 0) { 5817 bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5818 return; 5819 } 5820 5821 assert(_parent != NULL); 5822 5823 bs_blob_list_remove(_blob); 5824 _blob->parent_id = _parent->id; 5825 blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id, 5826 sizeof(spdk_blob_id), true); 5827 5828 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 5829 _blob->back_bs_dev = bs_create_blob_bs_dev(_parent); 5830 bs_blob_list_add(_blob); 5831 5832 spdk_blob_sync_md(_blob, bs_clone_snapshot_origblob_cleanup, ctx); 5833 } 5834 5835 static void 5836 bs_inflate_blob_done(void *cb_arg, int bserrno) 5837 { 5838 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5839 struct spdk_blob *_blob = ctx->original.blob; 5840 struct spdk_blob *_parent; 5841 5842 if (bserrno != 0) { 5843 bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5844 return; 5845 } 5846 5847 if (ctx->allocate_all) { 5848 /* remove thin provisioning */ 5849 bs_blob_list_remove(_blob); 5850 blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 5851 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV; 5852 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 5853 _blob->back_bs_dev = NULL; 5854 _blob->parent_id = SPDK_BLOBID_INVALID; 5855 } else { 5856 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob; 5857 if (_parent->parent_id != SPDK_BLOBID_INVALID) { 5858 /* We must change the parent of the inflated blob */ 5859 spdk_bs_open_blob(_blob->bs, _parent->parent_id, 5860 bs_inflate_blob_set_parent_cpl, ctx); 5861 return; 5862 } 5863 5864 bs_blob_list_remove(_blob); 5865 blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 5866 _blob->parent_id = SPDK_BLOBID_INVALID; 5867 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 5868 _blob->back_bs_dev = bs_create_zeroes_dev(); 5869 } 5870 5871 _blob->state = SPDK_BLOB_STATE_DIRTY; 5872 spdk_blob_sync_md(_blob, bs_clone_snapshot_origblob_cleanup, ctx); 5873 } 5874 5875 /* Check if cluster needs allocation */ 5876 static inline bool 5877 bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all) 5878 { 5879 struct spdk_blob_bs_dev 
*b; 5880 5881 assert(blob != NULL); 5882 5883 if (blob->active.clusters[cluster] != 0) { 5884 /* Cluster is already allocated */ 5885 return false; 5886 } 5887 5888 if (blob->parent_id == SPDK_BLOBID_INVALID) { 5889 /* Blob have no parent blob */ 5890 return allocate_all; 5891 } 5892 5893 b = (struct spdk_blob_bs_dev *)blob->back_bs_dev; 5894 return (allocate_all || b->blob->active.clusters[cluster] != 0); 5895 } 5896 5897 static void 5898 bs_inflate_blob_touch_next(void *cb_arg, int bserrno) 5899 { 5900 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5901 struct spdk_blob *_blob = ctx->original.blob; 5902 uint64_t offset; 5903 5904 if (bserrno != 0) { 5905 bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5906 return; 5907 } 5908 5909 for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) { 5910 if (bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) { 5911 break; 5912 } 5913 } 5914 5915 if (ctx->cluster < _blob->active.num_clusters) { 5916 offset = bs_cluster_to_lba(_blob->bs, ctx->cluster); 5917 5918 /* We may safely increment a cluster before write */ 5919 ctx->cluster++; 5920 5921 /* Use zero length write to touch a cluster */ 5922 spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0, 5923 bs_inflate_blob_touch_next, ctx); 5924 } else { 5925 bs_inflate_blob_done(cb_arg, bserrno); 5926 } 5927 } 5928 5929 static void 5930 bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5931 { 5932 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5933 uint64_t lfc; /* lowest free cluster */ 5934 uint64_t i; 5935 5936 if (bserrno != 0) { 5937 bs_clone_snapshot_cleanup_finish(ctx, bserrno); 5938 return; 5939 } 5940 5941 ctx->original.blob = _blob; 5942 5943 if (_blob->locked_operation_in_progress) { 5944 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n"); 5945 ctx->bserrno = -EBUSY; 5946 spdk_blob_close(_blob, bs_clone_snapshot_cleanup_finish, ctx); 5947 return; 5948 } 5949 5950 _blob->locked_operation_in_progress = true; 5951 5952 if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) { 5953 /* This blob have no parent, so we cannot decouple it. */ 5954 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n"); 5955 bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL); 5956 return; 5957 } 5958 5959 if (spdk_blob_is_thin_provisioned(_blob) == false) { 5960 /* This is not thin provisioned blob. No need to inflate. */ 5961 bs_clone_snapshot_origblob_cleanup(ctx, 0); 5962 return; 5963 } 5964 5965 /* Do two passes - one to verify that we can obtain enough clusters 5966 * and another to actually claim them. 5967 */ 5968 lfc = 0; 5969 for (i = 0; i < _blob->active.num_clusters; i++) { 5970 if (bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) { 5971 lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc); 5972 if (lfc == UINT32_MAX) { 5973 /* No more free clusters. 
Cannot satisfy the request */ 5974 bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC); 5975 return; 5976 } 5977 lfc++; 5978 } 5979 } 5980 5981 ctx->cluster = 0; 5982 bs_inflate_blob_touch_next(ctx, 0); 5983 } 5984 5985 static void 5986 bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5987 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg) 5988 { 5989 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5990 5991 if (!ctx) { 5992 cb_fn(cb_arg, -ENOMEM); 5993 return; 5994 } 5995 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5996 ctx->cpl.u.bs_basic.cb_fn = cb_fn; 5997 ctx->cpl.u.bs_basic.cb_arg = cb_arg; 5998 ctx->bserrno = 0; 5999 ctx->original.id = blobid; 6000 ctx->channel = channel; 6001 ctx->allocate_all = allocate_all; 6002 6003 spdk_bs_open_blob(bs, ctx->original.id, bs_inflate_blob_open_cpl, ctx); 6004 } 6005 6006 void 6007 spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 6008 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 6009 { 6010 bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg); 6011 } 6012 6013 void 6014 spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 6015 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 6016 { 6017 bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg); 6018 } 6019 /* END spdk_bs_inflate_blob */ 6020 6021 /* START spdk_blob_resize */ 6022 struct spdk_bs_resize_ctx { 6023 spdk_blob_op_complete cb_fn; 6024 void *cb_arg; 6025 struct spdk_blob *blob; 6026 uint64_t sz; 6027 int rc; 6028 }; 6029 6030 static void 6031 bs_resize_unfreeze_cpl(void *cb_arg, int rc) 6032 { 6033 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 6034 6035 if (rc != 0) { 6036 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc); 6037 } 6038 6039 if (ctx->rc != 0) { 6040 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc); 6041 rc = ctx->rc; 6042 } 6043 6044 ctx->blob->locked_operation_in_progress = false; 6045 6046 ctx->cb_fn(ctx->cb_arg, rc); 6047 free(ctx); 6048 } 6049 6050 static void 6051 bs_resize_freeze_cpl(void *cb_arg, int rc) 6052 { 6053 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 6054 6055 if (rc != 0) { 6056 ctx->blob->locked_operation_in_progress = false; 6057 ctx->cb_fn(ctx->cb_arg, rc); 6058 free(ctx); 6059 return; 6060 } 6061 6062 ctx->rc = blob_resize(ctx->blob, ctx->sz); 6063 6064 blob_unfreeze_io(ctx->blob, bs_resize_unfreeze_cpl, ctx); 6065 } 6066 6067 void 6068 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg) 6069 { 6070 struct spdk_bs_resize_ctx *ctx; 6071 6072 blob_verify_md_op(blob); 6073 6074 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 6075 6076 if (blob->md_ro) { 6077 cb_fn(cb_arg, -EPERM); 6078 return; 6079 } 6080 6081 if (sz == blob->active.num_clusters) { 6082 cb_fn(cb_arg, 0); 6083 return; 6084 } 6085 6086 if (blob->locked_operation_in_progress) { 6087 cb_fn(cb_arg, -EBUSY); 6088 return; 6089 } 6090 6091 ctx = calloc(1, sizeof(*ctx)); 6092 if (!ctx) { 6093 cb_fn(cb_arg, -ENOMEM); 6094 return; 6095 } 6096 6097 blob->locked_operation_in_progress = true; 6098 ctx->cb_fn = cb_fn; 6099 ctx->cb_arg = cb_arg; 6100 ctx->blob = blob; 6101 ctx->sz = sz; 6102 blob_freeze_io(blob, bs_resize_freeze_cpl, ctx); 6103 } 6104 6105 /* END spdk_blob_resize */ 6106 6107 6108 /* START spdk_bs_delete_blob */ 6109 6110 static void 6111 bs_delete_close_cpl(void *cb_arg, int 
bserrno) 6112 { 6113 spdk_bs_sequence_t *seq = cb_arg; 6114 6115 bs_sequence_finish(seq, bserrno); 6116 } 6117 6118 static void 6119 bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 6120 { 6121 struct spdk_blob *blob = cb_arg; 6122 6123 if (bserrno != 0) { 6124 /* 6125 * We already removed this blob from the blobstore tailq, so 6126 * we need to free it here since this is the last reference 6127 * to it. 6128 */ 6129 blob_free(blob); 6130 bs_delete_close_cpl(seq, bserrno); 6131 return; 6132 } 6133 6134 /* 6135 * This will immediately decrement the ref_count and call 6136 * the completion routine since the metadata state is clean. 6137 * By calling spdk_blob_close, we reduce the number of call 6138 * points into code that touches the blob->open_ref count 6139 * and the blobstore's blob list. 6140 */ 6141 spdk_blob_close(blob, bs_delete_close_cpl, seq); 6142 } 6143 6144 struct delete_snapshot_ctx { 6145 struct spdk_blob_list *parent_snapshot_entry; 6146 struct spdk_blob *snapshot; 6147 bool snapshot_md_ro; 6148 struct spdk_blob *clone; 6149 bool clone_md_ro; 6150 spdk_blob_op_with_handle_complete cb_fn; 6151 void *cb_arg; 6152 int bserrno; 6153 }; 6154 6155 static void 6156 delete_blob_cleanup_finish(void *cb_arg, int bserrno) 6157 { 6158 struct delete_snapshot_ctx *ctx = cb_arg; 6159 6160 if (bserrno != 0) { 6161 SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno); 6162 } 6163 6164 assert(ctx != NULL); 6165 6166 if (bserrno != 0 && ctx->bserrno == 0) { 6167 ctx->bserrno = bserrno; 6168 } 6169 6170 ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno); 6171 free(ctx); 6172 } 6173 6174 static void 6175 delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno) 6176 { 6177 struct delete_snapshot_ctx *ctx = cb_arg; 6178 6179 if (bserrno != 0) { 6180 ctx->bserrno = bserrno; 6181 SPDK_ERRLOG("Clone cleanup error %d\n", bserrno); 6182 } 6183 6184 if (ctx->bserrno != 0) { 6185 assert(blob_lookup(ctx->snapshot->bs, ctx->snapshot->id) == NULL); 6186 TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link); 6187 } 6188 6189 ctx->snapshot->locked_operation_in_progress = false; 6190 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 6191 6192 spdk_blob_close(ctx->snapshot, delete_blob_cleanup_finish, ctx); 6193 } 6194 6195 static void 6196 delete_snapshot_cleanup_clone(void *cb_arg, int bserrno) 6197 { 6198 struct delete_snapshot_ctx *ctx = cb_arg; 6199 6200 ctx->clone->locked_operation_in_progress = false; 6201 ctx->clone->md_ro = ctx->clone_md_ro; 6202 6203 spdk_blob_close(ctx->clone, delete_snapshot_cleanup_snapshot, ctx); 6204 } 6205 6206 static void 6207 delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 6208 { 6209 struct delete_snapshot_ctx *ctx = cb_arg; 6210 6211 if (bserrno) { 6212 ctx->bserrno = bserrno; 6213 delete_snapshot_cleanup_clone(ctx, 0); 6214 return; 6215 } 6216 6217 ctx->clone->locked_operation_in_progress = false; 6218 spdk_blob_close(ctx->clone, delete_blob_cleanup_finish, ctx); 6219 } 6220 6221 static void 6222 delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno) 6223 { 6224 struct delete_snapshot_ctx *ctx = cb_arg; 6225 struct spdk_blob_list *parent_snapshot_entry = NULL; 6226 struct spdk_blob_list *snapshot_entry = NULL; 6227 struct spdk_blob_list *clone_entry = NULL; 6228 struct spdk_blob_list *snapshot_clone_entry = NULL; 6229 6230 if (bserrno) { 6231 SPDK_ERRLOG("Failed to sync MD on blob\n"); 6232 ctx->bserrno = bserrno; 6233 delete_snapshot_cleanup_clone(ctx, 0); 6234 return; 6235 } 6236 6237 /* Get snapshot entry for the snapshot 
we want to remove */ 6238 snapshot_entry = bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id); 6239 6240 assert(snapshot_entry != NULL); 6241 6242 /* Remove clone entry in this snapshot (at this point there can be only one clone) */ 6243 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 6244 assert(clone_entry != NULL); 6245 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 6246 snapshot_entry->clone_count--; 6247 assert(TAILQ_EMPTY(&snapshot_entry->clones)); 6248 6249 if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) { 6250 /* This snapshot is at the same time a clone of another snapshot - we need to 6251 * update parent snapshot (remove current clone, add new one inherited from 6252 * the snapshot that is being removed) */ 6253 6254 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 6255 * snapshot that we are removing */ 6256 blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry, 6257 &snapshot_clone_entry); 6258 6259 /* Switch clone entry in parent snapshot */ 6260 TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link); 6261 TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link); 6262 free(snapshot_clone_entry); 6263 } else { 6264 /* No parent snapshot - just remove clone entry */ 6265 free(clone_entry); 6266 } 6267 6268 /* Restore md_ro flags */ 6269 ctx->clone->md_ro = ctx->clone_md_ro; 6270 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 6271 6272 blob_unfreeze_io(ctx->clone, delete_snapshot_unfreeze_cpl, ctx); 6273 } 6274 6275 static void 6276 delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno) 6277 { 6278 struct delete_snapshot_ctx *ctx = cb_arg; 6279 uint64_t i; 6280 6281 ctx->snapshot->md_ro = false; 6282 6283 if (bserrno) { 6284 SPDK_ERRLOG("Failed to sync MD on clone\n"); 6285 ctx->bserrno = bserrno; 6286 6287 /* Restore snapshot to previous state */ 6288 bserrno = blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true); 6289 if (bserrno != 0) { 6290 delete_snapshot_cleanup_clone(ctx, bserrno); 6291 return; 6292 } 6293 6294 spdk_blob_sync_md(ctx->snapshot, delete_snapshot_cleanup_clone, ctx); 6295 return; 6296 } 6297 6298 /* Clear cluster map entries for snapshot */ 6299 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 6300 if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) { 6301 ctx->snapshot->active.clusters[i] = 0; 6302 } 6303 } 6304 for (i = 0; i < ctx->snapshot->active.num_extent_pages && 6305 i < ctx->clone->active.num_extent_pages; i++) { 6306 if (ctx->clone->active.extent_pages[i] == ctx->snapshot->active.extent_pages[i]) { 6307 ctx->snapshot->active.extent_pages[i] = 0; 6308 } 6309 } 6310 6311 blob_set_thin_provision(ctx->snapshot); 6312 ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY; 6313 6314 if (ctx->parent_snapshot_entry != NULL) { 6315 ctx->snapshot->back_bs_dev = NULL; 6316 } 6317 6318 spdk_blob_sync_md(ctx->snapshot, delete_snapshot_sync_snapshot_cpl, ctx); 6319 } 6320 6321 static void 6322 delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno) 6323 { 6324 struct delete_snapshot_ctx *ctx = cb_arg; 6325 uint64_t i; 6326 6327 /* Temporarily override md_ro flag for clone for MD modification */ 6328 ctx->clone_md_ro = ctx->clone->md_ro; 6329 ctx->clone->md_ro = false; 6330 6331 if (bserrno) { 6332 SPDK_ERRLOG("Failed to sync MD with xattr on blob\n"); 6333 ctx->bserrno = bserrno; 6334 delete_snapshot_cleanup_clone(ctx, 0); 6335 return; 6336 } 6337 6338 /* Copy snapshot 
map to clone map (only unallocated clusters in clone) */ 6339 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 6340 if (ctx->clone->active.clusters[i] == 0) { 6341 ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i]; 6342 } 6343 } 6344 for (i = 0; i < ctx->snapshot->active.num_extent_pages && 6345 i < ctx->clone->active.num_extent_pages; i++) { 6346 if (ctx->clone->active.extent_pages[i] == 0) { 6347 ctx->clone->active.extent_pages[i] = ctx->snapshot->active.extent_pages[i]; 6348 } 6349 } 6350 6351 /* Delete old backing bs_dev from clone (related to snapshot that will be removed) */ 6352 ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev); 6353 6354 /* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */ 6355 if (ctx->parent_snapshot_entry != NULL) { 6356 /* ...to parent snapshot */ 6357 ctx->clone->parent_id = ctx->parent_snapshot_entry->id; 6358 ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev; 6359 blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id, 6360 sizeof(spdk_blob_id), 6361 true); 6362 } else { 6363 /* ...to blobid invalid and zeroes dev */ 6364 ctx->clone->parent_id = SPDK_BLOBID_INVALID; 6365 ctx->clone->back_bs_dev = bs_create_zeroes_dev(); 6366 blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true); 6367 } 6368 6369 spdk_blob_sync_md(ctx->clone, delete_snapshot_sync_clone_cpl, ctx); 6370 } 6371 6372 static void 6373 delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno) 6374 { 6375 struct delete_snapshot_ctx *ctx = cb_arg; 6376 6377 if (bserrno) { 6378 SPDK_ERRLOG("Failed to freeze I/O on clone\n"); 6379 ctx->bserrno = bserrno; 6380 delete_snapshot_cleanup_clone(ctx, 0); 6381 return; 6382 } 6383 6384 /* Temporarily override md_ro flag for snapshot for MD modification */ 6385 ctx->snapshot_md_ro = ctx->snapshot->md_ro; 6386 ctx->snapshot->md_ro = false; 6387 6388 /* Mark blob as pending for removal for power failure safety, use clone id for recovery */ 6389 ctx->bserrno = blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id, 6390 sizeof(spdk_blob_id), true); 6391 if (ctx->bserrno != 0) { 6392 delete_snapshot_cleanup_clone(ctx, 0); 6393 return; 6394 } 6395 6396 spdk_blob_sync_md(ctx->snapshot, delete_snapshot_sync_snapshot_xattr_cpl, ctx); 6397 } 6398 6399 static void 6400 delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno) 6401 { 6402 struct delete_snapshot_ctx *ctx = cb_arg; 6403 6404 if (bserrno) { 6405 SPDK_ERRLOG("Failed to open clone\n"); 6406 ctx->bserrno = bserrno; 6407 delete_snapshot_cleanup_snapshot(ctx, 0); 6408 return; 6409 } 6410 6411 ctx->clone = clone; 6412 6413 if (clone->locked_operation_in_progress) { 6414 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n"); 6415 ctx->bserrno = -EBUSY; 6416 spdk_blob_close(ctx->clone, delete_snapshot_cleanup_snapshot, ctx); 6417 return; 6418 } 6419 6420 clone->locked_operation_in_progress = true; 6421 6422 blob_freeze_io(clone, delete_snapshot_freeze_io_cb, ctx); 6423 } 6424 6425 static void 6426 update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx) 6427 { 6428 struct spdk_blob_list *snapshot_entry = NULL; 6429 struct spdk_blob_list *clone_entry = NULL; 6430 struct spdk_blob_list *snapshot_clone_entry = NULL; 6431 6432 /* Get snapshot entry for the snapshot we want to remove */ 6433 snapshot_entry = bs_get_snapshot_entry(snapshot->bs, snapshot->id); 6434 6435 
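/* bs_is_blob_deletable() only requests a clone update for a snapshot that
 * still has exactly one clone, so its snapshot list entry must exist here. */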
assert(snapshot_entry != NULL); 6436 6437 /* Get clone of the snapshot (at this point there can be only one clone) */ 6438 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 6439 assert(snapshot_entry->clone_count == 1); 6440 assert(clone_entry != NULL); 6441 6442 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 6443 * snapshot that we are removing */ 6444 blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry, 6445 &snapshot_clone_entry); 6446 6447 spdk_bs_open_blob(snapshot->bs, clone_entry->id, delete_snapshot_open_clone_cb, ctx); 6448 } 6449 6450 static void 6451 bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno) 6452 { 6453 spdk_bs_sequence_t *seq = cb_arg; 6454 struct spdk_blob_list *snapshot_entry = NULL; 6455 uint32_t page_num; 6456 6457 if (bserrno) { 6458 SPDK_ERRLOG("Failed to remove blob\n"); 6459 bs_sequence_finish(seq, bserrno); 6460 return; 6461 } 6462 6463 /* Remove snapshot from the list */ 6464 snapshot_entry = bs_get_snapshot_entry(blob->bs, blob->id); 6465 if (snapshot_entry != NULL) { 6466 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link); 6467 free(snapshot_entry); 6468 } 6469 6470 page_num = bs_blobid_to_page(blob->id); 6471 spdk_bit_array_clear(blob->bs->used_blobids, page_num); 6472 blob->state = SPDK_BLOB_STATE_DIRTY; 6473 blob->active.num_pages = 0; 6474 blob_resize(blob, 0); 6475 6476 blob_persist(seq, blob, bs_delete_persist_cpl, blob); 6477 } 6478 6479 static int 6480 bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone) 6481 { 6482 struct spdk_blob_list *snapshot_entry = NULL; 6483 struct spdk_blob_list *clone_entry = NULL; 6484 struct spdk_blob *clone = NULL; 6485 bool has_one_clone = false; 6486 6487 /* Check if this is a snapshot with clones */ 6488 snapshot_entry = bs_get_snapshot_entry(blob->bs, blob->id); 6489 if (snapshot_entry != NULL) { 6490 if (snapshot_entry->clone_count > 1) { 6491 SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n"); 6492 return -EBUSY; 6493 } else if (snapshot_entry->clone_count == 1) { 6494 has_one_clone = true; 6495 } 6496 } 6497 6498 /* Check if someone has this blob open (besides this delete context): 6499 * - open_ref = 1 - only this context opened blob, so it is ok to remove it 6500 * - open_ref <= 2 && has_one_clone = true - clone is holding snapshot 6501 * and that is ok, because we will update it accordingly */ 6502 if (blob->open_ref <= 2 && has_one_clone) { 6503 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 6504 assert(clone_entry != NULL); 6505 clone = blob_lookup(blob->bs, clone_entry->id); 6506 6507 if (blob->open_ref == 2 && clone == NULL) { 6508 /* Clone is closed and someone else opened this blob */ 6509 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 6510 return -EBUSY; 6511 } 6512 6513 *update_clone = true; 6514 return 0; 6515 } 6516 6517 if (blob->open_ref > 1) { 6518 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 6519 return -EBUSY; 6520 } 6521 6522 assert(has_one_clone == false); 6523 *update_clone = false; 6524 return 0; 6525 } 6526 6527 static void 6528 bs_delete_enomem_close_cpl(void *cb_arg, int bserrno) 6529 { 6530 spdk_bs_sequence_t *seq = cb_arg; 6531 6532 bs_sequence_finish(seq, -ENOMEM); 6533 } 6534 6535 static void 6536 bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 6537 { 6538 spdk_bs_sequence_t *seq = cb_arg; 6539 struct delete_snapshot_ctx *ctx; 6540 bool update_clone = false; 6541 6542 if (bserrno != 0) { 6543 bs_sequence_finish(seq, 
bserrno); 6544 return; 6545 } 6546 6547 blob_verify_md_op(blob); 6548 6549 ctx = calloc(1, sizeof(*ctx)); 6550 if (ctx == NULL) { 6551 spdk_blob_close(blob, bs_delete_enomem_close_cpl, seq); 6552 return; 6553 } 6554 6555 ctx->snapshot = blob; 6556 ctx->cb_fn = bs_delete_blob_finish; 6557 ctx->cb_arg = seq; 6558 6559 /* Check if blob can be removed and if it is a snapshot with clone on top of it */ 6560 ctx->bserrno = bs_is_blob_deletable(blob, &update_clone); 6561 if (ctx->bserrno) { 6562 spdk_blob_close(blob, delete_blob_cleanup_finish, ctx); 6563 return; 6564 } 6565 6566 if (blob->locked_operation_in_progress) { 6567 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n"); 6568 ctx->bserrno = -EBUSY; 6569 spdk_blob_close(blob, delete_blob_cleanup_finish, ctx); 6570 return; 6571 } 6572 6573 blob->locked_operation_in_progress = true; 6574 6575 /* 6576 * Remove the blob from the blob_store list now, to ensure it does not 6577 * get returned after this point by blob_lookup(). 6578 */ 6579 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 6580 6581 if (update_clone) { 6582 /* This blob is a snapshot with active clone - update clone first */ 6583 update_clone_on_snapshot_deletion(blob, ctx); 6584 } else { 6585 /* This blob does not have any clones - just remove it */ 6586 bs_blob_list_remove(blob); 6587 bs_delete_blob_finish(seq, blob, 0); 6588 free(ctx); 6589 } 6590 } 6591 6592 void 6593 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 6594 spdk_blob_op_complete cb_fn, void *cb_arg) 6595 { 6596 struct spdk_bs_cpl cpl; 6597 spdk_bs_sequence_t *seq; 6598 6599 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 6600 6601 assert(spdk_get_thread() == bs->md_thread); 6602 6603 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 6604 cpl.u.blob_basic.cb_fn = cb_fn; 6605 cpl.u.blob_basic.cb_arg = cb_arg; 6606 6607 seq = bs_sequence_start(bs->md_channel, &cpl); 6608 if (!seq) { 6609 cb_fn(cb_arg, -ENOMEM); 6610 return; 6611 } 6612 6613 spdk_bs_open_blob(bs, blobid, bs_delete_open_cpl, seq); 6614 } 6615 6616 /* END spdk_bs_delete_blob */ 6617 6618 /* START spdk_bs_open_blob */ 6619 6620 static void 6621 bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 6622 { 6623 struct spdk_blob *blob = cb_arg; 6624 6625 if (bserrno != 0) { 6626 blob_free(blob); 6627 seq->cpl.u.blob_handle.blob = NULL; 6628 bs_sequence_finish(seq, bserrno); 6629 return; 6630 } 6631 6632 blob->open_ref++; 6633 6634 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 6635 6636 bs_sequence_finish(seq, bserrno); 6637 } 6638 6639 static void bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 6640 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 6641 { 6642 struct spdk_blob *blob; 6643 struct spdk_bs_cpl cpl; 6644 struct spdk_blob_open_opts opts_default; 6645 spdk_bs_sequence_t *seq; 6646 uint32_t page_num; 6647 6648 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 6649 assert(spdk_get_thread() == bs->md_thread); 6650 6651 page_num = bs_blobid_to_page(blobid); 6652 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) { 6653 /* Invalid blobid */ 6654 cb_fn(cb_arg, NULL, -ENOENT); 6655 return; 6656 } 6657 6658 blob = blob_lookup(bs, blobid); 6659 if (blob) { 6660 blob->open_ref++; 6661 cb_fn(cb_arg, blob, 0); 6662 return; 6663 } 6664 6665 blob = blob_alloc(bs, blobid); 6666 if (!blob) { 6667 cb_fn(cb_arg, NULL, -ENOMEM); 6668 return; 6669 } 6670 6671 if (!opts) { 6672 spdk_blob_open_opts_init(&opts_default); 6673 opts 
= &opts_default; 6674 } 6675 6676 blob->clear_method = opts->clear_method; 6677 6678 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 6679 cpl.u.blob_handle.cb_fn = cb_fn; 6680 cpl.u.blob_handle.cb_arg = cb_arg; 6681 cpl.u.blob_handle.blob = blob; 6682 6683 seq = bs_sequence_start(bs->md_channel, &cpl); 6684 if (!seq) { 6685 blob_free(blob); 6686 cb_fn(cb_arg, NULL, -ENOMEM); 6687 return; 6688 } 6689 6690 blob_load(seq, blob, bs_open_blob_cpl, blob); 6691 } 6692 6693 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 6694 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 6695 { 6696 bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg); 6697 } 6698 6699 void spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid, 6700 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 6701 { 6702 bs_open_blob(bs, blobid, opts, cb_fn, cb_arg); 6703 } 6704 6705 /* END spdk_bs_open_blob */ 6706 6707 /* START spdk_blob_set_read_only */ 6708 int spdk_blob_set_read_only(struct spdk_blob *blob) 6709 { 6710 blob_verify_md_op(blob); 6711 6712 blob->data_ro_flags |= SPDK_BLOB_READ_ONLY; 6713 6714 blob->state = SPDK_BLOB_STATE_DIRTY; 6715 return 0; 6716 } 6717 /* END spdk_blob_set_read_only */ 6718 6719 /* START spdk_blob_sync_md */ 6720 6721 static void 6722 blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 6723 { 6724 struct spdk_blob *blob = cb_arg; 6725 6726 if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) { 6727 blob->data_ro = true; 6728 blob->md_ro = true; 6729 } 6730 6731 bs_sequence_finish(seq, bserrno); 6732 } 6733 6734 static void 6735 blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 6736 { 6737 struct spdk_bs_cpl cpl; 6738 spdk_bs_sequence_t *seq; 6739 6740 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 6741 cpl.u.blob_basic.cb_fn = cb_fn; 6742 cpl.u.blob_basic.cb_arg = cb_arg; 6743 6744 seq = bs_sequence_start(blob->bs->md_channel, &cpl); 6745 if (!seq) { 6746 cb_fn(cb_arg, -ENOMEM); 6747 return; 6748 } 6749 6750 blob_persist(seq, blob, blob_sync_md_cpl, blob); 6751 } 6752 6753 void 6754 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 6755 { 6756 blob_verify_md_op(blob); 6757 6758 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id); 6759 6760 if (blob->md_ro) { 6761 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 6762 cb_fn(cb_arg, 0); 6763 return; 6764 } 6765 6766 blob_sync_md(blob, cb_fn, cb_arg); 6767 } 6768 6769 /* END spdk_blob_sync_md */ 6770 6771 struct spdk_blob_insert_cluster_ctx { 6772 struct spdk_thread *thread; 6773 struct spdk_blob *blob; 6774 uint32_t cluster_num; /* cluster index in blob */ 6775 uint32_t cluster; /* cluster on disk */ 6776 uint32_t extent_page; /* extent page on disk */ 6777 int rc; 6778 spdk_blob_op_complete cb_fn; 6779 void *cb_arg; 6780 }; 6781 6782 static void 6783 blob_insert_cluster_msg_cpl(void *arg) 6784 { 6785 struct spdk_blob_insert_cluster_ctx *ctx = arg; 6786 6787 ctx->cb_fn(ctx->cb_arg, ctx->rc); 6788 free(ctx); 6789 } 6790 6791 static void 6792 blob_insert_cluster_msg_cb(void *arg, int bserrno) 6793 { 6794 struct spdk_blob_insert_cluster_ctx *ctx = arg; 6795 6796 ctx->rc = bserrno; 6797 spdk_thread_send_msg(ctx->thread, blob_insert_cluster_msg_cpl, ctx); 6798 } 6799 6800 static void 6801 blob_persist_extent_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 6802 { 6803 struct spdk_blob_md_page *page = cb_arg; 6804 6805 bs_sequence_finish(seq, bserrno); 6806 spdk_free(page); 6807 } 6808 
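/*
 * The helpers below finish a cluster allocation that started on an I/O thread:
 * blob_insert_cluster_on_md_thread() messages blob_insert_cluster_msg() over to
 * the blobstore's metadata thread, which records the cluster in the blob and
 * then either syncs the whole metadata (extents_rle, or a newly allocated
 * extent page) or rewrites just the affected extent page via
 * blob_insert_extent().  The result is messaged back to the originating thread
 * by blob_insert_cluster_msg_cb() so the completion callback runs there.
 */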
6809 static void 6810 blob_insert_extent(struct spdk_blob *blob, uint32_t extent, uint64_t cluster_num, 6811 spdk_blob_op_complete cb_fn, void *cb_arg) 6812 { 6813 spdk_bs_sequence_t *seq; 6814 struct spdk_bs_cpl cpl; 6815 struct spdk_blob_md_page *page = NULL; 6816 uint32_t page_count = 0; 6817 int rc; 6818 6819 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 6820 cpl.u.blob_basic.cb_fn = cb_fn; 6821 cpl.u.blob_basic.cb_arg = cb_arg; 6822 6823 seq = bs_sequence_start(blob->bs->md_channel, &cpl); 6824 if (!seq) { 6825 cb_fn(cb_arg, -ENOMEM); 6826 return; 6827 } 6828 rc = blob_serialize_add_page(blob, &page, &page_count, &page); 6829 if (rc < 0) { 6830 bs_sequence_finish(seq, rc); 6831 return; 6832 } 6833 6834 blob_serialize_extent_page(blob, cluster_num, page); 6835 6836 page->crc = blob_md_page_calc_crc(page); 6837 6838 assert(spdk_bit_array_get(blob->bs->used_md_pages, extent) == true); 6839 6840 bs_sequence_write_dev(seq, page, bs_md_page_to_lba(blob->bs, extent), 6841 bs_byte_to_lba(blob->bs, SPDK_BS_PAGE_SIZE), 6842 blob_persist_extent_page_cpl, page); 6843 } 6844 6845 static void 6846 blob_insert_cluster_msg(void *arg) 6847 { 6848 struct spdk_blob_insert_cluster_ctx *ctx = arg; 6849 uint32_t *extent_page; 6850 6851 ctx->rc = blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster); 6852 if (ctx->rc != 0) { 6853 spdk_thread_send_msg(ctx->thread, blob_insert_cluster_msg_cpl, ctx); 6854 return; 6855 } 6856 6857 if (ctx->blob->use_extent_table == false) { 6858 /* Extent table is not used, proceed with sync of md that will only use extents_rle. */ 6859 ctx->blob->state = SPDK_BLOB_STATE_DIRTY; 6860 blob_sync_md(ctx->blob, blob_insert_cluster_msg_cb, ctx); 6861 return; 6862 } 6863 6864 extent_page = bs_cluster_to_extent_page(ctx->blob, ctx->cluster_num); 6865 if (*extent_page == 0) { 6866 /* Extent page requires allocation. 6867 * It was already claimed in the used_md_pages map and placed in ctx. 6868 * Blob persist will take care of writing out new extent page on disk. */ 6869 assert(ctx->extent_page != 0); 6870 assert(spdk_bit_array_get(ctx->blob->bs->used_md_pages, ctx->extent_page) == true); 6871 *extent_page = ctx->extent_page; 6872 ctx->blob->state = SPDK_BLOB_STATE_DIRTY; 6873 blob_sync_md(ctx->blob, blob_insert_cluster_msg_cb, ctx); 6874 } else { 6875 /* It is possible for original thread to allocate extent page for 6876 * different cluster in the same extent page. In such case proceed with 6877 * updating the existing extent page, but release the additional one. */ 6878 if (ctx->extent_page != 0) { 6879 assert(spdk_bit_array_get(ctx->blob->bs->used_md_pages, ctx->extent_page) == true); 6880 bs_release_md_page(ctx->blob->bs, ctx->extent_page); 6881 } 6882 /* Extent page already allocated. 6883 * Every cluster allocation, requires just an update of single extent page. 
*/ 6884 blob_insert_extent(ctx->blob, *extent_page, ctx->cluster_num, 6885 blob_insert_cluster_msg_cb, ctx); 6886 } 6887 } 6888 6889 static void 6890 blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num, 6891 uint64_t cluster, uint32_t extent_page, spdk_blob_op_complete cb_fn, void *cb_arg) 6892 { 6893 struct spdk_blob_insert_cluster_ctx *ctx; 6894 6895 ctx = calloc(1, sizeof(*ctx)); 6896 if (ctx == NULL) { 6897 cb_fn(cb_arg, -ENOMEM); 6898 return; 6899 } 6900 6901 ctx->thread = spdk_get_thread(); 6902 ctx->blob = blob; 6903 ctx->cluster_num = cluster_num; 6904 ctx->cluster = cluster; 6905 ctx->extent_page = extent_page; 6906 ctx->cb_fn = cb_fn; 6907 ctx->cb_arg = cb_arg; 6908 6909 spdk_thread_send_msg(blob->bs->md_thread, blob_insert_cluster_msg, ctx); 6910 } 6911 6912 /* START spdk_blob_close */ 6913 6914 static void 6915 blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 6916 { 6917 struct spdk_blob *blob = cb_arg; 6918 6919 if (bserrno == 0) { 6920 blob->open_ref--; 6921 if (blob->open_ref == 0) { 6922 /* 6923 * Blobs with active.num_pages == 0 are deleted blobs. 6924 * these blobs are removed from the blob_store list 6925 * when the deletion process starts - so don't try to 6926 * remove them again. 6927 */ 6928 if (blob->active.num_pages > 0) { 6929 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 6930 } 6931 blob_free(blob); 6932 } 6933 } 6934 6935 bs_sequence_finish(seq, bserrno); 6936 } 6937 6938 void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 6939 { 6940 struct spdk_bs_cpl cpl; 6941 spdk_bs_sequence_t *seq; 6942 6943 blob_verify_md_op(blob); 6944 6945 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id); 6946 6947 if (blob->open_ref == 0) { 6948 cb_fn(cb_arg, -EBADF); 6949 return; 6950 } 6951 6952 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 6953 cpl.u.blob_basic.cb_fn = cb_fn; 6954 cpl.u.blob_basic.cb_arg = cb_arg; 6955 6956 seq = bs_sequence_start(blob->bs->md_channel, &cpl); 6957 if (!seq) { 6958 cb_fn(cb_arg, -ENOMEM); 6959 return; 6960 } 6961 6962 /* Sync metadata */ 6963 blob_persist(seq, blob, blob_close_cpl, blob); 6964 } 6965 6966 /* END spdk_blob_close */ 6967 6968 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 6969 { 6970 return spdk_get_io_channel(bs); 6971 } 6972 6973 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 6974 { 6975 spdk_put_io_channel(channel); 6976 } 6977 6978 void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel, 6979 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 6980 { 6981 blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 6982 SPDK_BLOB_UNMAP); 6983 } 6984 6985 void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel, 6986 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 6987 { 6988 blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 6989 SPDK_BLOB_WRITE_ZEROES); 6990 } 6991 6992 void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel, 6993 void *payload, uint64_t offset, uint64_t length, 6994 spdk_blob_op_complete cb_fn, void *cb_arg) 6995 { 6996 blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 6997 SPDK_BLOB_WRITE); 6998 } 6999 7000 void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel, 7001 void *payload, uint64_t offset, uint64_t length, 7002 spdk_blob_op_complete cb_fn, void *cb_arg) 7003 
{ 7004 blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 7005 SPDK_BLOB_READ); 7006 } 7007 7008 void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel, 7009 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 7010 spdk_blob_op_complete cb_fn, void *cb_arg) 7011 { 7012 blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 7013 } 7014 7015 void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel, 7016 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 7017 spdk_blob_op_complete cb_fn, void *cb_arg) 7018 { 7019 blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 7020 } 7021 7022 struct spdk_bs_iter_ctx { 7023 int64_t page_num; 7024 struct spdk_blob_store *bs; 7025 7026 spdk_blob_op_with_handle_complete cb_fn; 7027 void *cb_arg; 7028 }; 7029 7030 static void 7031 bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 7032 { 7033 struct spdk_bs_iter_ctx *ctx = cb_arg; 7034 struct spdk_blob_store *bs = ctx->bs; 7035 spdk_blob_id id; 7036 7037 if (bserrno == 0) { 7038 ctx->cb_fn(ctx->cb_arg, _blob, bserrno); 7039 free(ctx); 7040 return; 7041 } 7042 7043 ctx->page_num++; 7044 ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num); 7045 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) { 7046 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 7047 free(ctx); 7048 return; 7049 } 7050 7051 id = bs_page_to_blobid(ctx->page_num); 7052 7053 spdk_bs_open_blob(bs, id, bs_iter_cpl, ctx); 7054 } 7055 7056 void 7057 spdk_bs_iter_first(struct spdk_blob_store *bs, 7058 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 7059 { 7060 struct spdk_bs_iter_ctx *ctx; 7061 7062 ctx = calloc(1, sizeof(*ctx)); 7063 if (!ctx) { 7064 cb_fn(cb_arg, NULL, -ENOMEM); 7065 return; 7066 } 7067 7068 ctx->page_num = -1; 7069 ctx->bs = bs; 7070 ctx->cb_fn = cb_fn; 7071 ctx->cb_arg = cb_arg; 7072 7073 bs_iter_cpl(ctx, NULL, -1); 7074 } 7075 7076 static void 7077 bs_iter_close_cpl(void *cb_arg, int bserrno) 7078 { 7079 struct spdk_bs_iter_ctx *ctx = cb_arg; 7080 7081 bs_iter_cpl(ctx, NULL, -1); 7082 } 7083 7084 void 7085 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob, 7086 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 7087 { 7088 struct spdk_bs_iter_ctx *ctx; 7089 7090 assert(blob != NULL); 7091 7092 ctx = calloc(1, sizeof(*ctx)); 7093 if (!ctx) { 7094 cb_fn(cb_arg, NULL, -ENOMEM); 7095 return; 7096 } 7097 7098 ctx->page_num = bs_blobid_to_page(blob->id); 7099 ctx->bs = bs; 7100 ctx->cb_fn = cb_fn; 7101 ctx->cb_arg = cb_arg; 7102 7103 /* Close the existing blob */ 7104 spdk_blob_close(blob, bs_iter_close_cpl, ctx); 7105 } 7106 7107 static int 7108 blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 7109 uint16_t value_len, bool internal) 7110 { 7111 struct spdk_xattr_tailq *xattrs; 7112 struct spdk_xattr *xattr; 7113 size_t desc_size; 7114 7115 blob_verify_md_op(blob); 7116 7117 if (blob->md_ro) { 7118 return -EPERM; 7119 } 7120 7121 desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len; 7122 if (desc_size > SPDK_BS_MAX_DESC_SIZE) { 7123 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %ld does not fix into single page %ld\n", name, 7124 desc_size, SPDK_BS_MAX_DESC_SIZE); 7125 return -ENOMEM; 7126 } 7127 7128 if (internal) { 7129 xattrs = &blob->xattrs_internal; 7130 blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR; 7131 } else { 
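/* Non-internal xattrs live on the user list; these are the entries reported
 * by spdk_blob_get_xattr_value() and spdk_blob_get_xattr_names(). */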
7132 xattrs = &blob->xattrs; 7133 } 7134 7135 TAILQ_FOREACH(xattr, xattrs, link) { 7136 if (!strcmp(name, xattr->name)) { 7137 free(xattr->value); 7138 xattr->value_len = value_len; 7139 xattr->value = malloc(value_len); 7140 memcpy(xattr->value, value, value_len); 7141 7142 blob->state = SPDK_BLOB_STATE_DIRTY; 7143 7144 return 0; 7145 } 7146 } 7147 7148 xattr = calloc(1, sizeof(*xattr)); 7149 if (!xattr) { 7150 return -ENOMEM; 7151 } 7152 xattr->name = strdup(name); 7153 xattr->value_len = value_len; 7154 xattr->value = malloc(value_len); 7155 memcpy(xattr->value, value, value_len); 7156 TAILQ_INSERT_TAIL(xattrs, xattr, link); 7157 7158 blob->state = SPDK_BLOB_STATE_DIRTY; 7159 7160 return 0; 7161 } 7162 7163 int 7164 spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 7165 uint16_t value_len) 7166 { 7167 return blob_set_xattr(blob, name, value, value_len, false); 7168 } 7169 7170 static int 7171 blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal) 7172 { 7173 struct spdk_xattr_tailq *xattrs; 7174 struct spdk_xattr *xattr; 7175 7176 blob_verify_md_op(blob); 7177 7178 if (blob->md_ro) { 7179 return -EPERM; 7180 } 7181 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs; 7182 7183 TAILQ_FOREACH(xattr, xattrs, link) { 7184 if (!strcmp(name, xattr->name)) { 7185 TAILQ_REMOVE(xattrs, xattr, link); 7186 free(xattr->value); 7187 free(xattr->name); 7188 free(xattr); 7189 7190 if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) { 7191 blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR; 7192 } 7193 blob->state = SPDK_BLOB_STATE_DIRTY; 7194 7195 return 0; 7196 } 7197 } 7198 7199 return -ENOENT; 7200 } 7201 7202 int 7203 spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name) 7204 { 7205 return blob_remove_xattr(blob, name, false); 7206 } 7207 7208 static int 7209 blob_get_xattr_value(struct spdk_blob *blob, const char *name, 7210 const void **value, size_t *value_len, bool internal) 7211 { 7212 struct spdk_xattr *xattr; 7213 struct spdk_xattr_tailq *xattrs; 7214 7215 xattrs = internal ? 
&blob->xattrs_internal : &blob->xattrs; 7216 7217 TAILQ_FOREACH(xattr, xattrs, link) { 7218 if (!strcmp(name, xattr->name)) { 7219 *value = xattr->value; 7220 *value_len = xattr->value_len; 7221 return 0; 7222 } 7223 } 7224 return -ENOENT; 7225 } 7226 7227 int 7228 spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name, 7229 const void **value, size_t *value_len) 7230 { 7231 blob_verify_md_op(blob); 7232 7233 return blob_get_xattr_value(blob, name, value, value_len, false); 7234 } 7235 7236 struct spdk_xattr_names { 7237 uint32_t count; 7238 const char *names[0]; 7239 }; 7240 7241 static int 7242 blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names) 7243 { 7244 struct spdk_xattr *xattr; 7245 int count = 0; 7246 7247 TAILQ_FOREACH(xattr, xattrs, link) { 7248 count++; 7249 } 7250 7251 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 7252 if (*names == NULL) { 7253 return -ENOMEM; 7254 } 7255 7256 TAILQ_FOREACH(xattr, xattrs, link) { 7257 (*names)->names[(*names)->count++] = xattr->name; 7258 } 7259 7260 return 0; 7261 } 7262 7263 int 7264 spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names) 7265 { 7266 blob_verify_md_op(blob); 7267 7268 return blob_get_xattr_names(&blob->xattrs, names); 7269 } 7270 7271 uint32_t 7272 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 7273 { 7274 assert(names != NULL); 7275 7276 return names->count; 7277 } 7278 7279 const char * 7280 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 7281 { 7282 if (index >= names->count) { 7283 return NULL; 7284 } 7285 7286 return names->names[index]; 7287 } 7288 7289 void 7290 spdk_xattr_names_free(struct spdk_xattr_names *names) 7291 { 7292 free(names); 7293 } 7294 7295 struct spdk_bs_type 7296 spdk_bs_get_bstype(struct spdk_blob_store *bs) 7297 { 7298 return bs->bstype; 7299 } 7300 7301 void 7302 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype) 7303 { 7304 memcpy(&bs->bstype, &bstype, sizeof(bstype)); 7305 } 7306 7307 bool 7308 spdk_blob_is_read_only(struct spdk_blob *blob) 7309 { 7310 assert(blob != NULL); 7311 return (blob->data_ro || blob->md_ro); 7312 } 7313 7314 bool 7315 spdk_blob_is_snapshot(struct spdk_blob *blob) 7316 { 7317 struct spdk_blob_list *snapshot_entry; 7318 7319 assert(blob != NULL); 7320 7321 snapshot_entry = bs_get_snapshot_entry(blob->bs, blob->id); 7322 if (snapshot_entry == NULL) { 7323 return false; 7324 } 7325 7326 return true; 7327 } 7328 7329 bool 7330 spdk_blob_is_clone(struct spdk_blob *blob) 7331 { 7332 assert(blob != NULL); 7333 7334 if (blob->parent_id != SPDK_BLOBID_INVALID) { 7335 assert(spdk_blob_is_thin_provisioned(blob)); 7336 return true; 7337 } 7338 7339 return false; 7340 } 7341 7342 bool 7343 spdk_blob_is_thin_provisioned(struct spdk_blob *blob) 7344 { 7345 assert(blob != NULL); 7346 return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV); 7347 } 7348 7349 static void 7350 blob_update_clear_method(struct spdk_blob *blob) 7351 { 7352 enum blob_clear_method stored_cm; 7353 7354 assert(blob != NULL); 7355 7356 /* If BLOB_CLEAR_WITH_DEFAULT was passed in, use the setting stored 7357 * in metadata previously. If something other than the default was 7358 * specified, ignore stored value and used what was passed in. 
7359 */ 7360 stored_cm = ((blob->md_ro_flags & SPDK_BLOB_CLEAR_METHOD) >> SPDK_BLOB_CLEAR_METHOD_SHIFT); 7361 7362 if (blob->clear_method == BLOB_CLEAR_WITH_DEFAULT) { 7363 blob->clear_method = stored_cm; 7364 } else if (blob->clear_method != stored_cm) { 7365 SPDK_WARNLOG("Using passed in clear method 0x%x instead of stored value of 0x%x\n", 7366 blob->clear_method, stored_cm); 7367 } 7368 } 7369 7370 spdk_blob_id 7371 spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id) 7372 { 7373 struct spdk_blob_list *snapshot_entry = NULL; 7374 struct spdk_blob_list *clone_entry = NULL; 7375 7376 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) { 7377 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 7378 if (clone_entry->id == blob_id) { 7379 return snapshot_entry->id; 7380 } 7381 } 7382 } 7383 7384 return SPDK_BLOBID_INVALID; 7385 } 7386 7387 int 7388 spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids, 7389 size_t *count) 7390 { 7391 struct spdk_blob_list *snapshot_entry, *clone_entry; 7392 size_t n; 7393 7394 snapshot_entry = bs_get_snapshot_entry(bs, blobid); 7395 if (snapshot_entry == NULL) { 7396 *count = 0; 7397 return 0; 7398 } 7399 7400 if (ids == NULL || *count < snapshot_entry->clone_count) { 7401 *count = snapshot_entry->clone_count; 7402 return -ENOMEM; 7403 } 7404 *count = snapshot_entry->clone_count; 7405 7406 n = 0; 7407 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 7408 ids[n++] = clone_entry->id; 7409 } 7410 7411 return 0; 7412 } 7413 7414 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB) 7415