1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/thread.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 #include "spdk/util.h" 44 #include "spdk/string.h" 45 46 #include "spdk_internal/assert.h" 47 #include "spdk_internal/log.h" 48 49 #include "blobstore.h" 50 51 #define BLOB_CRC32C_INITIAL 0xffffffffUL 52 53 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs); 54 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs); 55 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno); 56 static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num, 57 uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg); 58 59 static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 60 uint16_t value_len, bool internal); 61 static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name, 62 const void **value, size_t *value_len, bool internal); 63 static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal); 64 65 static void 66 _spdk_blob_verify_md_op(struct spdk_blob *blob) 67 { 68 assert(blob != NULL); 69 assert(spdk_get_thread() == blob->bs->md_thread); 70 assert(blob->state != SPDK_BLOB_STATE_LOADING); 71 } 72 73 static struct spdk_blob_list * 74 _spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid) 75 { 76 struct spdk_blob_list *snapshot_entry = NULL; 77 78 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) { 79 if (snapshot_entry->id == blobid) { 80 break; 81 } 82 } 83 84 return snapshot_entry; 85 } 86 87 static void 88 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 89 { 90 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 91 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 92 assert(bs->num_free_clusters > 0); 93 94 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num); 95 96 spdk_bit_array_set(bs->used_clusters, cluster_num); 97 bs->num_free_clusters--; 98 } 99 100 static int 101 _spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster) 102 { 103 uint64_t *cluster_lba = &blob->active.clusters[cluster_num]; 104 105 _spdk_blob_verify_md_op(blob); 106 107 if (*cluster_lba != 0) { 108 return -EEXIST; 109 } 110 111 *cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster); 112 return 0; 113 } 114 115 static int 116 _spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num, 117 uint64_t *lowest_free_cluster, bool update_map) 118 { 119 pthread_mutex_lock(&blob->bs->used_clusters_mutex); 120 *lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters, 121 *lowest_free_cluster); 122 if (*lowest_free_cluster == UINT32_MAX) { 123 /* No more free clusters. Cannot satisfy the request */ 124 pthread_mutex_unlock(&blob->bs->used_clusters_mutex); 125 return -ENOSPC; 126 } 127 128 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id); 129 _spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster); 130 pthread_mutex_unlock(&blob->bs->used_clusters_mutex); 131 132 if (update_map) { 133 _spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster); 134 } 135 136 return 0; 137 } 138 139 static void 140 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 141 { 142 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 143 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 144 assert(bs->num_free_clusters < bs->total_clusters); 145 146 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num); 147 148 pthread_mutex_lock(&bs->used_clusters_mutex); 149 spdk_bit_array_clear(bs->used_clusters, cluster_num); 150 bs->num_free_clusters++; 151 pthread_mutex_unlock(&bs->used_clusters_mutex); 152 } 153 154 static void 155 _spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs) 156 { 157 xattrs->count = 0; 158 xattrs->names = NULL; 159 xattrs->ctx = NULL; 160 xattrs->get_value = NULL; 161 } 162 163 void 164 spdk_blob_opts_init(struct spdk_blob_opts *opts) 165 { 166 opts->num_clusters = 0; 167 opts->thin_provision = false; 168 opts->clear_method = BLOB_CLEAR_WITH_DEFAULT; 169 _spdk_blob_xattrs_init(&opts->xattrs); 170 opts->use_extent_table = false; 171 } 172 173 void 174 spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts) 175 { 176 opts->clear_method = BLOB_CLEAR_WITH_DEFAULT; 177 } 178 179 static struct spdk_blob * 180 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 181 { 182 struct spdk_blob *blob; 183 184 blob = calloc(1, sizeof(*blob)); 185 if (!blob) { 186 return NULL; 187 } 188 189 blob->id = id; 190 blob->bs = bs; 191 192 blob->parent_id = SPDK_BLOBID_INVALID; 193 194 blob->state = SPDK_BLOB_STATE_DIRTY; 195 blob->extent_rle_found = false; 196 blob->extent_table_found = false; 197 blob->active.num_pages = 1; 198 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 199 if (!blob->active.pages) { 200 free(blob); 201 return NULL; 202 } 203 204 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 205 206 TAILQ_INIT(&blob->xattrs); 207 TAILQ_INIT(&blob->xattrs_internal); 208 209 return blob; 210 } 211 212 static void 213 _spdk_xattrs_free(struct spdk_xattr_tailq *xattrs) 214 { 215 struct spdk_xattr *xattr, *xattr_tmp; 216 217 TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) { 218 TAILQ_REMOVE(xattrs, xattr, link); 219 free(xattr->name); 220 free(xattr->value); 221 free(xattr); 222 } 223 } 224 225 static void 226 _spdk_blob_free(struct spdk_blob *blob) 227 { 228 assert(blob != NULL); 229 230 free(blob->active.clusters); 231 free(blob->clean.clusters); 232 free(blob->active.pages); 233 free(blob->clean.pages); 234 235 _spdk_xattrs_free(&blob->xattrs); 236 _spdk_xattrs_free(&blob->xattrs_internal); 237 238 if (blob->back_bs_dev) { 239 blob->back_bs_dev->destroy(blob->back_bs_dev); 240 } 241 242 free(blob); 243 } 244 245 struct freeze_io_ctx { 246 struct spdk_bs_cpl cpl; 247 struct spdk_blob *blob; 248 }; 249 250 static void 251 _spdk_blob_io_sync(struct spdk_io_channel_iter *i) 252 { 253 spdk_for_each_channel_continue(i, 0); 254 } 255 256 static void 257 _spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i) 258 { 259 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 260 struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch); 261 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 262 struct spdk_bs_request_set *set; 263 struct spdk_bs_user_op_args *args; 264 spdk_bs_user_op_t *op, *tmp; 265 266 TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) { 267 set = (struct spdk_bs_request_set *)op; 268 args = &set->u.user_op; 269 270 if (args->blob == ctx->blob) { 271 TAILQ_REMOVE(&ch->queued_io, op, link); 272 spdk_bs_user_op_execute(op); 273 } 274 } 275 276 spdk_for_each_channel_continue(i, 0); 277 } 278 279 static void 280 _spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status) 281 { 282 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 283 284 ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0); 285 286 free(ctx); 287 } 288 289 static void 290 _spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 291 { 292 struct freeze_io_ctx *ctx; 293 294 ctx = calloc(1, sizeof(*ctx)); 295 if (!ctx) { 296 cb_fn(cb_arg, -ENOMEM); 297 return; 298 } 299 300 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 301 ctx->cpl.u.blob_basic.cb_fn = cb_fn; 302 ctx->cpl.u.blob_basic.cb_arg = cb_arg; 303 ctx->blob = blob; 304 305 /* Freeze I/O on blob */ 306 blob->frozen_refcnt++; 307 308 if (blob->frozen_refcnt == 1) { 309 spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl); 310 } else { 311 cb_fn(cb_arg, 0); 312 free(ctx); 313 } 314 } 315 316 static void 317 _spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 318 { 319 struct freeze_io_ctx *ctx; 320 321 ctx = calloc(1, sizeof(*ctx)); 322 if (!ctx) { 323 cb_fn(cb_arg, -ENOMEM); 324 return; 325 } 326 327 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 328 ctx->cpl.u.blob_basic.cb_fn = cb_fn; 329 ctx->cpl.u.blob_basic.cb_arg = cb_arg; 330 ctx->blob = blob; 331 332 assert(blob->frozen_refcnt > 0); 333 334 blob->frozen_refcnt--; 335 336 if (blob->frozen_refcnt == 0) { 337 spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl); 338 } else { 339 cb_fn(cb_arg, 0); 340 free(ctx); 341 } 342 } 343 344 static int 345 _spdk_blob_mark_clean(struct spdk_blob *blob) 346 { 347 uint64_t *clusters = NULL; 348 uint32_t *pages = NULL; 349 350 assert(blob != NULL); 351 352 if (blob->active.num_clusters) { 353 assert(blob->active.clusters); 354 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 355 if (!clusters) { 356 return -ENOMEM; 357 } 358 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters)); 359 } 360 361 if (blob->active.num_pages) { 362 assert(blob->active.pages); 363 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 364 if (!pages) { 365 free(clusters); 366 return -ENOMEM; 367 } 368 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages)); 369 } 370 371 free(blob->clean.clusters); 372 free(blob->clean.pages); 373 374 blob->clean.num_clusters = blob->active.num_clusters; 375 blob->clean.clusters = blob->active.clusters; 376 blob->clean.num_pages = blob->active.num_pages; 377 blob->clean.pages = blob->active.pages; 378 379 blob->active.clusters = clusters; 380 blob->active.pages = pages; 381 382 /* If the metadata was dirtied again while the metadata was being written to disk, 383 * we do not want to revert the DIRTY state back to CLEAN here. 384 */ 385 if (blob->state == SPDK_BLOB_STATE_LOADING) { 386 blob->state = SPDK_BLOB_STATE_CLEAN; 387 } 388 389 return 0; 390 } 391 392 static int 393 _spdk_blob_deserialize_xattr(struct spdk_blob *blob, 394 struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal) 395 { 396 struct spdk_xattr *xattr; 397 398 if (desc_xattr->length != sizeof(desc_xattr->name_length) + 399 sizeof(desc_xattr->value_length) + 400 desc_xattr->name_length + desc_xattr->value_length) { 401 return -EINVAL; 402 } 403 404 xattr = calloc(1, sizeof(*xattr)); 405 if (xattr == NULL) { 406 return -ENOMEM; 407 } 408 409 xattr->name = malloc(desc_xattr->name_length + 1); 410 if (xattr->name == NULL) { 411 free(xattr); 412 return -ENOMEM; 413 } 414 memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 415 xattr->name[desc_xattr->name_length] = '\0'; 416 417 xattr->value = malloc(desc_xattr->value_length); 418 if (xattr->value == NULL) { 419 free(xattr->name); 420 free(xattr); 421 return -ENOMEM; 422 } 423 xattr->value_len = desc_xattr->value_length; 424 memcpy(xattr->value, 425 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 426 desc_xattr->value_length); 427 428 TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link); 429 430 return 0; 431 } 432 433 434 static int 435 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 436 { 437 struct spdk_blob_md_descriptor *desc; 438 size_t cur_desc = 0; 439 void *tmp; 440 441 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 442 while (cur_desc < sizeof(page->descriptors)) { 443 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 444 if (desc->length == 0) { 445 /* If padding and length are 0, this terminates the page */ 446 break; 447 } 448 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 449 struct spdk_blob_md_descriptor_flags *desc_flags; 450 451 desc_flags = (struct spdk_blob_md_descriptor_flags *)desc; 452 453 if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) { 454 return -EINVAL; 455 } 456 457 if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) != 458 SPDK_BLOB_INVALID_FLAGS_MASK) { 459 return -EINVAL; 460 } 461 462 if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) != 463 SPDK_BLOB_DATA_RO_FLAGS_MASK) { 464 blob->data_ro = true; 465 blob->md_ro = true; 466 } 467 468 if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) != 469 SPDK_BLOB_MD_RO_FLAGS_MASK) { 470 blob->md_ro = true; 471 } 472 473 if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) { 474 blob->data_ro = true; 475 blob->md_ro = true; 476 } 477 478 blob->invalid_flags = desc_flags->invalid_flags; 479 blob->data_ro_flags = desc_flags->data_ro_flags; 480 blob->md_ro_flags = desc_flags->md_ro_flags; 481 482 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 483 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 484 unsigned int i, j; 485 unsigned int cluster_count = blob->active.num_clusters; 486 487 if (blob->extent_table_found) { 488 /* Extent Table already present in the md, 489 * both descriptors should never be at the same time. */ 490 return -EINVAL; 491 } 492 blob->extent_rle_found = true; 493 494 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 495 496 if (desc_extent_rle->length == 0 || 497 (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) { 498 return -EINVAL; 499 } 500 501 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 502 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 503 if (desc_extent_rle->extents[i].cluster_idx != 0) { 504 if (!spdk_bit_array_get(blob->bs->used_clusters, 505 desc_extent_rle->extents[i].cluster_idx + j)) { 506 return -EINVAL; 507 } 508 } 509 cluster_count++; 510 } 511 } 512 513 if (cluster_count == 0) { 514 return -EINVAL; 515 } 516 tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters)); 517 if (tmp == NULL) { 518 return -ENOMEM; 519 } 520 blob->active.clusters = tmp; 521 blob->active.cluster_array_size = cluster_count; 522 523 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 524 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 525 if (desc_extent_rle->extents[i].cluster_idx != 0) { 526 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 527 desc_extent_rle->extents[i].cluster_idx + j); 528 } else if (spdk_blob_is_thin_provisioned(blob)) { 529 blob->active.clusters[blob->active.num_clusters++] = 0; 530 } else { 531 return -EINVAL; 532 } 533 } 534 } 535 536 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 537 int rc; 538 539 rc = _spdk_blob_deserialize_xattr(blob, 540 (struct spdk_blob_md_descriptor_xattr *) desc, false); 541 if (rc != 0) { 542 return rc; 543 } 544 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 545 int rc; 546 547 rc = _spdk_blob_deserialize_xattr(blob, 548 (struct spdk_blob_md_descriptor_xattr *) desc, true); 549 if (rc != 0) { 550 return rc; 551 } 552 } else { 553 /* Unrecognized descriptor type. Do not fail - just continue to the 554 * next descriptor. If this descriptor is associated with some feature 555 * defined in a newer version of blobstore, that version of blobstore 556 * should create and set an associated feature flag to specify if this 557 * blob can be loaded or not. 558 */ 559 } 560 561 /* Advance to the next descriptor */ 562 cur_desc += sizeof(*desc) + desc->length; 563 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 564 break; 565 } 566 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 567 } 568 569 return 0; 570 } 571 572 static int 573 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 574 struct spdk_blob *blob) 575 { 576 const struct spdk_blob_md_page *page; 577 uint32_t i; 578 int rc; 579 580 assert(page_count > 0); 581 assert(pages[0].sequence_num == 0); 582 assert(blob != NULL); 583 assert(blob->state == SPDK_BLOB_STATE_LOADING); 584 assert(blob->active.clusters == NULL); 585 586 /* The blobid provided doesn't match what's in the MD, this can 587 * happen for example if a bogus blobid is passed in through open. 588 */ 589 if (blob->id != pages[0].id) { 590 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n", 591 blob->id, pages[0].id); 592 return -ENOENT; 593 } 594 595 for (i = 0; i < page_count; i++) { 596 page = &pages[i]; 597 598 assert(page->id == blob->id); 599 assert(page->sequence_num == i); 600 601 rc = _spdk_blob_parse_page(page, blob); 602 if (rc != 0) { 603 return rc; 604 } 605 } 606 607 return 0; 608 } 609 610 static int 611 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 612 struct spdk_blob_md_page **pages, 613 uint32_t *page_count, 614 struct spdk_blob_md_page **last_page) 615 { 616 struct spdk_blob_md_page *page; 617 618 assert(pages != NULL); 619 assert(page_count != NULL); 620 621 if (*page_count == 0) { 622 assert(*pages == NULL); 623 *page_count = 1; 624 *pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 625 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 626 } else { 627 assert(*pages != NULL); 628 (*page_count)++; 629 *pages = spdk_realloc(*pages, 630 SPDK_BS_PAGE_SIZE * (*page_count), 631 SPDK_BS_PAGE_SIZE); 632 } 633 634 if (*pages == NULL) { 635 *page_count = 0; 636 *last_page = NULL; 637 return -ENOMEM; 638 } 639 640 page = &(*pages)[*page_count - 1]; 641 memset(page, 0, sizeof(*page)); 642 page->id = blob->id; 643 page->sequence_num = *page_count - 1; 644 page->next = SPDK_INVALID_MD_PAGE; 645 *last_page = page; 646 647 return 0; 648 } 649 650 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 651 * Update required_sz on both success and failure. 652 * 653 */ 654 static int 655 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 656 uint8_t *buf, size_t buf_sz, 657 size_t *required_sz, bool internal) 658 { 659 struct spdk_blob_md_descriptor_xattr *desc; 660 661 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 662 strlen(xattr->name) + 663 xattr->value_len; 664 665 if (buf_sz < *required_sz) { 666 return -1; 667 } 668 669 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 670 671 desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR; 672 desc->length = sizeof(desc->name_length) + 673 sizeof(desc->value_length) + 674 strlen(xattr->name) + 675 xattr->value_len; 676 desc->name_length = strlen(xattr->name); 677 desc->value_length = xattr->value_len; 678 679 memcpy(desc->name, xattr->name, desc->name_length); 680 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 681 xattr->value, 682 desc->value_length); 683 684 return 0; 685 } 686 687 static void 688 _spdk_blob_serialize_extent_rle(const struct spdk_blob *blob, 689 uint64_t start_cluster, uint64_t *next_cluster, 690 uint8_t **buf, size_t *buf_sz) 691 { 692 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 693 size_t cur_sz; 694 uint64_t i, extent_idx; 695 uint64_t lba, lba_per_cluster, lba_count; 696 697 /* The buffer must have room for at least one extent */ 698 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]); 699 if (*buf_sz < cur_sz) { 700 *next_cluster = start_cluster; 701 return; 702 } 703 704 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf; 705 desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE; 706 707 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 708 709 lba = blob->active.clusters[start_cluster]; 710 lba_count = lba_per_cluster; 711 extent_idx = 0; 712 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 713 if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) { 714 /* Run-length encode sequential non-zero LBA */ 715 lba_count += lba_per_cluster; 716 continue; 717 } else if (lba == 0 && blob->active.clusters[i] == 0) { 718 /* Run-length encode unallocated clusters */ 719 lba_count += lba_per_cluster; 720 continue; 721 } 722 desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 723 desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster; 724 extent_idx++; 725 726 cur_sz += sizeof(desc_extent_rle->extents[extent_idx]); 727 728 if (*buf_sz < cur_sz) { 729 /* If we ran out of buffer space, return */ 730 *next_cluster = i; 731 break; 732 } 733 734 lba = blob->active.clusters[i]; 735 lba_count = lba_per_cluster; 736 } 737 738 if (*buf_sz >= cur_sz) { 739 desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 740 desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster; 741 extent_idx++; 742 743 *next_cluster = blob->active.num_clusters; 744 } 745 746 desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx; 747 *buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length; 748 *buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length; 749 } 750 751 static int 752 _spdk_blob_serialize_extents_rle(const struct spdk_blob *blob, 753 struct spdk_blob_md_page **pages, 754 struct spdk_blob_md_page *cur_page, 755 uint32_t *page_count, uint8_t **buf, 756 size_t *remaining_sz) 757 { 758 uint64_t last_cluster; 759 int rc; 760 761 last_cluster = 0; 762 while (last_cluster < blob->active.num_clusters) { 763 _spdk_blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz); 764 765 if (last_cluster == blob->active.num_clusters) { 766 break; 767 } 768 769 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 770 if (rc < 0) { 771 return rc; 772 } 773 774 *buf = (uint8_t *)cur_page->descriptors; 775 *remaining_sz = sizeof(cur_page->descriptors); 776 } 777 778 return 0; 779 } 780 781 static void 782 _spdk_blob_serialize_flags(const struct spdk_blob *blob, 783 uint8_t *buf, size_t *buf_sz) 784 { 785 struct spdk_blob_md_descriptor_flags *desc; 786 787 /* 788 * Flags get serialized first, so we should always have room for the flags 789 * descriptor. 790 */ 791 assert(*buf_sz >= sizeof(*desc)); 792 793 desc = (struct spdk_blob_md_descriptor_flags *)buf; 794 desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS; 795 desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor); 796 desc->invalid_flags = blob->invalid_flags; 797 desc->data_ro_flags = blob->data_ro_flags; 798 desc->md_ro_flags = blob->md_ro_flags; 799 800 *buf_sz -= sizeof(*desc); 801 } 802 803 static int 804 _spdk_blob_serialize_xattrs(const struct spdk_blob *blob, 805 const struct spdk_xattr_tailq *xattrs, bool internal, 806 struct spdk_blob_md_page **pages, 807 struct spdk_blob_md_page *cur_page, 808 uint32_t *page_count, uint8_t **buf, 809 size_t *remaining_sz) 810 { 811 const struct spdk_xattr *xattr; 812 int rc; 813 814 TAILQ_FOREACH(xattr, xattrs, link) { 815 size_t required_sz = 0; 816 817 rc = _spdk_blob_serialize_xattr(xattr, 818 *buf, *remaining_sz, 819 &required_sz, internal); 820 if (rc < 0) { 821 /* Need to add a new page to the chain */ 822 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 823 &cur_page); 824 if (rc < 0) { 825 spdk_free(*pages); 826 *pages = NULL; 827 *page_count = 0; 828 return rc; 829 } 830 831 *buf = (uint8_t *)cur_page->descriptors; 832 *remaining_sz = sizeof(cur_page->descriptors); 833 834 /* Try again */ 835 required_sz = 0; 836 rc = _spdk_blob_serialize_xattr(xattr, 837 *buf, *remaining_sz, 838 &required_sz, internal); 839 840 if (rc < 0) { 841 spdk_free(*pages); 842 *pages = NULL; 843 *page_count = 0; 844 return rc; 845 } 846 } 847 848 *remaining_sz -= required_sz; 849 *buf += required_sz; 850 } 851 852 return 0; 853 } 854 855 static int 856 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 857 uint32_t *page_count) 858 { 859 struct spdk_blob_md_page *cur_page; 860 int rc; 861 uint8_t *buf; 862 size_t remaining_sz; 863 864 assert(pages != NULL); 865 assert(page_count != NULL); 866 assert(blob != NULL); 867 assert(blob->state == SPDK_BLOB_STATE_DIRTY); 868 869 *pages = NULL; 870 *page_count = 0; 871 872 /* A blob always has at least 1 page, even if it has no descriptors */ 873 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 874 if (rc < 0) { 875 return rc; 876 } 877 878 buf = (uint8_t *)cur_page->descriptors; 879 remaining_sz = sizeof(cur_page->descriptors); 880 881 /* Serialize flags */ 882 _spdk_blob_serialize_flags(blob, buf, &remaining_sz); 883 buf += sizeof(struct spdk_blob_md_descriptor_flags); 884 885 /* Serialize xattrs */ 886 rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false, 887 pages, cur_page, page_count, &buf, &remaining_sz); 888 if (rc < 0) { 889 return rc; 890 } 891 892 /* Serialize internal xattrs */ 893 rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true, 894 pages, cur_page, page_count, &buf, &remaining_sz); 895 if (rc < 0) { 896 return rc; 897 } 898 899 if (blob->use_extent_table) { 900 /* Serialization as extent pages is not yet implemented */ 901 assert(false); 902 rc = -ENOSYS; 903 } else { 904 /* Serialize extents */ 905 rc = _spdk_blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz); 906 } 907 908 return rc; 909 } 910 911 struct spdk_blob_load_ctx { 912 struct spdk_blob *blob; 913 914 struct spdk_blob_md_page *pages; 915 uint32_t num_pages; 916 spdk_bs_sequence_t *seq; 917 918 spdk_bs_sequence_cpl cb_fn; 919 void *cb_arg; 920 }; 921 922 static uint32_t 923 _spdk_blob_md_page_calc_crc(void *page) 924 { 925 uint32_t crc; 926 927 crc = BLOB_CRC32C_INITIAL; 928 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 929 crc ^= BLOB_CRC32C_INITIAL; 930 931 return crc; 932 933 } 934 935 static void 936 _spdk_blob_load_final(void *cb_arg, int bserrno) 937 { 938 struct spdk_blob_load_ctx *ctx = cb_arg; 939 struct spdk_blob *blob = ctx->blob; 940 941 if (bserrno == 0) { 942 _spdk_blob_mark_clean(blob); 943 } 944 945 ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno); 946 947 /* Free the memory */ 948 spdk_free(ctx->pages); 949 free(ctx); 950 } 951 952 static void 953 _spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno) 954 { 955 struct spdk_blob_load_ctx *ctx = cb_arg; 956 struct spdk_blob *blob = ctx->blob; 957 958 if (bserrno == 0) { 959 blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot); 960 if (blob->back_bs_dev == NULL) { 961 bserrno = -ENOMEM; 962 } 963 } 964 if (bserrno != 0) { 965 SPDK_ERRLOG("Snapshot fail\n"); 966 } 967 968 _spdk_blob_load_final(ctx, bserrno); 969 } 970 971 static void _spdk_blob_update_clear_method(struct spdk_blob *blob); 972 973 static void 974 _spdk_blob_load_backing_dev(void *cb_arg) 975 { 976 struct spdk_blob_load_ctx *ctx = cb_arg; 977 struct spdk_blob *blob = ctx->blob; 978 const void *value; 979 size_t len; 980 int rc; 981 982 if (spdk_blob_is_thin_provisioned(blob)) { 983 rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true); 984 if (rc == 0) { 985 if (len != sizeof(spdk_blob_id)) { 986 _spdk_blob_load_final(ctx, -EINVAL); 987 return; 988 } 989 /* open snapshot blob and continue in the callback function */ 990 blob->parent_id = *(spdk_blob_id *)value; 991 spdk_bs_open_blob(blob->bs, blob->parent_id, 992 _spdk_blob_load_snapshot_cpl, ctx); 993 return; 994 } else { 995 /* add zeroes_dev for thin provisioned blob */ 996 blob->back_bs_dev = spdk_bs_create_zeroes_dev(); 997 } 998 } else { 999 /* standard blob */ 1000 blob->back_bs_dev = NULL; 1001 } 1002 _spdk_blob_load_final(ctx, 0); 1003 } 1004 1005 static void 1006 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1007 { 1008 struct spdk_blob_load_ctx *ctx = cb_arg; 1009 struct spdk_blob *blob = ctx->blob; 1010 struct spdk_blob_md_page *page; 1011 int rc; 1012 uint32_t crc; 1013 1014 if (bserrno) { 1015 SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno); 1016 _spdk_blob_load_final(ctx, bserrno); 1017 return; 1018 } 1019 1020 page = &ctx->pages[ctx->num_pages - 1]; 1021 crc = _spdk_blob_md_page_calc_crc(page); 1022 if (crc != page->crc) { 1023 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 1024 _spdk_blob_load_final(ctx, -EINVAL); 1025 return; 1026 } 1027 1028 if (page->next != SPDK_INVALID_MD_PAGE) { 1029 uint32_t next_page = page->next; 1030 uint64_t next_lba = _spdk_bs_md_page_to_lba(blob->bs, next_page); 1031 1032 /* Read the next page */ 1033 ctx->num_pages++; 1034 ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 1035 sizeof(*page)); 1036 if (ctx->pages == NULL) { 1037 _spdk_blob_load_final(ctx, -ENOMEM); 1038 return; 1039 } 1040 1041 spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1], 1042 next_lba, 1043 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 1044 _spdk_blob_load_cpl, ctx); 1045 return; 1046 } 1047 1048 /* Parse the pages */ 1049 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 1050 if (rc) { 1051 _spdk_blob_load_final(ctx, rc); 1052 return; 1053 } 1054 1055 if (blob->extent_table_found == true) { 1056 /* If EXTENT_TABLE was found, that means support for it should be enabled. */ 1057 assert(blob->extent_rle_found == false); 1058 blob->use_extent_table = true; 1059 } else { 1060 /* If EXTENT_RLE or no extent_* descriptor was found disable support 1061 * for extent table. No extent_* descriptors means that blob has length of 0 1062 * and no extent_rle descriptors were persisted for it. 1063 * EXTENT_TABLE if used, is always present in metadata regardless of length. */ 1064 blob->use_extent_table = false; 1065 } 1066 1067 ctx->seq = seq; 1068 1069 /* Check the clear_method stored in metadata vs what may have been passed 1070 * via spdk_bs_open_blob_ext() and update accordingly. 1071 */ 1072 _spdk_blob_update_clear_method(blob); 1073 1074 _spdk_blob_load_backing_dev(ctx); 1075 } 1076 1077 /* Load a blob from disk given a blobid */ 1078 static void 1079 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 1080 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1081 { 1082 struct spdk_blob_load_ctx *ctx; 1083 struct spdk_blob_store *bs; 1084 uint32_t page_num; 1085 uint64_t lba; 1086 1087 _spdk_blob_verify_md_op(blob); 1088 1089 bs = blob->bs; 1090 1091 ctx = calloc(1, sizeof(*ctx)); 1092 if (!ctx) { 1093 cb_fn(seq, cb_arg, -ENOMEM); 1094 return; 1095 } 1096 1097 ctx->blob = blob; 1098 ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE); 1099 if (!ctx->pages) { 1100 free(ctx); 1101 cb_fn(seq, cb_arg, -ENOMEM); 1102 return; 1103 } 1104 ctx->num_pages = 1; 1105 ctx->cb_fn = cb_fn; 1106 ctx->cb_arg = cb_arg; 1107 ctx->seq = seq; 1108 1109 page_num = _spdk_bs_blobid_to_page(blob->id); 1110 lba = _spdk_bs_md_page_to_lba(blob->bs, page_num); 1111 1112 blob->state = SPDK_BLOB_STATE_LOADING; 1113 1114 spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba, 1115 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 1116 _spdk_blob_load_cpl, ctx); 1117 } 1118 1119 struct spdk_blob_persist_ctx { 1120 struct spdk_blob *blob; 1121 1122 struct spdk_bs_super_block *super; 1123 1124 struct spdk_blob_md_page *pages; 1125 1126 spdk_bs_sequence_t *seq; 1127 spdk_bs_sequence_cpl cb_fn; 1128 void *cb_arg; 1129 }; 1130 1131 static void 1132 spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba, 1133 uint32_t lba_count) 1134 { 1135 switch (ctx->blob->clear_method) { 1136 case BLOB_CLEAR_WITH_DEFAULT: 1137 case BLOB_CLEAR_WITH_UNMAP: 1138 spdk_bs_batch_unmap_dev(batch, lba, lba_count); 1139 break; 1140 case BLOB_CLEAR_WITH_WRITE_ZEROES: 1141 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1142 break; 1143 case BLOB_CLEAR_WITH_NONE: 1144 default: 1145 break; 1146 } 1147 } 1148 1149 static void 1150 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1151 { 1152 struct spdk_blob_persist_ctx *ctx = cb_arg; 1153 struct spdk_blob *blob = ctx->blob; 1154 1155 if (bserrno == 0) { 1156 _spdk_blob_mark_clean(blob); 1157 } 1158 1159 /* Call user callback */ 1160 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 1161 1162 /* Free the memory */ 1163 spdk_free(ctx->pages); 1164 free(ctx); 1165 } 1166 1167 static void 1168 _spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1169 { 1170 struct spdk_blob_persist_ctx *ctx = cb_arg; 1171 struct spdk_blob *blob = ctx->blob; 1172 struct spdk_blob_store *bs = blob->bs; 1173 size_t i; 1174 1175 /* Release all clusters that were truncated */ 1176 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 1177 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 1178 1179 /* Nothing to release if it was not allocated */ 1180 if (blob->active.clusters[i] != 0) { 1181 _spdk_bs_release_cluster(bs, cluster_num); 1182 } 1183 } 1184 1185 if (blob->active.num_clusters == 0) { 1186 free(blob->active.clusters); 1187 blob->active.clusters = NULL; 1188 blob->active.cluster_array_size = 0; 1189 } else if (blob->active.num_clusters != blob->active.cluster_array_size) { 1190 #ifndef __clang_analyzer__ 1191 void *tmp; 1192 1193 /* scan-build really can't figure reallocs, workaround it */ 1194 tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters); 1195 assert(tmp != NULL); 1196 blob->active.clusters = tmp; 1197 #endif 1198 blob->active.cluster_array_size = blob->active.num_clusters; 1199 } 1200 1201 _spdk_blob_persist_complete(seq, ctx, bserrno); 1202 } 1203 1204 static void 1205 _spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1206 { 1207 struct spdk_blob_persist_ctx *ctx = cb_arg; 1208 struct spdk_blob *blob = ctx->blob; 1209 struct spdk_blob_store *bs = blob->bs; 1210 spdk_bs_batch_t *batch; 1211 size_t i; 1212 uint64_t lba; 1213 uint32_t lba_count; 1214 1215 /* Clusters don't move around in blobs. The list shrinks or grows 1216 * at the end, but no changes ever occur in the middle of the list. 1217 */ 1218 1219 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx); 1220 1221 /* Clear all clusters that were truncated */ 1222 lba = 0; 1223 lba_count = 0; 1224 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 1225 uint64_t next_lba = blob->active.clusters[i]; 1226 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 1227 1228 if (next_lba > 0 && (lba + lba_count) == next_lba) { 1229 /* This cluster is contiguous with the previous one. */ 1230 lba_count += next_lba_count; 1231 continue; 1232 } 1233 1234 /* This cluster is not contiguous with the previous one. */ 1235 1236 /* If a run of LBAs previously existing, clear them now */ 1237 if (lba_count > 0) { 1238 spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count); 1239 } 1240 1241 /* Start building the next batch */ 1242 lba = next_lba; 1243 if (next_lba > 0) { 1244 lba_count = next_lba_count; 1245 } else { 1246 lba_count = 0; 1247 } 1248 } 1249 1250 /* If we ended with a contiguous set of LBAs, clear them now */ 1251 if (lba_count > 0) { 1252 spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count); 1253 } 1254 1255 spdk_bs_batch_close(batch); 1256 } 1257 1258 static void 1259 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1260 { 1261 struct spdk_blob_persist_ctx *ctx = cb_arg; 1262 struct spdk_blob *blob = ctx->blob; 1263 struct spdk_blob_store *bs = blob->bs; 1264 size_t i; 1265 1266 /* This loop starts at 1 because the first page is special and handled 1267 * below. The pages (except the first) are never written in place, 1268 * so any pages in the clean list must be zeroed. 1269 */ 1270 for (i = 1; i < blob->clean.num_pages; i++) { 1271 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 1272 } 1273 1274 if (blob->active.num_pages == 0) { 1275 uint32_t page_num; 1276 1277 page_num = _spdk_bs_blobid_to_page(blob->id); 1278 spdk_bit_array_clear(bs->used_md_pages, page_num); 1279 } 1280 1281 /* Move on to clearing clusters */ 1282 _spdk_blob_persist_clear_clusters(seq, ctx, 0); 1283 } 1284 1285 static void 1286 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1287 { 1288 struct spdk_blob_persist_ctx *ctx = cb_arg; 1289 struct spdk_blob *blob = ctx->blob; 1290 struct spdk_blob_store *bs = blob->bs; 1291 uint64_t lba; 1292 uint32_t lba_count; 1293 spdk_bs_batch_t *batch; 1294 size_t i; 1295 1296 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx); 1297 1298 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 1299 1300 /* This loop starts at 1 because the first page is special and handled 1301 * below. The pages (except the first) are never written in place, 1302 * so any pages in the clean list must be zeroed. 1303 */ 1304 for (i = 1; i < blob->clean.num_pages; i++) { 1305 lba = _spdk_bs_md_page_to_lba(bs, blob->clean.pages[i]); 1306 1307 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1308 } 1309 1310 /* The first page will only be zeroed if this is a delete. */ 1311 if (blob->active.num_pages == 0) { 1312 uint32_t page_num; 1313 1314 /* The first page in the metadata goes where the blobid indicates */ 1315 page_num = _spdk_bs_blobid_to_page(blob->id); 1316 lba = _spdk_bs_md_page_to_lba(bs, page_num); 1317 1318 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1319 } 1320 1321 spdk_bs_batch_close(batch); 1322 } 1323 1324 static void 1325 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1326 { 1327 struct spdk_blob_persist_ctx *ctx = cb_arg; 1328 struct spdk_blob *blob = ctx->blob; 1329 struct spdk_blob_store *bs = blob->bs; 1330 uint64_t lba; 1331 uint32_t lba_count; 1332 struct spdk_blob_md_page *page; 1333 1334 if (blob->active.num_pages == 0) { 1335 /* Move on to the next step */ 1336 _spdk_blob_persist_zero_pages(seq, ctx, 0); 1337 return; 1338 } 1339 1340 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 1341 1342 page = &ctx->pages[0]; 1343 /* The first page in the metadata goes where the blobid indicates */ 1344 lba = _spdk_bs_md_page_to_lba(bs, _spdk_bs_blobid_to_page(blob->id)); 1345 1346 spdk_bs_sequence_write_dev(seq, page, lba, lba_count, 1347 _spdk_blob_persist_zero_pages, ctx); 1348 } 1349 1350 static void 1351 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1352 { 1353 struct spdk_blob_persist_ctx *ctx = cb_arg; 1354 struct spdk_blob *blob = ctx->blob; 1355 struct spdk_blob_store *bs = blob->bs; 1356 uint64_t lba; 1357 uint32_t lba_count; 1358 struct spdk_blob_md_page *page; 1359 spdk_bs_batch_t *batch; 1360 size_t i; 1361 1362 /* Clusters don't move around in blobs. The list shrinks or grows 1363 * at the end, but no changes ever occur in the middle of the list. 1364 */ 1365 1366 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 1367 1368 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 1369 1370 /* This starts at 1. The root page is not written until 1371 * all of the others are finished 1372 */ 1373 for (i = 1; i < blob->active.num_pages; i++) { 1374 page = &ctx->pages[i]; 1375 assert(page->sequence_num == i); 1376 1377 lba = _spdk_bs_md_page_to_lba(bs, blob->active.pages[i]); 1378 1379 spdk_bs_batch_write_dev(batch, page, lba, lba_count); 1380 } 1381 1382 spdk_bs_batch_close(batch); 1383 } 1384 1385 static int 1386 _spdk_blob_resize(struct spdk_blob *blob, uint64_t sz) 1387 { 1388 uint64_t i; 1389 uint64_t *tmp; 1390 uint64_t lfc; /* lowest free cluster */ 1391 uint64_t num_clusters; 1392 struct spdk_blob_store *bs; 1393 1394 bs = blob->bs; 1395 1396 _spdk_blob_verify_md_op(blob); 1397 1398 if (blob->active.num_clusters == sz) { 1399 return 0; 1400 } 1401 1402 if (blob->active.num_clusters < blob->active.cluster_array_size) { 1403 /* If this blob was resized to be larger, then smaller, then 1404 * larger without syncing, then the cluster array already 1405 * contains spare assigned clusters we can use. 1406 */ 1407 num_clusters = spdk_min(blob->active.cluster_array_size, 1408 sz); 1409 } else { 1410 num_clusters = blob->active.num_clusters; 1411 } 1412 1413 /* Do two passes - one to verify that we can obtain enough clusters 1414 * and another to actually claim them. 1415 */ 1416 1417 if (spdk_blob_is_thin_provisioned(blob) == false) { 1418 lfc = 0; 1419 for (i = num_clusters; i < sz; i++) { 1420 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1421 if (lfc == UINT32_MAX) { 1422 /* No more free clusters. Cannot satisfy the request */ 1423 return -ENOSPC; 1424 } 1425 lfc++; 1426 } 1427 } 1428 1429 if (sz > num_clusters) { 1430 /* Expand the cluster array if necessary. 1431 * We only shrink the array when persisting. 1432 */ 1433 tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz); 1434 if (sz > 0 && tmp == NULL) { 1435 return -ENOMEM; 1436 } 1437 memset(tmp + blob->active.cluster_array_size, 0, 1438 sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size)); 1439 blob->active.clusters = tmp; 1440 blob->active.cluster_array_size = sz; 1441 } 1442 1443 blob->state = SPDK_BLOB_STATE_DIRTY; 1444 1445 if (spdk_blob_is_thin_provisioned(blob) == false) { 1446 lfc = 0; 1447 for (i = num_clusters; i < sz; i++) { 1448 _spdk_bs_allocate_cluster(blob, i, &lfc, true); 1449 lfc++; 1450 } 1451 } 1452 1453 blob->active.num_clusters = sz; 1454 1455 return 0; 1456 } 1457 1458 static void 1459 _spdk_blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx) 1460 { 1461 spdk_bs_sequence_t *seq = ctx->seq; 1462 struct spdk_blob *blob = ctx->blob; 1463 struct spdk_blob_store *bs = blob->bs; 1464 uint64_t i; 1465 uint32_t page_num; 1466 void *tmp; 1467 int rc; 1468 1469 /* Generate the new metadata */ 1470 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 1471 if (rc < 0) { 1472 _spdk_blob_persist_complete(seq, ctx, rc); 1473 return; 1474 } 1475 1476 assert(blob->active.num_pages >= 1); 1477 1478 /* Resize the cache of page indices */ 1479 tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages)); 1480 if (!tmp) { 1481 _spdk_blob_persist_complete(seq, ctx, -ENOMEM); 1482 return; 1483 } 1484 blob->active.pages = tmp; 1485 1486 /* Assign this metadata to pages. This requires two passes - 1487 * one to verify that there are enough pages and a second 1488 * to actually claim them. */ 1489 page_num = 0; 1490 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1491 for (i = 1; i < blob->active.num_pages; i++) { 1492 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1493 if (page_num == UINT32_MAX) { 1494 _spdk_blob_persist_complete(seq, ctx, -ENOMEM); 1495 return; 1496 } 1497 page_num++; 1498 } 1499 1500 page_num = 0; 1501 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1502 for (i = 1; i < blob->active.num_pages; i++) { 1503 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1504 ctx->pages[i - 1].next = page_num; 1505 /* Now that previous metadata page is complete, calculate the crc for it. */ 1506 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1507 blob->active.pages[i] = page_num; 1508 spdk_bit_array_set(bs->used_md_pages, page_num); 1509 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1510 page_num++; 1511 } 1512 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1513 /* Start writing the metadata from last page to first */ 1514 blob->state = SPDK_BLOB_STATE_CLEAN; 1515 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1516 } 1517 1518 static void 1519 _spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx) 1520 { 1521 spdk_bs_sequence_t *seq = ctx->seq; 1522 struct spdk_blob *blob = ctx->blob; 1523 1524 if (blob->active.num_pages == 0) { 1525 /* This is the signal that the blob should be deleted. 1526 * Immediately jump to the clean up routine. */ 1527 assert(blob->clean.num_pages > 0); 1528 blob->state = SPDK_BLOB_STATE_CLEAN; 1529 _spdk_blob_persist_zero_pages(seq, ctx, 0); 1530 return; 1531 1532 } 1533 1534 _spdk_blob_persist_generate_new_md(ctx); 1535 } 1536 1537 static void 1538 _spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1539 { 1540 struct spdk_blob_persist_ctx *ctx = cb_arg; 1541 1542 ctx->blob->bs->clean = 0; 1543 1544 spdk_free(ctx->super); 1545 1546 _spdk_blob_persist_start(ctx); 1547 } 1548 1549 static void 1550 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 1551 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg); 1552 1553 1554 static void 1555 _spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1556 { 1557 struct spdk_blob_persist_ctx *ctx = cb_arg; 1558 1559 ctx->super->clean = 0; 1560 if (ctx->super->size == 0) { 1561 ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen; 1562 } 1563 1564 _spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx); 1565 } 1566 1567 1568 /* Write a blob to disk */ 1569 static void 1570 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 1571 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1572 { 1573 struct spdk_blob_persist_ctx *ctx; 1574 1575 _spdk_blob_verify_md_op(blob); 1576 1577 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 1578 cb_fn(seq, cb_arg, 0); 1579 return; 1580 } 1581 1582 ctx = calloc(1, sizeof(*ctx)); 1583 if (!ctx) { 1584 cb_fn(seq, cb_arg, -ENOMEM); 1585 return; 1586 } 1587 ctx->blob = blob; 1588 ctx->seq = seq; 1589 ctx->cb_fn = cb_fn; 1590 ctx->cb_arg = cb_arg; 1591 1592 if (blob->bs->clean) { 1593 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 1594 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1595 if (!ctx->super) { 1596 cb_fn(seq, cb_arg, -ENOMEM); 1597 free(ctx); 1598 return; 1599 } 1600 1601 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0), 1602 _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)), 1603 _spdk_blob_persist_dirty, ctx); 1604 } else { 1605 _spdk_blob_persist_start(ctx); 1606 } 1607 } 1608 1609 struct spdk_blob_copy_cluster_ctx { 1610 struct spdk_blob *blob; 1611 uint8_t *buf; 1612 uint64_t page; 1613 uint64_t new_cluster; 1614 spdk_bs_sequence_t *seq; 1615 }; 1616 1617 static void 1618 _spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno) 1619 { 1620 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1621 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq; 1622 TAILQ_HEAD(, spdk_bs_request_set) requests; 1623 spdk_bs_user_op_t *op; 1624 1625 TAILQ_INIT(&requests); 1626 TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link); 1627 1628 while (!TAILQ_EMPTY(&requests)) { 1629 op = TAILQ_FIRST(&requests); 1630 TAILQ_REMOVE(&requests, op, link); 1631 if (bserrno == 0) { 1632 spdk_bs_user_op_execute(op); 1633 } else { 1634 spdk_bs_user_op_abort(op); 1635 } 1636 } 1637 1638 spdk_free(ctx->buf); 1639 free(ctx); 1640 } 1641 1642 static void 1643 _spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno) 1644 { 1645 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1646 1647 if (bserrno) { 1648 if (bserrno == -EEXIST) { 1649 /* The metadata insert failed because another thread 1650 * allocated the cluster first. Free our cluster 1651 * but continue without error. */ 1652 bserrno = 0; 1653 } 1654 _spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster); 1655 } 1656 1657 spdk_bs_sequence_finish(ctx->seq, bserrno); 1658 } 1659 1660 static void 1661 _spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1662 { 1663 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1664 uint32_t cluster_number; 1665 1666 if (bserrno) { 1667 /* The write failed, so jump to the final completion handler */ 1668 spdk_bs_sequence_finish(seq, bserrno); 1669 return; 1670 } 1671 1672 cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page); 1673 1674 _spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster, 1675 _spdk_blob_insert_cluster_cpl, ctx); 1676 } 1677 1678 static void 1679 _spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1680 { 1681 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1682 1683 if (bserrno != 0) { 1684 /* The read failed, so jump to the final completion handler */ 1685 spdk_bs_sequence_finish(seq, bserrno); 1686 return; 1687 } 1688 1689 /* Write whole cluster */ 1690 spdk_bs_sequence_write_dev(seq, ctx->buf, 1691 _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster), 1692 _spdk_bs_cluster_to_lba(ctx->blob->bs, 1), 1693 _spdk_blob_write_copy_cpl, ctx); 1694 } 1695 1696 static void 1697 _spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob, 1698 struct spdk_io_channel *_ch, 1699 uint64_t io_unit, spdk_bs_user_op_t *op) 1700 { 1701 struct spdk_bs_cpl cpl; 1702 struct spdk_bs_channel *ch; 1703 struct spdk_blob_copy_cluster_ctx *ctx; 1704 uint32_t cluster_start_page; 1705 uint32_t cluster_number; 1706 int rc; 1707 1708 ch = spdk_io_channel_get_ctx(_ch); 1709 1710 if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) { 1711 /* There are already operations pending. Queue this user op 1712 * and return because it will be re-executed when the outstanding 1713 * cluster allocation completes. */ 1714 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link); 1715 return; 1716 } 1717 1718 /* Round the io_unit offset down to the first page in the cluster */ 1719 cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit); 1720 1721 /* Calculate which index in the metadata cluster array the corresponding 1722 * cluster is supposed to be at. */ 1723 cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit); 1724 1725 ctx = calloc(1, sizeof(*ctx)); 1726 if (!ctx) { 1727 spdk_bs_user_op_abort(op); 1728 return; 1729 } 1730 1731 assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0); 1732 1733 ctx->blob = blob; 1734 ctx->page = cluster_start_page; 1735 1736 if (blob->parent_id != SPDK_BLOBID_INVALID) { 1737 ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen, 1738 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1739 if (!ctx->buf) { 1740 SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n", 1741 blob->bs->cluster_sz); 1742 free(ctx); 1743 spdk_bs_user_op_abort(op); 1744 return; 1745 } 1746 } 1747 1748 rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false); 1749 if (rc != 0) { 1750 spdk_free(ctx->buf); 1751 free(ctx); 1752 spdk_bs_user_op_abort(op); 1753 return; 1754 } 1755 1756 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1757 cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl; 1758 cpl.u.blob_basic.cb_arg = ctx; 1759 1760 ctx->seq = spdk_bs_sequence_start(_ch, &cpl); 1761 if (!ctx->seq) { 1762 _spdk_bs_release_cluster(blob->bs, ctx->new_cluster); 1763 spdk_free(ctx->buf); 1764 free(ctx); 1765 spdk_bs_user_op_abort(op); 1766 return; 1767 } 1768 1769 /* Queue the user op to block other incoming operations */ 1770 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link); 1771 1772 if (blob->parent_id != SPDK_BLOBID_INVALID) { 1773 /* Read cluster from backing device */ 1774 spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf, 1775 _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page), 1776 _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz), 1777 _spdk_blob_write_copy, ctx); 1778 } else { 1779 _spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster, 1780 _spdk_blob_insert_cluster_cpl, ctx); 1781 } 1782 } 1783 1784 static void 1785 _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length, 1786 uint64_t *lba, uint32_t *lba_count) 1787 { 1788 *lba_count = length; 1789 1790 if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) { 1791 assert(blob->back_bs_dev != NULL); 1792 *lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit); 1793 *lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count); 1794 } else { 1795 *lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit); 1796 } 1797 } 1798 1799 struct op_split_ctx { 1800 struct spdk_blob *blob; 1801 struct spdk_io_channel *channel; 1802 uint64_t io_unit_offset; 1803 uint64_t io_units_remaining; 1804 void *curr_payload; 1805 enum spdk_blob_op_type op_type; 1806 spdk_bs_sequence_t *seq; 1807 }; 1808 1809 static void 1810 _spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno) 1811 { 1812 struct op_split_ctx *ctx = cb_arg; 1813 struct spdk_blob *blob = ctx->blob; 1814 struct spdk_io_channel *ch = ctx->channel; 1815 enum spdk_blob_op_type op_type = ctx->op_type; 1816 uint8_t *buf = ctx->curr_payload; 1817 uint64_t offset = ctx->io_unit_offset; 1818 uint64_t length = ctx->io_units_remaining; 1819 uint64_t op_length; 1820 1821 if (bserrno != 0 || ctx->io_units_remaining == 0) { 1822 spdk_bs_sequence_finish(ctx->seq, bserrno); 1823 free(ctx); 1824 return; 1825 } 1826 1827 op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob, 1828 offset)); 1829 1830 /* Update length and payload for next operation */ 1831 ctx->io_units_remaining -= op_length; 1832 ctx->io_unit_offset += op_length; 1833 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { 1834 ctx->curr_payload += op_length * blob->bs->io_unit_size; 1835 } 1836 1837 switch (op_type) { 1838 case SPDK_BLOB_READ: 1839 spdk_blob_io_read(blob, ch, buf, offset, op_length, 1840 _spdk_blob_request_submit_op_split_next, ctx); 1841 break; 1842 case SPDK_BLOB_WRITE: 1843 spdk_blob_io_write(blob, ch, buf, offset, op_length, 1844 _spdk_blob_request_submit_op_split_next, ctx); 1845 break; 1846 case SPDK_BLOB_UNMAP: 1847 spdk_blob_io_unmap(blob, ch, offset, op_length, 1848 _spdk_blob_request_submit_op_split_next, ctx); 1849 break; 1850 case SPDK_BLOB_WRITE_ZEROES: 1851 spdk_blob_io_write_zeroes(blob, ch, offset, op_length, 1852 _spdk_blob_request_submit_op_split_next, ctx); 1853 break; 1854 case SPDK_BLOB_READV: 1855 case SPDK_BLOB_WRITEV: 1856 SPDK_ERRLOG("readv/write not valid\n"); 1857 spdk_bs_sequence_finish(ctx->seq, -EINVAL); 1858 free(ctx); 1859 break; 1860 } 1861 } 1862 1863 static void 1864 _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob, 1865 void *payload, uint64_t offset, uint64_t length, 1866 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1867 { 1868 struct op_split_ctx *ctx; 1869 spdk_bs_sequence_t *seq; 1870 struct spdk_bs_cpl cpl; 1871 1872 assert(blob != NULL); 1873 1874 ctx = calloc(1, sizeof(struct op_split_ctx)); 1875 if (ctx == NULL) { 1876 cb_fn(cb_arg, -ENOMEM); 1877 return; 1878 } 1879 1880 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1881 cpl.u.blob_basic.cb_fn = cb_fn; 1882 cpl.u.blob_basic.cb_arg = cb_arg; 1883 1884 seq = spdk_bs_sequence_start(ch, &cpl); 1885 if (!seq) { 1886 free(ctx); 1887 cb_fn(cb_arg, -ENOMEM); 1888 return; 1889 } 1890 1891 ctx->blob = blob; 1892 ctx->channel = ch; 1893 ctx->curr_payload = payload; 1894 ctx->io_unit_offset = offset; 1895 ctx->io_units_remaining = length; 1896 ctx->op_type = op_type; 1897 ctx->seq = seq; 1898 1899 _spdk_blob_request_submit_op_split_next(ctx, 0); 1900 } 1901 1902 static void 1903 _spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob, 1904 void *payload, uint64_t offset, uint64_t length, 1905 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1906 { 1907 struct spdk_bs_cpl cpl; 1908 uint64_t lba; 1909 uint32_t lba_count; 1910 1911 assert(blob != NULL); 1912 1913 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1914 cpl.u.blob_basic.cb_fn = cb_fn; 1915 cpl.u.blob_basic.cb_arg = cb_arg; 1916 1917 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 1918 1919 if (blob->frozen_refcnt) { 1920 /* This blob I/O is frozen */ 1921 spdk_bs_user_op_t *op; 1922 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch); 1923 1924 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 1925 if (!op) { 1926 cb_fn(cb_arg, -ENOMEM); 1927 return; 1928 } 1929 1930 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 1931 1932 return; 1933 } 1934 1935 switch (op_type) { 1936 case SPDK_BLOB_READ: { 1937 spdk_bs_batch_t *batch; 1938 1939 batch = spdk_bs_batch_open(_ch, &cpl); 1940 if (!batch) { 1941 cb_fn(cb_arg, -ENOMEM); 1942 return; 1943 } 1944 1945 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1946 /* Read from the blob */ 1947 spdk_bs_batch_read_dev(batch, payload, lba, lba_count); 1948 } else { 1949 /* Read from the backing block device */ 1950 spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count); 1951 } 1952 1953 spdk_bs_batch_close(batch); 1954 break; 1955 } 1956 case SPDK_BLOB_WRITE: 1957 case SPDK_BLOB_WRITE_ZEROES: { 1958 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1959 /* Write to the blob */ 1960 spdk_bs_batch_t *batch; 1961 1962 if (lba_count == 0) { 1963 cb_fn(cb_arg, 0); 1964 return; 1965 } 1966 1967 batch = spdk_bs_batch_open(_ch, &cpl); 1968 if (!batch) { 1969 cb_fn(cb_arg, -ENOMEM); 1970 return; 1971 } 1972 1973 if (op_type == SPDK_BLOB_WRITE) { 1974 spdk_bs_batch_write_dev(batch, payload, lba, lba_count); 1975 } else { 1976 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1977 } 1978 1979 spdk_bs_batch_close(batch); 1980 } else { 1981 /* Queue this operation and allocate the cluster */ 1982 spdk_bs_user_op_t *op; 1983 1984 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 1985 if (!op) { 1986 cb_fn(cb_arg, -ENOMEM); 1987 return; 1988 } 1989 1990 _spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op); 1991 } 1992 break; 1993 } 1994 case SPDK_BLOB_UNMAP: { 1995 spdk_bs_batch_t *batch; 1996 1997 batch = spdk_bs_batch_open(_ch, &cpl); 1998 if (!batch) { 1999 cb_fn(cb_arg, -ENOMEM); 2000 return; 2001 } 2002 2003 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2004 spdk_bs_batch_unmap_dev(batch, lba, lba_count); 2005 } 2006 2007 spdk_bs_batch_close(batch); 2008 break; 2009 } 2010 case SPDK_BLOB_READV: 2011 case SPDK_BLOB_WRITEV: 2012 SPDK_ERRLOG("readv/write not valid\n"); 2013 cb_fn(cb_arg, -EINVAL); 2014 break; 2015 } 2016 } 2017 2018 static void 2019 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel, 2020 void *payload, uint64_t offset, uint64_t length, 2021 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 2022 { 2023 assert(blob != NULL); 2024 2025 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 2026 cb_fn(cb_arg, -EPERM); 2027 return; 2028 } 2029 2030 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2031 cb_fn(cb_arg, -EINVAL); 2032 return; 2033 } 2034 if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) { 2035 _spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length, 2036 cb_fn, cb_arg, op_type); 2037 } else { 2038 _spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length, 2039 cb_fn, cb_arg, op_type); 2040 } 2041 } 2042 2043 struct rw_iov_ctx { 2044 struct spdk_blob *blob; 2045 struct spdk_io_channel *channel; 2046 spdk_blob_op_complete cb_fn; 2047 void *cb_arg; 2048 bool read; 2049 int iovcnt; 2050 struct iovec *orig_iov; 2051 uint64_t io_unit_offset; 2052 uint64_t io_units_remaining; 2053 uint64_t io_units_done; 2054 struct iovec iov[0]; 2055 }; 2056 2057 static void 2058 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2059 { 2060 assert(cb_arg == NULL); 2061 spdk_bs_sequence_finish(seq, bserrno); 2062 } 2063 2064 static void 2065 _spdk_rw_iov_split_next(void *cb_arg, int bserrno) 2066 { 2067 struct rw_iov_ctx *ctx = cb_arg; 2068 struct spdk_blob *blob = ctx->blob; 2069 struct iovec *iov, *orig_iov; 2070 int iovcnt; 2071 size_t orig_iovoff; 2072 uint64_t io_units_count, io_units_to_boundary, io_unit_offset; 2073 uint64_t byte_count; 2074 2075 if (bserrno != 0 || ctx->io_units_remaining == 0) { 2076 ctx->cb_fn(ctx->cb_arg, bserrno); 2077 free(ctx); 2078 return; 2079 } 2080 2081 io_unit_offset = ctx->io_unit_offset; 2082 io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset); 2083 io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary); 2084 /* 2085 * Get index and offset into the original iov array for our current position in the I/O sequence. 2086 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 2087 * point to the current position in the I/O sequence. 2088 */ 2089 byte_count = ctx->io_units_done * blob->bs->io_unit_size; 2090 orig_iov = &ctx->orig_iov[0]; 2091 orig_iovoff = 0; 2092 while (byte_count > 0) { 2093 if (byte_count >= orig_iov->iov_len) { 2094 byte_count -= orig_iov->iov_len; 2095 orig_iov++; 2096 } else { 2097 orig_iovoff = byte_count; 2098 byte_count = 0; 2099 } 2100 } 2101 2102 /* 2103 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 2104 * bytes of this next I/O remain to be accounted for in the new iov array. 2105 */ 2106 byte_count = io_units_count * blob->bs->io_unit_size; 2107 iov = &ctx->iov[0]; 2108 iovcnt = 0; 2109 while (byte_count > 0) { 2110 assert(iovcnt < ctx->iovcnt); 2111 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 2112 iov->iov_base = orig_iov->iov_base + orig_iovoff; 2113 byte_count -= iov->iov_len; 2114 orig_iovoff = 0; 2115 orig_iov++; 2116 iov++; 2117 iovcnt++; 2118 } 2119 2120 ctx->io_unit_offset += io_units_count; 2121 ctx->io_units_remaining -= io_units_count; 2122 ctx->io_units_done += io_units_count; 2123 iov = &ctx->iov[0]; 2124 2125 if (ctx->read) { 2126 spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2127 io_units_count, _spdk_rw_iov_split_next, ctx); 2128 } else { 2129 spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2130 io_units_count, _spdk_rw_iov_split_next, ctx); 2131 } 2132 } 2133 2134 static void 2135 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 2136 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2137 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 2138 { 2139 struct spdk_bs_cpl cpl; 2140 2141 assert(blob != NULL); 2142 2143 if (!read && blob->data_ro) { 2144 cb_fn(cb_arg, -EPERM); 2145 return; 2146 } 2147 2148 if (length == 0) { 2149 cb_fn(cb_arg, 0); 2150 return; 2151 } 2152 2153 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2154 cb_fn(cb_arg, -EINVAL); 2155 return; 2156 } 2157 2158 /* 2159 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 2160 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 2161 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 2162 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 2163 * to allocate a separate iov array and split the I/O such that none of the resulting 2164 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 2165 * but since this case happens very infrequently, any performance impact will be negligible. 2166 * 2167 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 2168 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 2169 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 2170 * when the batch was completed, to allow for freeing the memory for the iov arrays. 2171 */ 2172 if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) { 2173 uint32_t lba_count; 2174 uint64_t lba; 2175 2176 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2177 cpl.u.blob_basic.cb_fn = cb_fn; 2178 cpl.u.blob_basic.cb_arg = cb_arg; 2179 2180 if (blob->frozen_refcnt) { 2181 /* This blob I/O is frozen */ 2182 enum spdk_blob_op_type op_type; 2183 spdk_bs_user_op_t *op; 2184 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel); 2185 2186 op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV; 2187 op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length); 2188 if (!op) { 2189 cb_fn(cb_arg, -ENOMEM); 2190 return; 2191 } 2192 2193 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 2194 2195 return; 2196 } 2197 2198 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 2199 2200 if (read) { 2201 spdk_bs_sequence_t *seq; 2202 2203 seq = spdk_bs_sequence_start(_channel, &cpl); 2204 if (!seq) { 2205 cb_fn(cb_arg, -ENOMEM); 2206 return; 2207 } 2208 2209 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2210 spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2211 } else { 2212 spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count, 2213 _spdk_rw_iov_done, NULL); 2214 } 2215 } else { 2216 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2217 spdk_bs_sequence_t *seq; 2218 2219 seq = spdk_bs_sequence_start(_channel, &cpl); 2220 if (!seq) { 2221 cb_fn(cb_arg, -ENOMEM); 2222 return; 2223 } 2224 2225 spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2226 } else { 2227 /* Queue this operation and allocate the cluster */ 2228 spdk_bs_user_op_t *op; 2229 2230 op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, 2231 length); 2232 if (!op) { 2233 cb_fn(cb_arg, -ENOMEM); 2234 return; 2235 } 2236 2237 _spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op); 2238 } 2239 } 2240 } else { 2241 struct rw_iov_ctx *ctx; 2242 2243 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 2244 if (ctx == NULL) { 2245 cb_fn(cb_arg, -ENOMEM); 2246 return; 2247 } 2248 2249 ctx->blob = blob; 2250 ctx->channel = _channel; 2251 ctx->cb_fn = cb_fn; 2252 ctx->cb_arg = cb_arg; 2253 ctx->read = read; 2254 ctx->orig_iov = iov; 2255 ctx->iovcnt = iovcnt; 2256 ctx->io_unit_offset = offset; 2257 ctx->io_units_remaining = length; 2258 ctx->io_units_done = 0; 2259 2260 _spdk_rw_iov_split_next(ctx, 0); 2261 } 2262 } 2263 2264 static struct spdk_blob * 2265 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 2266 { 2267 struct spdk_blob *blob; 2268 2269 TAILQ_FOREACH(blob, &bs->blobs, link) { 2270 if (blob->id == blobid) { 2271 return blob; 2272 } 2273 } 2274 2275 return NULL; 2276 } 2277 2278 static void 2279 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob, 2280 struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry) 2281 { 2282 assert(blob != NULL); 2283 *snapshot_entry = NULL; 2284 *clone_entry = NULL; 2285 2286 if (blob->parent_id == SPDK_BLOBID_INVALID) { 2287 return; 2288 } 2289 2290 TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) { 2291 if ((*snapshot_entry)->id == blob->parent_id) { 2292 break; 2293 } 2294 } 2295 2296 if (*snapshot_entry != NULL) { 2297 TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) { 2298 if ((*clone_entry)->id == blob->id) { 2299 break; 2300 } 2301 } 2302 2303 assert(clone_entry != NULL); 2304 } 2305 } 2306 2307 static int 2308 _spdk_bs_channel_create(void *io_device, void *ctx_buf) 2309 { 2310 struct spdk_blob_store *bs = io_device; 2311 struct spdk_bs_channel *channel = ctx_buf; 2312 struct spdk_bs_dev *dev; 2313 uint32_t max_ops = bs->max_channel_ops; 2314 uint32_t i; 2315 2316 dev = bs->dev; 2317 2318 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 2319 if (!channel->req_mem) { 2320 return -1; 2321 } 2322 2323 TAILQ_INIT(&channel->reqs); 2324 2325 for (i = 0; i < max_ops; i++) { 2326 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 2327 } 2328 2329 channel->bs = bs; 2330 channel->dev = dev; 2331 channel->dev_channel = dev->create_channel(dev); 2332 2333 if (!channel->dev_channel) { 2334 SPDK_ERRLOG("Failed to create device channel.\n"); 2335 free(channel->req_mem); 2336 return -1; 2337 } 2338 2339 TAILQ_INIT(&channel->need_cluster_alloc); 2340 TAILQ_INIT(&channel->queued_io); 2341 2342 return 0; 2343 } 2344 2345 static void 2346 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 2347 { 2348 struct spdk_bs_channel *channel = ctx_buf; 2349 spdk_bs_user_op_t *op; 2350 2351 while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) { 2352 op = TAILQ_FIRST(&channel->need_cluster_alloc); 2353 TAILQ_REMOVE(&channel->need_cluster_alloc, op, link); 2354 spdk_bs_user_op_abort(op); 2355 } 2356 2357 while (!TAILQ_EMPTY(&channel->queued_io)) { 2358 op = TAILQ_FIRST(&channel->queued_io); 2359 TAILQ_REMOVE(&channel->queued_io, op, link); 2360 spdk_bs_user_op_abort(op); 2361 } 2362 2363 free(channel->req_mem); 2364 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 2365 } 2366 2367 static void 2368 _spdk_bs_dev_destroy(void *io_device) 2369 { 2370 struct spdk_blob_store *bs = io_device; 2371 struct spdk_blob *blob, *blob_tmp; 2372 2373 bs->dev->destroy(bs->dev); 2374 2375 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 2376 TAILQ_REMOVE(&bs->blobs, blob, link); 2377 _spdk_blob_free(blob); 2378 } 2379 2380 pthread_mutex_destroy(&bs->used_clusters_mutex); 2381 2382 spdk_bit_array_free(&bs->used_blobids); 2383 spdk_bit_array_free(&bs->used_md_pages); 2384 spdk_bit_array_free(&bs->used_clusters); 2385 /* 2386 * If this function is called for any reason except a successful unload, 2387 * the unload_cpl type will be NONE and this will be a nop. 2388 */ 2389 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 2390 2391 free(bs); 2392 } 2393 2394 static int 2395 _spdk_bs_blob_list_add(struct spdk_blob *blob) 2396 { 2397 spdk_blob_id snapshot_id; 2398 struct spdk_blob_list *snapshot_entry = NULL; 2399 struct spdk_blob_list *clone_entry = NULL; 2400 2401 assert(blob != NULL); 2402 2403 snapshot_id = blob->parent_id; 2404 if (snapshot_id == SPDK_BLOBID_INVALID) { 2405 return 0; 2406 } 2407 2408 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id); 2409 if (snapshot_entry == NULL) { 2410 /* Snapshot not found */ 2411 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list)); 2412 if (snapshot_entry == NULL) { 2413 return -ENOMEM; 2414 } 2415 snapshot_entry->id = snapshot_id; 2416 TAILQ_INIT(&snapshot_entry->clones); 2417 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link); 2418 } else { 2419 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 2420 if (clone_entry->id == blob->id) { 2421 break; 2422 } 2423 } 2424 } 2425 2426 if (clone_entry == NULL) { 2427 /* Clone not found */ 2428 clone_entry = calloc(1, sizeof(struct spdk_blob_list)); 2429 if (clone_entry == NULL) { 2430 return -ENOMEM; 2431 } 2432 clone_entry->id = blob->id; 2433 TAILQ_INIT(&clone_entry->clones); 2434 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link); 2435 snapshot_entry->clone_count++; 2436 } 2437 2438 return 0; 2439 } 2440 2441 static void 2442 _spdk_bs_blob_list_remove(struct spdk_blob *blob) 2443 { 2444 struct spdk_blob_list *snapshot_entry = NULL; 2445 struct spdk_blob_list *clone_entry = NULL; 2446 2447 _spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry); 2448 2449 if (snapshot_entry == NULL) { 2450 return; 2451 } 2452 2453 blob->parent_id = SPDK_BLOBID_INVALID; 2454 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2455 free(clone_entry); 2456 2457 snapshot_entry->clone_count--; 2458 } 2459 2460 static int 2461 _spdk_bs_blob_list_free(struct spdk_blob_store *bs) 2462 { 2463 struct spdk_blob_list *snapshot_entry; 2464 struct spdk_blob_list *snapshot_entry_tmp; 2465 struct spdk_blob_list *clone_entry; 2466 struct spdk_blob_list *clone_entry_tmp; 2467 2468 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) { 2469 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) { 2470 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2471 free(clone_entry); 2472 } 2473 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link); 2474 free(snapshot_entry); 2475 } 2476 2477 return 0; 2478 } 2479 2480 static void 2481 _spdk_bs_free(struct spdk_blob_store *bs) 2482 { 2483 _spdk_bs_blob_list_free(bs); 2484 2485 spdk_bs_unregister_md_thread(bs); 2486 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy); 2487 } 2488 2489 void 2490 spdk_bs_opts_init(struct spdk_bs_opts *opts) 2491 { 2492 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 2493 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 2494 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 2495 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS; 2496 opts->clear_method = BS_CLEAR_WITH_UNMAP; 2497 memset(&opts->bstype, 0, sizeof(opts->bstype)); 2498 opts->iter_cb_fn = NULL; 2499 opts->iter_cb_arg = NULL; 2500 } 2501 2502 static int 2503 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 2504 { 2505 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 2506 opts->max_channel_ops == 0) { 2507 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 2508 return -1; 2509 } 2510 2511 return 0; 2512 } 2513 2514 static int 2515 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs) 2516 { 2517 struct spdk_blob_store *bs; 2518 uint64_t dev_size; 2519 int rc; 2520 2521 dev_size = dev->blocklen * dev->blockcnt; 2522 if (dev_size < opts->cluster_sz) { 2523 /* Device size cannot be smaller than cluster size of blobstore */ 2524 SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n", 2525 dev_size, opts->cluster_sz); 2526 return -ENOSPC; 2527 } 2528 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 2529 /* Cluster size cannot be smaller than page size */ 2530 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n", 2531 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 2532 return -EINVAL; 2533 } 2534 bs = calloc(1, sizeof(struct spdk_blob_store)); 2535 if (!bs) { 2536 return -ENOMEM; 2537 } 2538 2539 TAILQ_INIT(&bs->blobs); 2540 TAILQ_INIT(&bs->snapshots); 2541 bs->dev = dev; 2542 bs->md_thread = spdk_get_thread(); 2543 assert(bs->md_thread != NULL); 2544 2545 /* 2546 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 2547 * even multiple of the cluster size. 2548 */ 2549 bs->cluster_sz = opts->cluster_sz; 2550 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 2551 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 2552 bs->num_free_clusters = bs->total_clusters; 2553 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 2554 bs->io_unit_size = dev->blocklen; 2555 if (bs->used_clusters == NULL) { 2556 free(bs); 2557 return -ENOMEM; 2558 } 2559 2560 bs->max_channel_ops = opts->max_channel_ops; 2561 bs->super_blob = SPDK_BLOBID_INVALID; 2562 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 2563 2564 /* The metadata is assumed to be at least 1 page */ 2565 bs->used_md_pages = spdk_bit_array_create(1); 2566 bs->used_blobids = spdk_bit_array_create(0); 2567 2568 pthread_mutex_init(&bs->used_clusters_mutex, NULL); 2569 2570 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy, 2571 sizeof(struct spdk_bs_channel), "blobstore"); 2572 rc = spdk_bs_register_md_thread(bs); 2573 if (rc == -1) { 2574 spdk_io_device_unregister(bs, NULL); 2575 pthread_mutex_destroy(&bs->used_clusters_mutex); 2576 spdk_bit_array_free(&bs->used_blobids); 2577 spdk_bit_array_free(&bs->used_md_pages); 2578 spdk_bit_array_free(&bs->used_clusters); 2579 free(bs); 2580 /* FIXME: this is a lie but don't know how to get a proper error code here */ 2581 return -ENOMEM; 2582 } 2583 2584 *_bs = bs; 2585 return 0; 2586 } 2587 2588 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */ 2589 2590 struct spdk_bs_load_ctx { 2591 struct spdk_blob_store *bs; 2592 struct spdk_bs_super_block *super; 2593 2594 struct spdk_bs_md_mask *mask; 2595 bool in_page_chain; 2596 uint32_t page_index; 2597 uint32_t cur_page; 2598 struct spdk_blob_md_page *page; 2599 2600 spdk_bs_sequence_t *seq; 2601 spdk_blob_op_with_handle_complete iter_cb_fn; 2602 void *iter_cb_arg; 2603 struct spdk_blob *blob; 2604 spdk_blob_id blobid; 2605 }; 2606 2607 static void 2608 _spdk_bs_load_ctx_fail(struct spdk_bs_load_ctx *ctx, int bserrno) 2609 { 2610 assert(bserrno != 0); 2611 2612 spdk_free(ctx->super); 2613 spdk_bs_sequence_finish(ctx->seq, bserrno); 2614 _spdk_bs_free(ctx->bs); 2615 free(ctx); 2616 } 2617 2618 static void 2619 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 2620 { 2621 uint32_t i = 0; 2622 2623 while (true) { 2624 i = spdk_bit_array_find_first_set(array, i); 2625 if (i >= mask->length) { 2626 break; 2627 } 2628 mask->mask[i / 8] |= 1U << (i % 8); 2629 i++; 2630 } 2631 } 2632 2633 static int 2634 _spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask) 2635 { 2636 struct spdk_bit_array *array; 2637 uint32_t i; 2638 2639 if (spdk_bit_array_resize(array_ptr, mask->length) < 0) { 2640 return -ENOMEM; 2641 } 2642 2643 array = *array_ptr; 2644 for (i = 0; i < mask->length; i++) { 2645 if (mask->mask[i / 8] & (1U << (i % 8))) { 2646 spdk_bit_array_set(array, i); 2647 } 2648 } 2649 2650 return 0; 2651 } 2652 2653 static void 2654 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 2655 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 2656 { 2657 /* Update the values in the super block */ 2658 super->super_blob = bs->super_blob; 2659 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 2660 super->crc = _spdk_blob_md_page_calc_crc(super); 2661 spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0), 2662 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 2663 cb_fn, cb_arg); 2664 } 2665 2666 static void 2667 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2668 { 2669 struct spdk_bs_load_ctx *ctx = arg; 2670 uint64_t mask_size, lba, lba_count; 2671 2672 /* Write out the used clusters mask */ 2673 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 2674 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2675 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2676 if (!ctx->mask) { 2677 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2678 return; 2679 } 2680 2681 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 2682 ctx->mask->length = ctx->bs->total_clusters; 2683 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 2684 2685 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 2686 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 2687 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 2688 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2689 } 2690 2691 static void 2692 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2693 { 2694 struct spdk_bs_load_ctx *ctx = arg; 2695 uint64_t mask_size, lba, lba_count; 2696 2697 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 2698 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2699 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2700 if (!ctx->mask) { 2701 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2702 return; 2703 } 2704 2705 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 2706 ctx->mask->length = ctx->super->md_len; 2707 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 2708 2709 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 2710 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 2711 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 2712 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2713 } 2714 2715 static void 2716 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2717 { 2718 struct spdk_bs_load_ctx *ctx = arg; 2719 uint64_t mask_size, lba, lba_count; 2720 2721 if (ctx->super->used_blobid_mask_len == 0) { 2722 /* 2723 * This is a pre-v3 on-disk format where the blobid mask does not get 2724 * written to disk. 2725 */ 2726 cb_fn(seq, arg, 0); 2727 return; 2728 } 2729 2730 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2731 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2732 SPDK_MALLOC_DMA); 2733 if (!ctx->mask) { 2734 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2735 return; 2736 } 2737 2738 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS; 2739 ctx->mask->length = ctx->super->md_len; 2740 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids)); 2741 2742 _spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask); 2743 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2744 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2745 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2746 } 2747 2748 static void 2749 _spdk_blob_set_thin_provision(struct spdk_blob *blob) 2750 { 2751 _spdk_blob_verify_md_op(blob); 2752 blob->invalid_flags |= SPDK_BLOB_THIN_PROV; 2753 blob->state = SPDK_BLOB_STATE_DIRTY; 2754 } 2755 2756 static void 2757 _spdk_blob_set_clear_method(struct spdk_blob *blob, enum blob_clear_method clear_method) 2758 { 2759 _spdk_blob_verify_md_op(blob); 2760 blob->clear_method = clear_method; 2761 blob->md_ro_flags |= (clear_method << SPDK_BLOB_CLEAR_METHOD_SHIFT); 2762 blob->state = SPDK_BLOB_STATE_DIRTY; 2763 } 2764 2765 static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno); 2766 2767 static void 2768 _spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno) 2769 { 2770 struct spdk_bs_load_ctx *ctx = cb_arg; 2771 spdk_blob_id id; 2772 int64_t page_num; 2773 2774 /* Iterate to next blob (we can't use spdk_bs_iter_next function as our 2775 * last blob has been removed */ 2776 page_num = _spdk_bs_blobid_to_page(ctx->blobid); 2777 page_num++; 2778 page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num); 2779 if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) { 2780 _spdk_bs_load_iter(ctx, NULL, -ENOENT); 2781 return; 2782 } 2783 2784 id = _spdk_bs_page_to_blobid(page_num); 2785 2786 spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx); 2787 } 2788 2789 static void 2790 _spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno) 2791 { 2792 struct spdk_bs_load_ctx *ctx = cb_arg; 2793 2794 if (bserrno != 0) { 2795 SPDK_ERRLOG("Failed to close corrupted blob\n"); 2796 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2797 return; 2798 } 2799 2800 spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx); 2801 } 2802 2803 static void 2804 _spdk_bs_delete_corrupted_blob(void *cb_arg, int bserrno) 2805 { 2806 struct spdk_bs_load_ctx *ctx = cb_arg; 2807 uint64_t i; 2808 2809 if (bserrno != 0) { 2810 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2811 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2812 return; 2813 } 2814 2815 /* Snapshot and clone have the same copy of cluster map at this point. 2816 * Let's clear cluster map for snpashot now so that it won't be cleared 2817 * for clone later when we remove snapshot. Also set thin provision to 2818 * pass data corruption check */ 2819 for (i = 0; i < ctx->blob->active.num_clusters; i++) { 2820 ctx->blob->active.clusters[i] = 0; 2821 } 2822 2823 ctx->blob->md_ro = false; 2824 2825 _spdk_blob_set_thin_provision(ctx->blob); 2826 2827 ctx->blobid = ctx->blob->id; 2828 2829 spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx); 2830 } 2831 2832 static void 2833 _spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno) 2834 { 2835 struct spdk_bs_load_ctx *ctx = cb_arg; 2836 2837 if (bserrno != 0) { 2838 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2839 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2840 return; 2841 } 2842 2843 ctx->blob->md_ro = false; 2844 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true); 2845 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true); 2846 spdk_blob_set_read_only(ctx->blob); 2847 2848 if (ctx->iter_cb_fn) { 2849 ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0); 2850 } 2851 _spdk_bs_blob_list_add(ctx->blob); 2852 2853 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2854 } 2855 2856 static void 2857 _spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno) 2858 { 2859 struct spdk_bs_load_ctx *ctx = cb_arg; 2860 2861 if (bserrno != 0) { 2862 SPDK_ERRLOG("Failed to open clone of a corrupted blob\n"); 2863 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2864 return; 2865 } 2866 2867 if (blob->parent_id == ctx->blob->id) { 2868 /* Power failure occured before updating clone (snapshot delete case) 2869 * or after updating clone (creating snapshot case) - keep snapshot */ 2870 spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx); 2871 } else { 2872 /* Power failure occured after updating clone (snapshot delete case) 2873 * or before updating clone (creating snapshot case) - remove snapshot */ 2874 spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx); 2875 } 2876 } 2877 2878 static void 2879 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno) 2880 { 2881 struct spdk_bs_load_ctx *ctx = arg; 2882 const void *value; 2883 size_t len; 2884 int rc = 0; 2885 2886 if (bserrno == 0) { 2887 /* Examine blob if it is corrupted after power failure. Fix 2888 * the ones that can be fixed and remove any other corrupted 2889 * ones. If it is not corrupted just process it */ 2890 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true); 2891 if (rc != 0) { 2892 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true); 2893 if (rc != 0) { 2894 /* Not corrupted - process it and continue with iterating through blobs */ 2895 if (ctx->iter_cb_fn) { 2896 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0); 2897 } 2898 _spdk_bs_blob_list_add(blob); 2899 spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx); 2900 return; 2901 } 2902 2903 } 2904 2905 assert(len == sizeof(spdk_blob_id)); 2906 2907 ctx->blob = blob; 2908 2909 /* Open clone to check if we are able to fix this blob or should we remove it */ 2910 spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx); 2911 return; 2912 } else if (bserrno == -ENOENT) { 2913 bserrno = 0; 2914 } else { 2915 /* 2916 * This case needs to be looked at further. Same problem 2917 * exists with applications that rely on explicit blob 2918 * iteration. We should just skip the blob that failed 2919 * to load and continue on to the next one. 2920 */ 2921 SPDK_ERRLOG("Error in iterating blobs\n"); 2922 } 2923 2924 ctx->iter_cb_fn = NULL; 2925 2926 spdk_free(ctx->super); 2927 spdk_free(ctx->mask); 2928 spdk_bs_sequence_finish(ctx->seq, bserrno); 2929 free(ctx); 2930 } 2931 2932 static void 2933 _spdk_bs_load_complete(struct spdk_bs_load_ctx *ctx) 2934 { 2935 spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx); 2936 } 2937 2938 static void 2939 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2940 { 2941 struct spdk_bs_load_ctx *ctx = cb_arg; 2942 int rc; 2943 2944 /* The type must be correct */ 2945 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS); 2946 2947 /* The length of the mask (in bits) must not be greater than 2948 * the length of the buffer (converted to bits) */ 2949 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8)); 2950 2951 /* The length of the mask must be exactly equal to the size 2952 * (in pages) of the metadata region */ 2953 assert(ctx->mask->length == ctx->super->md_len); 2954 2955 rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask); 2956 if (rc < 0) { 2957 spdk_free(ctx->mask); 2958 _spdk_bs_load_ctx_fail(ctx, rc); 2959 return; 2960 } 2961 2962 _spdk_bs_load_complete(ctx); 2963 } 2964 2965 static void 2966 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2967 { 2968 struct spdk_bs_load_ctx *ctx = cb_arg; 2969 uint64_t lba, lba_count, mask_size; 2970 int rc; 2971 2972 if (bserrno != 0) { 2973 _spdk_bs_load_ctx_fail(ctx, bserrno); 2974 return; 2975 } 2976 2977 /* The type must be correct */ 2978 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 2979 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2980 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 2981 struct spdk_blob_md_page) * 8)); 2982 /* The length of the mask must be exactly equal to the total number of clusters */ 2983 assert(ctx->mask->length == ctx->bs->total_clusters); 2984 2985 rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask); 2986 if (rc < 0) { 2987 spdk_free(ctx->mask); 2988 _spdk_bs_load_ctx_fail(ctx, rc); 2989 return; 2990 } 2991 2992 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters); 2993 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters); 2994 2995 spdk_free(ctx->mask); 2996 2997 /* Read the used blobids mask */ 2998 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2999 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3000 SPDK_MALLOC_DMA); 3001 if (!ctx->mask) { 3002 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3003 return; 3004 } 3005 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 3006 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 3007 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3008 _spdk_bs_load_used_blobids_cpl, ctx); 3009 } 3010 3011 static void 3012 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3013 { 3014 struct spdk_bs_load_ctx *ctx = cb_arg; 3015 uint64_t lba, lba_count, mask_size; 3016 int rc; 3017 3018 if (bserrno != 0) { 3019 _spdk_bs_load_ctx_fail(ctx, bserrno); 3020 return; 3021 } 3022 3023 /* The type must be correct */ 3024 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 3025 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 3026 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 3027 8)); 3028 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 3029 assert(ctx->mask->length == ctx->super->md_len); 3030 3031 rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask); 3032 if (rc < 0) { 3033 spdk_free(ctx->mask); 3034 _spdk_bs_load_ctx_fail(ctx, rc); 3035 return; 3036 } 3037 3038 spdk_free(ctx->mask); 3039 3040 /* Read the used clusters mask */ 3041 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 3042 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3043 SPDK_MALLOC_DMA); 3044 if (!ctx->mask) { 3045 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3046 return; 3047 } 3048 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 3049 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 3050 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3051 _spdk_bs_load_used_clusters_cpl, ctx); 3052 } 3053 3054 static void 3055 _spdk_bs_load_read_used_pages(struct spdk_bs_load_ctx *ctx) 3056 { 3057 uint64_t lba, lba_count, mask_size; 3058 3059 /* Read the used pages mask */ 3060 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 3061 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3062 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3063 if (!ctx->mask) { 3064 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3065 return; 3066 } 3067 3068 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 3069 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 3070 spdk_bs_sequence_read_dev(ctx->seq, ctx->mask, lba, lba_count, 3071 _spdk_bs_load_used_pages_cpl, ctx); 3072 } 3073 3074 static int 3075 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 3076 { 3077 struct spdk_blob_md_descriptor *desc; 3078 size_t cur_desc = 0; 3079 3080 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3081 while (cur_desc < sizeof(page->descriptors)) { 3082 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3083 if (desc->length == 0) { 3084 /* If padding and length are 0, this terminates the page */ 3085 break; 3086 } 3087 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3088 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3089 unsigned int i, j; 3090 unsigned int cluster_count = 0; 3091 uint32_t cluster_idx; 3092 3093 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3094 3095 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3096 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 3097 cluster_idx = desc_extent_rle->extents[i].cluster_idx; 3098 /* 3099 * cluster_idx = 0 means an unallocated cluster - don't mark that 3100 * in the used cluster map. 3101 */ 3102 if (cluster_idx != 0) { 3103 spdk_bit_array_set(bs->used_clusters, cluster_idx + j); 3104 if (bs->num_free_clusters == 0) { 3105 return -ENOSPC; 3106 } 3107 bs->num_free_clusters--; 3108 } 3109 cluster_count++; 3110 } 3111 } 3112 if (cluster_count == 0) { 3113 return -EINVAL; 3114 } 3115 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3116 /* Skip this item */ 3117 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3118 /* Skip this item */ 3119 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3120 /* Skip this item */ 3121 } else { 3122 /* Error */ 3123 return -EINVAL; 3124 } 3125 /* Advance to the next descriptor */ 3126 cur_desc += sizeof(*desc) + desc->length; 3127 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3128 break; 3129 } 3130 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3131 } 3132 return 0; 3133 } 3134 3135 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 3136 { 3137 uint32_t crc; 3138 3139 crc = _spdk_blob_md_page_calc_crc(ctx->page); 3140 if (crc != ctx->page->crc) { 3141 return false; 3142 } 3143 3144 if (ctx->page->sequence_num == 0 && 3145 _spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 3146 return false; 3147 } 3148 return true; 3149 } 3150 3151 static void 3152 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx); 3153 3154 static void 3155 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3156 { 3157 struct spdk_bs_load_ctx *ctx = cb_arg; 3158 3159 if (bserrno != 0) { 3160 _spdk_bs_load_ctx_fail(ctx, bserrno); 3161 return; 3162 } 3163 3164 _spdk_bs_load_complete(ctx); 3165 } 3166 3167 static void 3168 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3169 { 3170 struct spdk_bs_load_ctx *ctx = cb_arg; 3171 3172 spdk_free(ctx->mask); 3173 ctx->mask = NULL; 3174 3175 if (bserrno != 0) { 3176 _spdk_bs_load_ctx_fail(ctx, bserrno); 3177 return; 3178 } 3179 3180 _spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_load_write_used_clusters_cpl); 3181 } 3182 3183 static void 3184 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3185 { 3186 struct spdk_bs_load_ctx *ctx = cb_arg; 3187 3188 spdk_free(ctx->mask); 3189 ctx->mask = NULL; 3190 3191 if (bserrno != 0) { 3192 _spdk_bs_load_ctx_fail(ctx, bserrno); 3193 return; 3194 } 3195 3196 _spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_load_write_used_blobids_cpl); 3197 } 3198 3199 static void 3200 _spdk_bs_load_write_used_md(struct spdk_bs_load_ctx *ctx) 3201 { 3202 _spdk_bs_write_used_md(ctx->seq, ctx, _spdk_bs_load_write_used_pages_cpl); 3203 } 3204 3205 static void 3206 _spdk_bs_load_replay_md_chain_cpl(struct spdk_bs_load_ctx *ctx) 3207 { 3208 uint64_t num_md_clusters; 3209 uint64_t i; 3210 3211 ctx->in_page_chain = false; 3212 3213 do { 3214 ctx->page_index++; 3215 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 3216 3217 if (ctx->page_index < ctx->super->md_len) { 3218 ctx->cur_page = ctx->page_index; 3219 _spdk_bs_load_replay_cur_md_page(ctx); 3220 } else { 3221 /* Claim all of the clusters used by the metadata */ 3222 num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster); 3223 for (i = 0; i < num_md_clusters; i++) { 3224 _spdk_bs_claim_cluster(ctx->bs, i); 3225 } 3226 spdk_free(ctx->page); 3227 _spdk_bs_load_write_used_md(ctx); 3228 } 3229 } 3230 3231 static void 3232 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3233 { 3234 struct spdk_bs_load_ctx *ctx = cb_arg; 3235 uint32_t page_num; 3236 3237 if (bserrno != 0) { 3238 _spdk_bs_load_ctx_fail(ctx, bserrno); 3239 return; 3240 } 3241 3242 page_num = ctx->cur_page; 3243 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 3244 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 3245 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 3246 if (ctx->page->sequence_num == 0) { 3247 spdk_bit_array_set(ctx->bs->used_blobids, page_num); 3248 } 3249 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 3250 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3251 return; 3252 } 3253 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 3254 ctx->in_page_chain = true; 3255 ctx->cur_page = ctx->page->next; 3256 _spdk_bs_load_replay_cur_md_page(ctx); 3257 return; 3258 } 3259 } 3260 } 3261 _spdk_bs_load_replay_md_chain_cpl(ctx); 3262 } 3263 3264 static void 3265 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx) 3266 { 3267 uint64_t lba; 3268 3269 assert(ctx->cur_page < ctx->super->md_len); 3270 lba = _spdk_bs_md_page_to_lba(ctx->bs, ctx->cur_page); 3271 spdk_bs_sequence_read_dev(ctx->seq, ctx->page, lba, 3272 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3273 _spdk_bs_load_replay_md_cpl, ctx); 3274 } 3275 3276 static void 3277 _spdk_bs_load_replay_md(struct spdk_bs_load_ctx *ctx) 3278 { 3279 ctx->page_index = 0; 3280 ctx->cur_page = 0; 3281 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3282 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3283 if (!ctx->page) { 3284 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3285 return; 3286 } 3287 _spdk_bs_load_replay_cur_md_page(ctx); 3288 } 3289 3290 static void 3291 _spdk_bs_recover(struct spdk_bs_load_ctx *ctx) 3292 { 3293 int rc; 3294 3295 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 3296 if (rc < 0) { 3297 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3298 return; 3299 } 3300 3301 rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len); 3302 if (rc < 0) { 3303 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3304 return; 3305 } 3306 3307 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3308 if (rc < 0) { 3309 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3310 return; 3311 } 3312 3313 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 3314 _spdk_bs_load_replay_md(ctx); 3315 } 3316 3317 static void 3318 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3319 { 3320 struct spdk_bs_load_ctx *ctx = cb_arg; 3321 uint32_t crc; 3322 int rc; 3323 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 3324 3325 if (ctx->super->version > SPDK_BS_VERSION || 3326 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 3327 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3328 return; 3329 } 3330 3331 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3332 sizeof(ctx->super->signature)) != 0) { 3333 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3334 return; 3335 } 3336 3337 crc = _spdk_blob_md_page_calc_crc(ctx->super); 3338 if (crc != ctx->super->crc) { 3339 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3340 return; 3341 } 3342 3343 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3344 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 3345 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3346 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 3347 } else { 3348 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 3349 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3350 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3351 _spdk_bs_load_ctx_fail(ctx, -ENXIO); 3352 return; 3353 } 3354 3355 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) { 3356 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n", 3357 ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size); 3358 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3359 return; 3360 } 3361 3362 if (ctx->super->size == 0) { 3363 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen; 3364 } 3365 3366 if (ctx->super->io_unit_size == 0) { 3367 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE; 3368 } 3369 3370 /* Parse the super block */ 3371 ctx->bs->clean = 1; 3372 ctx->bs->cluster_sz = ctx->super->cluster_size; 3373 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size; 3374 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 3375 ctx->bs->io_unit_size = ctx->super->io_unit_size; 3376 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3377 if (rc < 0) { 3378 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3379 return; 3380 } 3381 ctx->bs->md_start = ctx->super->md_start; 3382 ctx->bs->md_len = ctx->super->md_len; 3383 ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up( 3384 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 3385 ctx->bs->super_blob = ctx->super->super_blob; 3386 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 3387 3388 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) { 3389 _spdk_bs_recover(ctx); 3390 } else { 3391 _spdk_bs_load_read_used_pages(ctx); 3392 } 3393 } 3394 3395 void 3396 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3397 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3398 { 3399 struct spdk_blob_store *bs; 3400 struct spdk_bs_cpl cpl; 3401 struct spdk_bs_load_ctx *ctx; 3402 struct spdk_bs_opts opts = {}; 3403 int err; 3404 3405 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 3406 3407 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3408 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen); 3409 dev->destroy(dev); 3410 cb_fn(cb_arg, NULL, -EINVAL); 3411 return; 3412 } 3413 3414 if (o) { 3415 opts = *o; 3416 } else { 3417 spdk_bs_opts_init(&opts); 3418 } 3419 3420 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 3421 dev->destroy(dev); 3422 cb_fn(cb_arg, NULL, -EINVAL); 3423 return; 3424 } 3425 3426 err = _spdk_bs_alloc(dev, &opts, &bs); 3427 if (err) { 3428 dev->destroy(dev); 3429 cb_fn(cb_arg, NULL, err); 3430 return; 3431 } 3432 3433 ctx = calloc(1, sizeof(*ctx)); 3434 if (!ctx) { 3435 _spdk_bs_free(bs); 3436 cb_fn(cb_arg, NULL, -ENOMEM); 3437 return; 3438 } 3439 3440 ctx->bs = bs; 3441 ctx->iter_cb_fn = opts.iter_cb_fn; 3442 ctx->iter_cb_arg = opts.iter_cb_arg; 3443 3444 /* Allocate memory for the super block */ 3445 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3446 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3447 if (!ctx->super) { 3448 free(ctx); 3449 _spdk_bs_free(bs); 3450 cb_fn(cb_arg, NULL, -ENOMEM); 3451 return; 3452 } 3453 3454 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3455 cpl.u.bs_handle.cb_fn = cb_fn; 3456 cpl.u.bs_handle.cb_arg = cb_arg; 3457 cpl.u.bs_handle.bs = bs; 3458 3459 ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3460 if (!ctx->seq) { 3461 spdk_free(ctx->super); 3462 free(ctx); 3463 _spdk_bs_free(bs); 3464 cb_fn(cb_arg, NULL, -ENOMEM); 3465 return; 3466 } 3467 3468 /* Read the super block */ 3469 spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3470 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3471 _spdk_bs_load_super_cpl, ctx); 3472 } 3473 3474 /* END spdk_bs_load */ 3475 3476 /* START spdk_bs_dump */ 3477 3478 struct spdk_bs_dump_ctx { 3479 struct spdk_blob_store *bs; 3480 struct spdk_bs_super_block *super; 3481 uint32_t cur_page; 3482 struct spdk_blob_md_page *page; 3483 spdk_bs_sequence_t *seq; 3484 FILE *fp; 3485 spdk_bs_dump_print_xattr print_xattr_fn; 3486 char xattr_name[4096]; 3487 }; 3488 3489 static void 3490 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno) 3491 { 3492 spdk_free(ctx->super); 3493 3494 /* 3495 * We need to defer calling spdk_bs_call_cpl() until after 3496 * dev destruction, so tuck these away for later use. 3497 */ 3498 ctx->bs->unload_err = bserrno; 3499 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3500 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3501 3502 spdk_bs_sequence_finish(seq, 0); 3503 _spdk_bs_free(ctx->bs); 3504 free(ctx); 3505 } 3506 3507 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 3508 3509 static void 3510 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx) 3511 { 3512 uint32_t page_idx = ctx->cur_page; 3513 struct spdk_blob_md_page *page = ctx->page; 3514 struct spdk_blob_md_descriptor *desc; 3515 size_t cur_desc = 0; 3516 uint32_t crc; 3517 3518 fprintf(ctx->fp, "=========\n"); 3519 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx); 3520 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id); 3521 3522 crc = _spdk_blob_md_page_calc_crc(page); 3523 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? "OK" : "Mismatch"); 3524 3525 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3526 while (cur_desc < sizeof(page->descriptors)) { 3527 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3528 if (desc->length == 0) { 3529 /* If padding and length are 0, this terminates the page */ 3530 break; 3531 } 3532 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3533 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3534 unsigned int i; 3535 3536 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3537 3538 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3539 if (desc_extent_rle->extents[i].cluster_idx != 0) { 3540 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32, 3541 desc_extent_rle->extents[i].cluster_idx); 3542 } else { 3543 fprintf(ctx->fp, "Unallocated Extent - "); 3544 } 3545 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length); 3546 fprintf(ctx->fp, "\n"); 3547 } 3548 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3549 struct spdk_blob_md_descriptor_xattr *desc_xattr; 3550 uint32_t i; 3551 3552 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 3553 3554 if (desc_xattr->length != 3555 sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) + 3556 desc_xattr->name_length + desc_xattr->value_length) { 3557 } 3558 3559 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length); 3560 ctx->xattr_name[desc_xattr->name_length] = '\0'; 3561 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name); 3562 fprintf(ctx->fp, " value = \""); 3563 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name, 3564 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 3565 desc_xattr->value_length); 3566 fprintf(ctx->fp, "\"\n"); 3567 for (i = 0; i < desc_xattr->value_length; i++) { 3568 if (i % 16 == 0) { 3569 fprintf(ctx->fp, " "); 3570 } 3571 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i)); 3572 if ((i + 1) % 16 == 0) { 3573 fprintf(ctx->fp, "\n"); 3574 } 3575 } 3576 if (i % 16 != 0) { 3577 fprintf(ctx->fp, "\n"); 3578 } 3579 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3580 /* TODO */ 3581 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3582 /* TODO */ 3583 } else { 3584 /* Error */ 3585 } 3586 /* Advance to the next descriptor */ 3587 cur_desc += sizeof(*desc) + desc->length; 3588 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3589 break; 3590 } 3591 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3592 } 3593 } 3594 3595 static void 3596 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3597 { 3598 struct spdk_bs_dump_ctx *ctx = cb_arg; 3599 3600 if (bserrno != 0) { 3601 _spdk_bs_dump_finish(seq, ctx, bserrno); 3602 return; 3603 } 3604 3605 if (ctx->page->id != 0) { 3606 _spdk_bs_dump_print_md_page(ctx); 3607 } 3608 3609 ctx->cur_page++; 3610 3611 if (ctx->cur_page < ctx->super->md_len) { 3612 _spdk_bs_dump_read_md_page(seq, ctx); 3613 } else { 3614 spdk_free(ctx->page); 3615 _spdk_bs_dump_finish(seq, ctx, 0); 3616 } 3617 } 3618 3619 static void 3620 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 3621 { 3622 struct spdk_bs_dump_ctx *ctx = cb_arg; 3623 uint64_t lba; 3624 3625 assert(ctx->cur_page < ctx->super->md_len); 3626 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 3627 spdk_bs_sequence_read_dev(seq, ctx->page, lba, 3628 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3629 _spdk_bs_dump_read_md_page_cpl, ctx); 3630 } 3631 3632 static void 3633 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3634 { 3635 struct spdk_bs_dump_ctx *ctx = cb_arg; 3636 3637 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature); 3638 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3639 sizeof(ctx->super->signature)) != 0) { 3640 fprintf(ctx->fp, "(Mismatch)\n"); 3641 _spdk_bs_dump_finish(seq, ctx, bserrno); 3642 return; 3643 } else { 3644 fprintf(ctx->fp, "(OK)\n"); 3645 } 3646 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version); 3647 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc, 3648 (ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch"); 3649 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype); 3650 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size); 3651 fprintf(ctx->fp, "Super Blob ID: "); 3652 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) { 3653 fprintf(ctx->fp, "(None)\n"); 3654 } else { 3655 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob); 3656 } 3657 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean); 3658 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start); 3659 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len); 3660 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start); 3661 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len); 3662 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start); 3663 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len); 3664 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start); 3665 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len); 3666 3667 ctx->cur_page = 0; 3668 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3669 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3670 if (!ctx->page) { 3671 _spdk_bs_dump_finish(seq, ctx, -ENOMEM); 3672 return; 3673 } 3674 _spdk_bs_dump_read_md_page(seq, ctx); 3675 } 3676 3677 void 3678 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn, 3679 spdk_bs_op_complete cb_fn, void *cb_arg) 3680 { 3681 struct spdk_blob_store *bs; 3682 struct spdk_bs_cpl cpl; 3683 spdk_bs_sequence_t *seq; 3684 struct spdk_bs_dump_ctx *ctx; 3685 struct spdk_bs_opts opts = {}; 3686 int err; 3687 3688 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev); 3689 3690 spdk_bs_opts_init(&opts); 3691 3692 err = _spdk_bs_alloc(dev, &opts, &bs); 3693 if (err) { 3694 dev->destroy(dev); 3695 cb_fn(cb_arg, err); 3696 return; 3697 } 3698 3699 ctx = calloc(1, sizeof(*ctx)); 3700 if (!ctx) { 3701 _spdk_bs_free(bs); 3702 cb_fn(cb_arg, -ENOMEM); 3703 return; 3704 } 3705 3706 ctx->bs = bs; 3707 ctx->fp = fp; 3708 ctx->print_xattr_fn = print_xattr_fn; 3709 3710 /* Allocate memory for the super block */ 3711 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3712 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3713 if (!ctx->super) { 3714 free(ctx); 3715 _spdk_bs_free(bs); 3716 cb_fn(cb_arg, -ENOMEM); 3717 return; 3718 } 3719 3720 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3721 cpl.u.bs_basic.cb_fn = cb_fn; 3722 cpl.u.bs_basic.cb_arg = cb_arg; 3723 3724 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3725 if (!seq) { 3726 spdk_free(ctx->super); 3727 free(ctx); 3728 _spdk_bs_free(bs); 3729 cb_fn(cb_arg, -ENOMEM); 3730 return; 3731 } 3732 3733 /* Read the super block */ 3734 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3735 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3736 _spdk_bs_dump_super_cpl, ctx); 3737 } 3738 3739 /* END spdk_bs_dump */ 3740 3741 /* START spdk_bs_init */ 3742 3743 struct spdk_bs_init_ctx { 3744 struct spdk_blob_store *bs; 3745 struct spdk_bs_super_block *super; 3746 }; 3747 3748 static void 3749 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3750 { 3751 struct spdk_bs_init_ctx *ctx = cb_arg; 3752 3753 spdk_free(ctx->super); 3754 free(ctx); 3755 3756 spdk_bs_sequence_finish(seq, bserrno); 3757 } 3758 3759 static void 3760 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3761 { 3762 struct spdk_bs_init_ctx *ctx = cb_arg; 3763 3764 /* Write super block */ 3765 spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 3766 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 3767 _spdk_bs_init_persist_super_cpl, ctx); 3768 } 3769 3770 void 3771 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3772 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3773 { 3774 struct spdk_bs_init_ctx *ctx; 3775 struct spdk_blob_store *bs; 3776 struct spdk_bs_cpl cpl; 3777 spdk_bs_sequence_t *seq; 3778 spdk_bs_batch_t *batch; 3779 uint64_t num_md_lba; 3780 uint64_t num_md_pages; 3781 uint64_t num_md_clusters; 3782 uint32_t i; 3783 struct spdk_bs_opts opts = {}; 3784 int rc; 3785 3786 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 3787 3788 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3789 SPDK_ERRLOG("unsupported dev block length of %d\n", 3790 dev->blocklen); 3791 dev->destroy(dev); 3792 cb_fn(cb_arg, NULL, -EINVAL); 3793 return; 3794 } 3795 3796 if (o) { 3797 opts = *o; 3798 } else { 3799 spdk_bs_opts_init(&opts); 3800 } 3801 3802 if (_spdk_bs_opts_verify(&opts) != 0) { 3803 dev->destroy(dev); 3804 cb_fn(cb_arg, NULL, -EINVAL); 3805 return; 3806 } 3807 3808 rc = _spdk_bs_alloc(dev, &opts, &bs); 3809 if (rc) { 3810 dev->destroy(dev); 3811 cb_fn(cb_arg, NULL, rc); 3812 return; 3813 } 3814 3815 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 3816 /* By default, allocate 1 page per cluster. 3817 * Technically, this over-allocates metadata 3818 * because more metadata will reduce the number 3819 * of usable clusters. This can be addressed with 3820 * more complex math in the future. 3821 */ 3822 bs->md_len = bs->total_clusters; 3823 } else { 3824 bs->md_len = opts.num_md_pages; 3825 } 3826 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 3827 if (rc < 0) { 3828 _spdk_bs_free(bs); 3829 cb_fn(cb_arg, NULL, -ENOMEM); 3830 return; 3831 } 3832 3833 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len); 3834 if (rc < 0) { 3835 _spdk_bs_free(bs); 3836 cb_fn(cb_arg, NULL, -ENOMEM); 3837 return; 3838 } 3839 3840 ctx = calloc(1, sizeof(*ctx)); 3841 if (!ctx) { 3842 _spdk_bs_free(bs); 3843 cb_fn(cb_arg, NULL, -ENOMEM); 3844 return; 3845 } 3846 3847 ctx->bs = bs; 3848 3849 /* Allocate memory for the super block */ 3850 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3851 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3852 if (!ctx->super) { 3853 free(ctx); 3854 _spdk_bs_free(bs); 3855 cb_fn(cb_arg, NULL, -ENOMEM); 3856 return; 3857 } 3858 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3859 sizeof(ctx->super->signature)); 3860 ctx->super->version = SPDK_BS_VERSION; 3861 ctx->super->length = sizeof(*ctx->super); 3862 ctx->super->super_blob = bs->super_blob; 3863 ctx->super->clean = 0; 3864 ctx->super->cluster_size = bs->cluster_sz; 3865 ctx->super->io_unit_size = bs->io_unit_size; 3866 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 3867 3868 /* Calculate how many pages the metadata consumes at the front 3869 * of the disk. 3870 */ 3871 3872 /* The super block uses 1 page */ 3873 num_md_pages = 1; 3874 3875 /* The used_md_pages mask requires 1 bit per metadata page, rounded 3876 * up to the nearest page, plus a header. 3877 */ 3878 ctx->super->used_page_mask_start = num_md_pages; 3879 ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3880 spdk_divide_round_up(bs->md_len, 8), 3881 SPDK_BS_PAGE_SIZE); 3882 num_md_pages += ctx->super->used_page_mask_len; 3883 3884 /* The used_clusters mask requires 1 bit per cluster, rounded 3885 * up to the nearest page, plus a header. 3886 */ 3887 ctx->super->used_cluster_mask_start = num_md_pages; 3888 ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3889 spdk_divide_round_up(bs->total_clusters, 8), 3890 SPDK_BS_PAGE_SIZE); 3891 num_md_pages += ctx->super->used_cluster_mask_len; 3892 3893 /* The used_blobids mask requires 1 bit per metadata page, rounded 3894 * up to the nearest page, plus a header. 3895 */ 3896 ctx->super->used_blobid_mask_start = num_md_pages; 3897 ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3898 spdk_divide_round_up(bs->md_len, 8), 3899 SPDK_BS_PAGE_SIZE); 3900 num_md_pages += ctx->super->used_blobid_mask_len; 3901 3902 /* The metadata region size was chosen above */ 3903 ctx->super->md_start = bs->md_start = num_md_pages; 3904 ctx->super->md_len = bs->md_len; 3905 num_md_pages += bs->md_len; 3906 3907 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 3908 3909 ctx->super->size = dev->blockcnt * dev->blocklen; 3910 3911 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 3912 3913 num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster); 3914 if (num_md_clusters > bs->total_clusters) { 3915 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 3916 "please decrease number of pages reserved for metadata " 3917 "or increase cluster size.\n"); 3918 spdk_free(ctx->super); 3919 free(ctx); 3920 _spdk_bs_free(bs); 3921 cb_fn(cb_arg, NULL, -ENOMEM); 3922 return; 3923 } 3924 /* Claim all of the clusters used by the metadata */ 3925 for (i = 0; i < num_md_clusters; i++) { 3926 _spdk_bs_claim_cluster(bs, i); 3927 } 3928 3929 bs->total_data_clusters = bs->num_free_clusters; 3930 3931 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3932 cpl.u.bs_handle.cb_fn = cb_fn; 3933 cpl.u.bs_handle.cb_arg = cb_arg; 3934 cpl.u.bs_handle.bs = bs; 3935 3936 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3937 if (!seq) { 3938 spdk_free(ctx->super); 3939 free(ctx); 3940 _spdk_bs_free(bs); 3941 cb_fn(cb_arg, NULL, -ENOMEM); 3942 return; 3943 } 3944 3945 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 3946 3947 /* Clear metadata space */ 3948 spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba); 3949 3950 switch (opts.clear_method) { 3951 case BS_CLEAR_WITH_UNMAP: 3952 /* Trim data clusters */ 3953 spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3954 break; 3955 case BS_CLEAR_WITH_WRITE_ZEROES: 3956 /* Write_zeroes to data clusters */ 3957 spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3958 break; 3959 case BS_CLEAR_WITH_NONE: 3960 default: 3961 break; 3962 } 3963 3964 spdk_bs_batch_close(batch); 3965 } 3966 3967 /* END spdk_bs_init */ 3968 3969 /* START spdk_bs_destroy */ 3970 3971 static void 3972 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3973 { 3974 struct spdk_bs_init_ctx *ctx = cb_arg; 3975 struct spdk_blob_store *bs = ctx->bs; 3976 3977 /* 3978 * We need to defer calling spdk_bs_call_cpl() until after 3979 * dev destruction, so tuck these away for later use. 3980 */ 3981 bs->unload_err = bserrno; 3982 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3983 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3984 3985 spdk_bs_sequence_finish(seq, bserrno); 3986 3987 _spdk_bs_free(bs); 3988 free(ctx); 3989 } 3990 3991 void 3992 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 3993 void *cb_arg) 3994 { 3995 struct spdk_bs_cpl cpl; 3996 spdk_bs_sequence_t *seq; 3997 struct spdk_bs_init_ctx *ctx; 3998 3999 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 4000 4001 if (!TAILQ_EMPTY(&bs->blobs)) { 4002 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4003 cb_fn(cb_arg, -EBUSY); 4004 return; 4005 } 4006 4007 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4008 cpl.u.bs_basic.cb_fn = cb_fn; 4009 cpl.u.bs_basic.cb_arg = cb_arg; 4010 4011 ctx = calloc(1, sizeof(*ctx)); 4012 if (!ctx) { 4013 cb_fn(cb_arg, -ENOMEM); 4014 return; 4015 } 4016 4017 ctx->bs = bs; 4018 4019 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4020 if (!seq) { 4021 free(ctx); 4022 cb_fn(cb_arg, -ENOMEM); 4023 return; 4024 } 4025 4026 /* Write zeroes to the super block */ 4027 spdk_bs_sequence_write_zeroes_dev(seq, 4028 _spdk_bs_page_to_lba(bs, 0), 4029 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 4030 _spdk_bs_destroy_trim_cpl, ctx); 4031 } 4032 4033 /* END spdk_bs_destroy */ 4034 4035 /* START spdk_bs_unload */ 4036 4037 static void 4038 _spdk_bs_unload_finish(struct spdk_bs_load_ctx *ctx, int bserrno) 4039 { 4040 spdk_bs_sequence_t *seq = ctx->seq; 4041 4042 spdk_free(ctx->super); 4043 4044 /* 4045 * We need to defer calling spdk_bs_call_cpl() until after 4046 * dev destruction, so tuck these away for later use. 4047 */ 4048 ctx->bs->unload_err = bserrno; 4049 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 4050 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 4051 4052 spdk_bs_sequence_finish(seq, bserrno); 4053 4054 _spdk_bs_free(ctx->bs); 4055 free(ctx); 4056 } 4057 4058 static void 4059 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4060 { 4061 struct spdk_bs_load_ctx *ctx = cb_arg; 4062 4063 _spdk_bs_unload_finish(ctx, bserrno); 4064 } 4065 4066 static void 4067 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4068 { 4069 struct spdk_bs_load_ctx *ctx = cb_arg; 4070 4071 spdk_free(ctx->mask); 4072 4073 if (bserrno != 0) { 4074 _spdk_bs_unload_finish(ctx, bserrno); 4075 return; 4076 } 4077 4078 ctx->super->clean = 1; 4079 4080 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 4081 } 4082 4083 static void 4084 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4085 { 4086 struct spdk_bs_load_ctx *ctx = cb_arg; 4087 4088 spdk_free(ctx->mask); 4089 ctx->mask = NULL; 4090 4091 if (bserrno != 0) { 4092 _spdk_bs_unload_finish(ctx, bserrno); 4093 return; 4094 } 4095 4096 _spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_unload_write_used_clusters_cpl); 4097 } 4098 4099 static void 4100 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4101 { 4102 struct spdk_bs_load_ctx *ctx = cb_arg; 4103 4104 spdk_free(ctx->mask); 4105 ctx->mask = NULL; 4106 4107 if (bserrno != 0) { 4108 _spdk_bs_unload_finish(ctx, bserrno); 4109 return; 4110 } 4111 4112 _spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_unload_write_used_blobids_cpl); 4113 } 4114 4115 static void 4116 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4117 { 4118 struct spdk_bs_load_ctx *ctx = cb_arg; 4119 4120 if (bserrno != 0) { 4121 _spdk_bs_unload_finish(ctx, bserrno); 4122 return; 4123 } 4124 4125 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 4126 } 4127 4128 void 4129 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 4130 { 4131 struct spdk_bs_cpl cpl; 4132 struct spdk_bs_load_ctx *ctx; 4133 4134 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 4135 4136 if (!TAILQ_EMPTY(&bs->blobs)) { 4137 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4138 cb_fn(cb_arg, -EBUSY); 4139 return; 4140 } 4141 4142 ctx = calloc(1, sizeof(*ctx)); 4143 if (!ctx) { 4144 cb_fn(cb_arg, -ENOMEM); 4145 return; 4146 } 4147 4148 ctx->bs = bs; 4149 4150 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4151 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4152 if (!ctx->super) { 4153 free(ctx); 4154 cb_fn(cb_arg, -ENOMEM); 4155 return; 4156 } 4157 4158 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4159 cpl.u.bs_basic.cb_fn = cb_fn; 4160 cpl.u.bs_basic.cb_arg = cb_arg; 4161 4162 ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4163 if (!ctx->seq) { 4164 spdk_free(ctx->super); 4165 free(ctx); 4166 cb_fn(cb_arg, -ENOMEM); 4167 return; 4168 } 4169 4170 /* Read super block */ 4171 spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4172 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4173 _spdk_bs_unload_read_super_cpl, ctx); 4174 } 4175 4176 /* END spdk_bs_unload */ 4177 4178 /* START spdk_bs_set_super */ 4179 4180 struct spdk_bs_set_super_ctx { 4181 struct spdk_blob_store *bs; 4182 struct spdk_bs_super_block *super; 4183 }; 4184 4185 static void 4186 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4187 { 4188 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4189 4190 if (bserrno != 0) { 4191 SPDK_ERRLOG("Unable to write to super block of blobstore\n"); 4192 } 4193 4194 spdk_free(ctx->super); 4195 4196 spdk_bs_sequence_finish(seq, bserrno); 4197 4198 free(ctx); 4199 } 4200 4201 static void 4202 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4203 { 4204 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4205 4206 if (bserrno != 0) { 4207 SPDK_ERRLOG("Unable to read super block of blobstore\n"); 4208 spdk_free(ctx->super); 4209 spdk_bs_sequence_finish(seq, bserrno); 4210 free(ctx); 4211 return; 4212 } 4213 4214 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx); 4215 } 4216 4217 void 4218 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 4219 spdk_bs_op_complete cb_fn, void *cb_arg) 4220 { 4221 struct spdk_bs_cpl cpl; 4222 spdk_bs_sequence_t *seq; 4223 struct spdk_bs_set_super_ctx *ctx; 4224 4225 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n"); 4226 4227 ctx = calloc(1, sizeof(*ctx)); 4228 if (!ctx) { 4229 cb_fn(cb_arg, -ENOMEM); 4230 return; 4231 } 4232 4233 ctx->bs = bs; 4234 4235 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4236 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4237 if (!ctx->super) { 4238 free(ctx); 4239 cb_fn(cb_arg, -ENOMEM); 4240 return; 4241 } 4242 4243 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4244 cpl.u.bs_basic.cb_fn = cb_fn; 4245 cpl.u.bs_basic.cb_arg = cb_arg; 4246 4247 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4248 if (!seq) { 4249 spdk_free(ctx->super); 4250 free(ctx); 4251 cb_fn(cb_arg, -ENOMEM); 4252 return; 4253 } 4254 4255 bs->super_blob = blobid; 4256 4257 /* Read super block */ 4258 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4259 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4260 _spdk_bs_set_super_read_cpl, ctx); 4261 } 4262 4263 /* END spdk_bs_set_super */ 4264 4265 void 4266 spdk_bs_get_super(struct spdk_blob_store *bs, 4267 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4268 { 4269 if (bs->super_blob == SPDK_BLOBID_INVALID) { 4270 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 4271 } else { 4272 cb_fn(cb_arg, bs->super_blob, 0); 4273 } 4274 } 4275 4276 uint64_t 4277 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 4278 { 4279 return bs->cluster_sz; 4280 } 4281 4282 uint64_t 4283 spdk_bs_get_page_size(struct spdk_blob_store *bs) 4284 { 4285 return SPDK_BS_PAGE_SIZE; 4286 } 4287 4288 uint64_t 4289 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs) 4290 { 4291 return bs->io_unit_size; 4292 } 4293 4294 uint64_t 4295 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 4296 { 4297 return bs->num_free_clusters; 4298 } 4299 4300 uint64_t 4301 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 4302 { 4303 return bs->total_data_clusters; 4304 } 4305 4306 static int 4307 spdk_bs_register_md_thread(struct spdk_blob_store *bs) 4308 { 4309 bs->md_channel = spdk_get_io_channel(bs); 4310 if (!bs->md_channel) { 4311 SPDK_ERRLOG("Failed to get IO channel.\n"); 4312 return -1; 4313 } 4314 4315 return 0; 4316 } 4317 4318 static int 4319 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 4320 { 4321 spdk_put_io_channel(bs->md_channel); 4322 4323 return 0; 4324 } 4325 4326 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 4327 { 4328 assert(blob != NULL); 4329 4330 return blob->id; 4331 } 4332 4333 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 4334 { 4335 assert(blob != NULL); 4336 4337 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 4338 } 4339 4340 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob) 4341 { 4342 assert(blob != NULL); 4343 4344 return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs); 4345 } 4346 4347 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 4348 { 4349 assert(blob != NULL); 4350 4351 return blob->active.num_clusters; 4352 } 4353 4354 /* START spdk_bs_create_blob */ 4355 4356 static void 4357 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4358 { 4359 struct spdk_blob *blob = cb_arg; 4360 4361 _spdk_blob_free(blob); 4362 4363 spdk_bs_sequence_finish(seq, bserrno); 4364 } 4365 4366 static int 4367 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs, 4368 bool internal) 4369 { 4370 uint64_t i; 4371 size_t value_len = 0; 4372 int rc; 4373 const void *value = NULL; 4374 if (xattrs->count > 0 && xattrs->get_value == NULL) { 4375 return -EINVAL; 4376 } 4377 for (i = 0; i < xattrs->count; i++) { 4378 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len); 4379 if (value == NULL || value_len == 0) { 4380 return -EINVAL; 4381 } 4382 rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal); 4383 if (rc < 0) { 4384 return rc; 4385 } 4386 } 4387 return 0; 4388 } 4389 4390 static void 4391 _spdk_bs_create_blob(struct spdk_blob_store *bs, 4392 const struct spdk_blob_opts *opts, 4393 const struct spdk_blob_xattr_opts *internal_xattrs, 4394 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4395 { 4396 struct spdk_blob *blob; 4397 uint32_t page_idx; 4398 struct spdk_bs_cpl cpl; 4399 struct spdk_blob_opts opts_default; 4400 struct spdk_blob_xattr_opts internal_xattrs_default; 4401 spdk_bs_sequence_t *seq; 4402 spdk_blob_id id; 4403 int rc; 4404 4405 assert(spdk_get_thread() == bs->md_thread); 4406 4407 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 4408 if (page_idx == UINT32_MAX) { 4409 cb_fn(cb_arg, 0, -ENOMEM); 4410 return; 4411 } 4412 spdk_bit_array_set(bs->used_blobids, page_idx); 4413 spdk_bit_array_set(bs->used_md_pages, page_idx); 4414 4415 id = _spdk_bs_page_to_blobid(page_idx); 4416 4417 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 4418 4419 blob = _spdk_blob_alloc(bs, id); 4420 if (!blob) { 4421 cb_fn(cb_arg, 0, -ENOMEM); 4422 return; 4423 } 4424 4425 if (!opts) { 4426 spdk_blob_opts_init(&opts_default); 4427 opts = &opts_default; 4428 } 4429 4430 blob->use_extent_table = opts->use_extent_table; 4431 4432 if (!internal_xattrs) { 4433 _spdk_blob_xattrs_init(&internal_xattrs_default); 4434 internal_xattrs = &internal_xattrs_default; 4435 } 4436 4437 rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false); 4438 if (rc < 0) { 4439 _spdk_blob_free(blob); 4440 cb_fn(cb_arg, 0, rc); 4441 return; 4442 } 4443 4444 rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true); 4445 if (rc < 0) { 4446 _spdk_blob_free(blob); 4447 cb_fn(cb_arg, 0, rc); 4448 return; 4449 } 4450 4451 if (opts->thin_provision) { 4452 _spdk_blob_set_thin_provision(blob); 4453 } 4454 4455 _spdk_blob_set_clear_method(blob, opts->clear_method); 4456 4457 rc = _spdk_blob_resize(blob, opts->num_clusters); 4458 if (rc < 0) { 4459 _spdk_blob_free(blob); 4460 cb_fn(cb_arg, 0, rc); 4461 return; 4462 } 4463 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4464 cpl.u.blobid.cb_fn = cb_fn; 4465 cpl.u.blobid.cb_arg = cb_arg; 4466 cpl.u.blobid.blobid = blob->id; 4467 4468 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4469 if (!seq) { 4470 _spdk_blob_free(blob); 4471 cb_fn(cb_arg, 0, -ENOMEM); 4472 return; 4473 } 4474 4475 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob); 4476 } 4477 4478 void spdk_bs_create_blob(struct spdk_blob_store *bs, 4479 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4480 { 4481 _spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg); 4482 } 4483 4484 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts, 4485 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4486 { 4487 _spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg); 4488 } 4489 4490 /* END spdk_bs_create_blob */ 4491 4492 /* START blob_cleanup */ 4493 4494 struct spdk_clone_snapshot_ctx { 4495 struct spdk_bs_cpl cpl; 4496 int bserrno; 4497 bool frozen; 4498 4499 struct spdk_io_channel *channel; 4500 4501 /* Current cluster for inflate operation */ 4502 uint64_t cluster; 4503 4504 /* For inflation force allocation of all unallocated clusters and remove 4505 * thin-provisioning. Otherwise only decouple parent and keep clone thin. */ 4506 bool allocate_all; 4507 4508 struct { 4509 spdk_blob_id id; 4510 struct spdk_blob *blob; 4511 } original; 4512 struct { 4513 spdk_blob_id id; 4514 struct spdk_blob *blob; 4515 } new; 4516 4517 /* xattrs specified for snapshot/clones only. They have no impact on 4518 * the original blobs xattrs. */ 4519 const struct spdk_blob_xattr_opts *xattrs; 4520 }; 4521 4522 static void 4523 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno) 4524 { 4525 struct spdk_clone_snapshot_ctx *ctx = cb_arg; 4526 struct spdk_bs_cpl *cpl = &ctx->cpl; 4527 4528 if (bserrno != 0) { 4529 if (ctx->bserrno != 0) { 4530 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4531 } else { 4532 ctx->bserrno = bserrno; 4533 } 4534 } 4535 4536 switch (cpl->type) { 4537 case SPDK_BS_CPL_TYPE_BLOBID: 4538 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno); 4539 break; 4540 case SPDK_BS_CPL_TYPE_BLOB_BASIC: 4541 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno); 4542 break; 4543 default: 4544 SPDK_UNREACHABLE(); 4545 break; 4546 } 4547 4548 free(ctx); 4549 } 4550 4551 static void 4552 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 4553 { 4554 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4555 struct spdk_blob *origblob = ctx->original.blob; 4556 4557 if (bserrno != 0) { 4558 if (ctx->bserrno != 0) { 4559 SPDK_ERRLOG("Unfreeze error %d\n", bserrno); 4560 } else { 4561 ctx->bserrno = bserrno; 4562 } 4563 } 4564 4565 ctx->original.id = origblob->id; 4566 origblob->locked_operation_in_progress = false; 4567 4568 spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4569 } 4570 4571 static void 4572 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno) 4573 { 4574 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4575 struct spdk_blob *origblob = ctx->original.blob; 4576 4577 if (bserrno != 0) { 4578 if (ctx->bserrno != 0) { 4579 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4580 } else { 4581 ctx->bserrno = bserrno; 4582 } 4583 } 4584 4585 if (ctx->frozen) { 4586 /* Unfreeze any outstanding I/O */ 4587 _spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx); 4588 } else { 4589 _spdk_bs_snapshot_unfreeze_cpl(ctx, 0); 4590 } 4591 4592 } 4593 4594 static void 4595 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno) 4596 { 4597 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4598 struct spdk_blob *newblob = ctx->new.blob; 4599 4600 if (bserrno != 0) { 4601 if (ctx->bserrno != 0) { 4602 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4603 } else { 4604 ctx->bserrno = bserrno; 4605 } 4606 } 4607 4608 ctx->new.id = newblob->id; 4609 spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4610 } 4611 4612 /* END blob_cleanup */ 4613 4614 /* START spdk_bs_create_snapshot */ 4615 4616 static void 4617 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2) 4618 { 4619 uint64_t *cluster_temp; 4620 4621 cluster_temp = blob1->active.clusters; 4622 blob1->active.clusters = blob2->active.clusters; 4623 blob2->active.clusters = cluster_temp; 4624 } 4625 4626 static void 4627 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno) 4628 { 4629 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4630 struct spdk_blob *origblob = ctx->original.blob; 4631 struct spdk_blob *newblob = ctx->new.blob; 4632 4633 if (bserrno != 0) { 4634 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4635 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4636 return; 4637 } 4638 4639 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */ 4640 bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true); 4641 if (bserrno != 0) { 4642 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4643 return; 4644 } 4645 4646 _spdk_bs_blob_list_add(ctx->original.blob); 4647 4648 spdk_blob_set_read_only(newblob); 4649 4650 /* sync snapshot metadata */ 4651 spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4652 } 4653 4654 static void 4655 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno) 4656 { 4657 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4658 struct spdk_blob *origblob = ctx->original.blob; 4659 struct spdk_blob *newblob = ctx->new.blob; 4660 4661 if (bserrno != 0) { 4662 /* return cluster map back to original */ 4663 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4664 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4665 return; 4666 } 4667 4668 /* Set internal xattr for snapshot id */ 4669 bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true); 4670 if (bserrno != 0) { 4671 /* return cluster map back to original */ 4672 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4673 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4674 return; 4675 } 4676 4677 _spdk_bs_blob_list_remove(origblob); 4678 origblob->parent_id = newblob->id; 4679 4680 /* Create new back_bs_dev for snapshot */ 4681 origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob); 4682 if (origblob->back_bs_dev == NULL) { 4683 /* return cluster map back to original */ 4684 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4685 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL); 4686 return; 4687 } 4688 4689 /* set clone blob as thin provisioned */ 4690 _spdk_blob_set_thin_provision(origblob); 4691 4692 _spdk_bs_blob_list_add(newblob); 4693 4694 /* sync clone metadata */ 4695 spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx); 4696 } 4697 4698 static void 4699 _spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc) 4700 { 4701 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4702 struct spdk_blob *origblob = ctx->original.blob; 4703 struct spdk_blob *newblob = ctx->new.blob; 4704 int bserrno; 4705 4706 if (rc != 0) { 4707 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc); 4708 return; 4709 } 4710 4711 ctx->frozen = true; 4712 4713 /* set new back_bs_dev for snapshot */ 4714 newblob->back_bs_dev = origblob->back_bs_dev; 4715 /* Set invalid flags from origblob */ 4716 newblob->invalid_flags = origblob->invalid_flags; 4717 4718 /* inherit parent from original blob if set */ 4719 newblob->parent_id = origblob->parent_id; 4720 if (origblob->parent_id != SPDK_BLOBID_INVALID) { 4721 /* Set internal xattr for snapshot id */ 4722 bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT, 4723 &origblob->parent_id, sizeof(spdk_blob_id), true); 4724 if (bserrno != 0) { 4725 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4726 return; 4727 } 4728 } 4729 4730 /* swap cluster maps */ 4731 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4732 4733 /* Set the clear method on the new blob to match the original. */ 4734 _spdk_blob_set_clear_method(newblob, origblob->clear_method); 4735 4736 /* sync snapshot metadata */ 4737 spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx); 4738 } 4739 4740 static void 4741 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4742 { 4743 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4744 struct spdk_blob *origblob = ctx->original.blob; 4745 struct spdk_blob *newblob = _blob; 4746 4747 if (bserrno != 0) { 4748 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4749 return; 4750 } 4751 4752 ctx->new.blob = newblob; 4753 assert(spdk_blob_is_thin_provisioned(newblob)); 4754 assert(spdk_mem_all_zero(newblob->active.clusters, 4755 newblob->active.num_clusters * sizeof(*newblob->active.clusters))); 4756 4757 _spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx); 4758 } 4759 4760 static void 4761 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4762 { 4763 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4764 struct spdk_blob *origblob = ctx->original.blob; 4765 4766 if (bserrno != 0) { 4767 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4768 return; 4769 } 4770 4771 ctx->new.id = blobid; 4772 ctx->cpl.u.blobid.blobid = blobid; 4773 4774 spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx); 4775 } 4776 4777 4778 static void 4779 _spdk_bs_xattr_snapshot(void *arg, const char *name, 4780 const void **value, size_t *value_len) 4781 { 4782 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0); 4783 4784 struct spdk_blob *blob = (struct spdk_blob *)arg; 4785 *value = &blob->id; 4786 *value_len = sizeof(blob->id); 4787 } 4788 4789 static void 4790 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4791 { 4792 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4793 struct spdk_blob_opts opts; 4794 struct spdk_blob_xattr_opts internal_xattrs; 4795 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS }; 4796 4797 if (bserrno != 0) { 4798 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4799 return; 4800 } 4801 4802 ctx->original.blob = _blob; 4803 4804 if (_blob->data_ro || _blob->md_ro) { 4805 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n", 4806 _blob->id); 4807 ctx->bserrno = -EINVAL; 4808 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4809 return; 4810 } 4811 4812 if (_blob->locked_operation_in_progress) { 4813 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n"); 4814 ctx->bserrno = -EBUSY; 4815 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4816 return; 4817 } 4818 4819 _blob->locked_operation_in_progress = true; 4820 4821 spdk_blob_opts_init(&opts); 4822 _spdk_blob_xattrs_init(&internal_xattrs); 4823 4824 /* Change the size of new blob to the same as in original blob, 4825 * but do not allocate clusters */ 4826 opts.thin_provision = true; 4827 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4828 opts.use_extent_table = _blob->use_extent_table; 4829 4830 /* If there are any xattrs specified for snapshot, set them now */ 4831 if (ctx->xattrs) { 4832 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4833 } 4834 /* Set internal xattr SNAPSHOT_IN_PROGRESS */ 4835 internal_xattrs.count = 1; 4836 internal_xattrs.ctx = _blob; 4837 internal_xattrs.names = xattrs_names; 4838 internal_xattrs.get_value = _spdk_bs_xattr_snapshot; 4839 4840 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4841 _spdk_bs_snapshot_newblob_create_cpl, ctx); 4842 } 4843 4844 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid, 4845 const struct spdk_blob_xattr_opts *snapshot_xattrs, 4846 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4847 { 4848 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4849 4850 if (!ctx) { 4851 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4852 return; 4853 } 4854 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4855 ctx->cpl.u.blobid.cb_fn = cb_fn; 4856 ctx->cpl.u.blobid.cb_arg = cb_arg; 4857 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4858 ctx->bserrno = 0; 4859 ctx->frozen = false; 4860 ctx->original.id = blobid; 4861 ctx->xattrs = snapshot_xattrs; 4862 4863 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx); 4864 } 4865 /* END spdk_bs_create_snapshot */ 4866 4867 /* START spdk_bs_create_clone */ 4868 4869 static void 4870 _spdk_bs_xattr_clone(void *arg, const char *name, 4871 const void **value, size_t *value_len) 4872 { 4873 assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0); 4874 4875 struct spdk_blob *blob = (struct spdk_blob *)arg; 4876 *value = &blob->id; 4877 *value_len = sizeof(blob->id); 4878 } 4879 4880 static void 4881 _spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4882 { 4883 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4884 struct spdk_blob *clone = _blob; 4885 4886 ctx->new.blob = clone; 4887 _spdk_bs_blob_list_add(clone); 4888 4889 spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4890 } 4891 4892 static void 4893 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4894 { 4895 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4896 4897 ctx->cpl.u.blobid.blobid = blobid; 4898 spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx); 4899 } 4900 4901 static void 4902 _spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4903 { 4904 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4905 struct spdk_blob_opts opts; 4906 struct spdk_blob_xattr_opts internal_xattrs; 4907 char *xattr_names[] = { BLOB_SNAPSHOT }; 4908 4909 if (bserrno != 0) { 4910 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4911 return; 4912 } 4913 4914 ctx->original.blob = _blob; 4915 4916 if (!_blob->data_ro || !_blob->md_ro) { 4917 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone not from read-only blob\n"); 4918 ctx->bserrno = -EINVAL; 4919 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4920 return; 4921 } 4922 4923 if (_blob->locked_operation_in_progress) { 4924 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n"); 4925 ctx->bserrno = -EBUSY; 4926 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4927 return; 4928 } 4929 4930 _blob->locked_operation_in_progress = true; 4931 4932 spdk_blob_opts_init(&opts); 4933 _spdk_blob_xattrs_init(&internal_xattrs); 4934 4935 opts.thin_provision = true; 4936 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4937 opts.use_extent_table = _blob->use_extent_table; 4938 if (ctx->xattrs) { 4939 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4940 } 4941 4942 /* Set internal xattr BLOB_SNAPSHOT */ 4943 internal_xattrs.count = 1; 4944 internal_xattrs.ctx = _blob; 4945 internal_xattrs.names = xattr_names; 4946 internal_xattrs.get_value = _spdk_bs_xattr_clone; 4947 4948 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4949 _spdk_bs_clone_newblob_create_cpl, ctx); 4950 } 4951 4952 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid, 4953 const struct spdk_blob_xattr_opts *clone_xattrs, 4954 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4955 { 4956 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4957 4958 if (!ctx) { 4959 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4960 return; 4961 } 4962 4963 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4964 ctx->cpl.u.blobid.cb_fn = cb_fn; 4965 ctx->cpl.u.blobid.cb_arg = cb_arg; 4966 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4967 ctx->bserrno = 0; 4968 ctx->xattrs = clone_xattrs; 4969 ctx->original.id = blobid; 4970 4971 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx); 4972 } 4973 4974 /* END spdk_bs_create_clone */ 4975 4976 /* START spdk_bs_inflate_blob */ 4977 4978 static void 4979 _spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno) 4980 { 4981 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4982 struct spdk_blob *_blob = ctx->original.blob; 4983 4984 if (bserrno != 0) { 4985 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4986 return; 4987 } 4988 4989 assert(_parent != NULL); 4990 4991 _spdk_bs_blob_list_remove(_blob); 4992 _blob->parent_id = _parent->id; 4993 _spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id, 4994 sizeof(spdk_blob_id), true); 4995 4996 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4997 _blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent); 4998 _spdk_bs_blob_list_add(_blob); 4999 5000 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 5001 } 5002 5003 static void 5004 _spdk_bs_inflate_blob_done(void *cb_arg, int bserrno) 5005 { 5006 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5007 struct spdk_blob *_blob = ctx->original.blob; 5008 struct spdk_blob *_parent; 5009 5010 if (bserrno != 0) { 5011 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5012 return; 5013 } 5014 5015 if (ctx->allocate_all) { 5016 /* remove thin provisioning */ 5017 _spdk_bs_blob_list_remove(_blob); 5018 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 5019 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV; 5020 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 5021 _blob->back_bs_dev = NULL; 5022 _blob->parent_id = SPDK_BLOBID_INVALID; 5023 } else { 5024 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob; 5025 if (_parent->parent_id != SPDK_BLOBID_INVALID) { 5026 /* We must change the parent of the inflated blob */ 5027 spdk_bs_open_blob(_blob->bs, _parent->parent_id, 5028 _spdk_bs_inflate_blob_set_parent_cpl, ctx); 5029 return; 5030 } 5031 5032 _spdk_bs_blob_list_remove(_blob); 5033 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 5034 _blob->parent_id = SPDK_BLOBID_INVALID; 5035 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 5036 _blob->back_bs_dev = spdk_bs_create_zeroes_dev(); 5037 } 5038 5039 _blob->state = SPDK_BLOB_STATE_DIRTY; 5040 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 5041 } 5042 5043 /* Check if cluster needs allocation */ 5044 static inline bool 5045 _spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all) 5046 { 5047 struct spdk_blob_bs_dev *b; 5048 5049 assert(blob != NULL); 5050 5051 if (blob->active.clusters[cluster] != 0) { 5052 /* Cluster is already allocated */ 5053 return false; 5054 } 5055 5056 if (blob->parent_id == SPDK_BLOBID_INVALID) { 5057 /* Blob have no parent blob */ 5058 return allocate_all; 5059 } 5060 5061 b = (struct spdk_blob_bs_dev *)blob->back_bs_dev; 5062 return (allocate_all || b->blob->active.clusters[cluster] != 0); 5063 } 5064 5065 static void 5066 _spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno) 5067 { 5068 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5069 struct spdk_blob *_blob = ctx->original.blob; 5070 uint64_t offset; 5071 5072 if (bserrno != 0) { 5073 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 5074 return; 5075 } 5076 5077 for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) { 5078 if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) { 5079 break; 5080 } 5081 } 5082 5083 if (ctx->cluster < _blob->active.num_clusters) { 5084 offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster); 5085 5086 /* We may safely increment a cluster before write */ 5087 ctx->cluster++; 5088 5089 /* Use zero length write to touch a cluster */ 5090 spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0, 5091 _spdk_bs_inflate_blob_touch_next, ctx); 5092 } else { 5093 _spdk_bs_inflate_blob_done(cb_arg, bserrno); 5094 } 5095 } 5096 5097 static void 5098 _spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5099 { 5100 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5101 uint64_t lfc; /* lowest free cluster */ 5102 uint64_t i; 5103 5104 if (bserrno != 0) { 5105 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 5106 return; 5107 } 5108 5109 ctx->original.blob = _blob; 5110 5111 if (_blob->locked_operation_in_progress) { 5112 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n"); 5113 ctx->bserrno = -EBUSY; 5114 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 5115 return; 5116 } 5117 5118 _blob->locked_operation_in_progress = true; 5119 5120 if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) { 5121 /* This blob have no parent, so we cannot decouple it. */ 5122 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n"); 5123 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL); 5124 return; 5125 } 5126 5127 if (spdk_blob_is_thin_provisioned(_blob) == false) { 5128 /* This is not thin provisioned blob. No need to inflate. */ 5129 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0); 5130 return; 5131 } 5132 5133 /* Do two passes - one to verify that we can obtain enough clusters 5134 * and another to actually claim them. 5135 */ 5136 lfc = 0; 5137 for (i = 0; i < _blob->active.num_clusters; i++) { 5138 if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) { 5139 lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc); 5140 if (lfc == UINT32_MAX) { 5141 /* No more free clusters. Cannot satisfy the request */ 5142 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC); 5143 return; 5144 } 5145 lfc++; 5146 } 5147 } 5148 5149 ctx->cluster = 0; 5150 _spdk_bs_inflate_blob_touch_next(ctx, 0); 5151 } 5152 5153 static void 5154 _spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5155 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg) 5156 { 5157 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5158 5159 if (!ctx) { 5160 cb_fn(cb_arg, -ENOMEM); 5161 return; 5162 } 5163 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5164 ctx->cpl.u.bs_basic.cb_fn = cb_fn; 5165 ctx->cpl.u.bs_basic.cb_arg = cb_arg; 5166 ctx->bserrno = 0; 5167 ctx->original.id = blobid; 5168 ctx->channel = channel; 5169 ctx->allocate_all = allocate_all; 5170 5171 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx); 5172 } 5173 5174 void 5175 spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5176 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5177 { 5178 _spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg); 5179 } 5180 5181 void 5182 spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5183 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5184 { 5185 _spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg); 5186 } 5187 /* END spdk_bs_inflate_blob */ 5188 5189 /* START spdk_blob_resize */ 5190 struct spdk_bs_resize_ctx { 5191 spdk_blob_op_complete cb_fn; 5192 void *cb_arg; 5193 struct spdk_blob *blob; 5194 uint64_t sz; 5195 int rc; 5196 }; 5197 5198 static void 5199 _spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc) 5200 { 5201 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5202 5203 if (rc != 0) { 5204 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc); 5205 } 5206 5207 if (ctx->rc != 0) { 5208 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc); 5209 rc = ctx->rc; 5210 } 5211 5212 ctx->blob->locked_operation_in_progress = false; 5213 5214 ctx->cb_fn(ctx->cb_arg, rc); 5215 free(ctx); 5216 } 5217 5218 static void 5219 _spdk_bs_resize_freeze_cpl(void *cb_arg, int rc) 5220 { 5221 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5222 5223 if (rc != 0) { 5224 ctx->blob->locked_operation_in_progress = false; 5225 ctx->cb_fn(ctx->cb_arg, rc); 5226 free(ctx); 5227 return; 5228 } 5229 5230 ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz); 5231 5232 _spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx); 5233 } 5234 5235 void 5236 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg) 5237 { 5238 struct spdk_bs_resize_ctx *ctx; 5239 5240 _spdk_blob_verify_md_op(blob); 5241 5242 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 5243 5244 if (blob->md_ro) { 5245 cb_fn(cb_arg, -EPERM); 5246 return; 5247 } 5248 5249 if (sz == blob->active.num_clusters) { 5250 cb_fn(cb_arg, 0); 5251 return; 5252 } 5253 5254 if (blob->locked_operation_in_progress) { 5255 cb_fn(cb_arg, -EBUSY); 5256 return; 5257 } 5258 5259 ctx = calloc(1, sizeof(*ctx)); 5260 if (!ctx) { 5261 cb_fn(cb_arg, -ENOMEM); 5262 return; 5263 } 5264 5265 blob->locked_operation_in_progress = true; 5266 ctx->cb_fn = cb_fn; 5267 ctx->cb_arg = cb_arg; 5268 ctx->blob = blob; 5269 ctx->sz = sz; 5270 _spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx); 5271 } 5272 5273 /* END spdk_blob_resize */ 5274 5275 5276 /* START spdk_bs_delete_blob */ 5277 5278 static void 5279 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno) 5280 { 5281 spdk_bs_sequence_t *seq = cb_arg; 5282 5283 spdk_bs_sequence_finish(seq, bserrno); 5284 } 5285 5286 static void 5287 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5288 { 5289 struct spdk_blob *blob = cb_arg; 5290 5291 if (bserrno != 0) { 5292 /* 5293 * We already removed this blob from the blobstore tailq, so 5294 * we need to free it here since this is the last reference 5295 * to it. 5296 */ 5297 _spdk_blob_free(blob); 5298 _spdk_bs_delete_close_cpl(seq, bserrno); 5299 return; 5300 } 5301 5302 /* 5303 * This will immediately decrement the ref_count and call 5304 * the completion routine since the metadata state is clean. 5305 * By calling spdk_blob_close, we reduce the number of call 5306 * points into code that touches the blob->open_ref count 5307 * and the blobstore's blob list. 5308 */ 5309 spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq); 5310 } 5311 5312 struct delete_snapshot_ctx { 5313 struct spdk_blob_list *parent_snapshot_entry; 5314 struct spdk_blob *snapshot; 5315 bool snapshot_md_ro; 5316 struct spdk_blob *clone; 5317 bool clone_md_ro; 5318 spdk_blob_op_with_handle_complete cb_fn; 5319 void *cb_arg; 5320 int bserrno; 5321 }; 5322 5323 static void 5324 _spdk_delete_blob_cleanup_finish(void *cb_arg, int bserrno) 5325 { 5326 struct delete_snapshot_ctx *ctx = cb_arg; 5327 5328 if (bserrno != 0) { 5329 SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno); 5330 } 5331 5332 assert(ctx != NULL); 5333 5334 if (bserrno != 0 && ctx->bserrno == 0) { 5335 ctx->bserrno = bserrno; 5336 } 5337 5338 ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno); 5339 free(ctx); 5340 } 5341 5342 static void 5343 _spdk_delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno) 5344 { 5345 struct delete_snapshot_ctx *ctx = cb_arg; 5346 5347 if (bserrno != 0) { 5348 ctx->bserrno = bserrno; 5349 SPDK_ERRLOG("Clone cleanup error %d\n", bserrno); 5350 } 5351 5352 /* open_ref == 1 menas that only deletion context has opened this snapshot 5353 * open_ref == 2 menas that clone has opened this snapshot as well, 5354 * so we have to add it back to the blobs list */ 5355 if (ctx->snapshot->open_ref == 2) { 5356 TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link); 5357 } 5358 5359 ctx->snapshot->locked_operation_in_progress = false; 5360 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5361 5362 spdk_blob_close(ctx->snapshot, _spdk_delete_blob_cleanup_finish, ctx); 5363 } 5364 5365 static void 5366 _spdk_delete_snapshot_cleanup_clone(void *cb_arg, int bserrno) 5367 { 5368 struct delete_snapshot_ctx *ctx = cb_arg; 5369 5370 ctx->clone->locked_operation_in_progress = false; 5371 ctx->clone->md_ro = ctx->clone_md_ro; 5372 5373 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5374 } 5375 5376 static void 5377 _spdk_delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 5378 { 5379 struct delete_snapshot_ctx *ctx = cb_arg; 5380 5381 if (bserrno) { 5382 ctx->bserrno = bserrno; 5383 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5384 return; 5385 } 5386 5387 ctx->clone->locked_operation_in_progress = false; 5388 spdk_blob_close(ctx->clone, _spdk_delete_blob_cleanup_finish, ctx); 5389 } 5390 5391 static void 5392 _spdk_delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno) 5393 { 5394 struct delete_snapshot_ctx *ctx = cb_arg; 5395 struct spdk_blob_list *parent_snapshot_entry = NULL; 5396 struct spdk_blob_list *snapshot_entry = NULL; 5397 struct spdk_blob_list *clone_entry = NULL; 5398 struct spdk_blob_list *snapshot_clone_entry = NULL; 5399 5400 if (bserrno) { 5401 SPDK_ERRLOG("Failed to sync MD on blob\n"); 5402 ctx->bserrno = bserrno; 5403 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5404 return; 5405 } 5406 5407 /* Get snapshot entry for the snapshot we want to remove */ 5408 snapshot_entry = _spdk_bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id); 5409 5410 assert(snapshot_entry != NULL); 5411 5412 /* Remove clone entry in this snapshot (at this point there can be only one clone) */ 5413 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5414 assert(clone_entry != NULL); 5415 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 5416 snapshot_entry->clone_count--; 5417 assert(TAILQ_EMPTY(&snapshot_entry->clones)); 5418 5419 if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) { 5420 /* This snapshot is at the same time a clone of another snapshot - we need to 5421 * update parent snapshot (remove current clone, add new one inherited from 5422 * the snapshot that is being removed) */ 5423 5424 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5425 * snapshot that we are removing */ 5426 _spdk_blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry, 5427 &snapshot_clone_entry); 5428 5429 /* Switch clone entry in parent snapshot */ 5430 TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link); 5431 TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link); 5432 free(snapshot_clone_entry); 5433 } else { 5434 /* No parent snapshot - just remove clone entry */ 5435 free(clone_entry); 5436 } 5437 5438 /* Restore md_ro flags */ 5439 ctx->clone->md_ro = ctx->clone_md_ro; 5440 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5441 5442 _spdk_blob_unfreeze_io(ctx->clone, _spdk_delete_snapshot_unfreeze_cpl, ctx); 5443 } 5444 5445 static void 5446 _spdk_delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno) 5447 { 5448 struct delete_snapshot_ctx *ctx = cb_arg; 5449 uint64_t i; 5450 5451 ctx->snapshot->md_ro = false; 5452 5453 if (bserrno) { 5454 SPDK_ERRLOG("Failed to sync MD on clone\n"); 5455 ctx->bserrno = bserrno; 5456 5457 /* Restore snapshot to previous state */ 5458 bserrno = _spdk_blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true); 5459 if (bserrno != 0) { 5460 _spdk_delete_snapshot_cleanup_clone(ctx, bserrno); 5461 return; 5462 } 5463 5464 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_cleanup_clone, ctx); 5465 return; 5466 } 5467 5468 /* Clear cluster map entries for snapshot */ 5469 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5470 if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) { 5471 ctx->snapshot->active.clusters[i] = 0; 5472 } 5473 } 5474 5475 ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY; 5476 5477 if (ctx->parent_snapshot_entry != NULL) { 5478 ctx->snapshot->back_bs_dev = NULL; 5479 } 5480 5481 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_cpl, ctx); 5482 } 5483 5484 static void 5485 _spdk_delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno) 5486 { 5487 struct delete_snapshot_ctx *ctx = cb_arg; 5488 uint64_t i; 5489 5490 /* Temporarily override md_ro flag for clone for MD modification */ 5491 ctx->clone_md_ro = ctx->clone->md_ro; 5492 ctx->clone->md_ro = false; 5493 5494 if (bserrno) { 5495 SPDK_ERRLOG("Failed to sync MD with xattr on blob\n"); 5496 ctx->bserrno = bserrno; 5497 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5498 return; 5499 } 5500 5501 /* Copy snapshot map to clone map (only unallocated clusters in clone) */ 5502 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5503 if (ctx->clone->active.clusters[i] == 0) { 5504 ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i]; 5505 } 5506 } 5507 5508 /* Delete old backing bs_dev from clone (related to snapshot that will be removed) */ 5509 ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev); 5510 5511 /* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */ 5512 if (ctx->parent_snapshot_entry != NULL) { 5513 /* ...to parent snapshot */ 5514 ctx->clone->parent_id = ctx->parent_snapshot_entry->id; 5515 ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev; 5516 _spdk_blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id, 5517 sizeof(spdk_blob_id), 5518 true); 5519 } else { 5520 /* ...to blobid invalid and zeroes dev */ 5521 ctx->clone->parent_id = SPDK_BLOBID_INVALID; 5522 ctx->clone->back_bs_dev = spdk_bs_create_zeroes_dev(); 5523 _spdk_blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true); 5524 } 5525 5526 spdk_blob_sync_md(ctx->clone, _spdk_delete_snapshot_sync_clone_cpl, ctx); 5527 } 5528 5529 static void 5530 _spdk_delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno) 5531 { 5532 struct delete_snapshot_ctx *ctx = cb_arg; 5533 5534 if (bserrno) { 5535 SPDK_ERRLOG("Failed to freeze I/O on clone\n"); 5536 ctx->bserrno = bserrno; 5537 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5538 return; 5539 } 5540 5541 /* Temporarily override md_ro flag for snapshot for MD modification */ 5542 ctx->snapshot_md_ro = ctx->snapshot->md_ro; 5543 ctx->snapshot->md_ro = false; 5544 5545 /* Mark blob as pending for removal for power failure safety, use clone id for recovery */ 5546 ctx->bserrno = _spdk_blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id, 5547 sizeof(spdk_blob_id), true); 5548 if (ctx->bserrno != 0) { 5549 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5550 return; 5551 } 5552 5553 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_xattr_cpl, ctx); 5554 } 5555 5556 static void 5557 _spdk_delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno) 5558 { 5559 struct delete_snapshot_ctx *ctx = cb_arg; 5560 5561 if (bserrno) { 5562 SPDK_ERRLOG("Failed to open clone\n"); 5563 ctx->bserrno = bserrno; 5564 _spdk_delete_snapshot_cleanup_snapshot(ctx, 0); 5565 return; 5566 } 5567 5568 ctx->clone = clone; 5569 5570 if (clone->locked_operation_in_progress) { 5571 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n"); 5572 ctx->bserrno = -EBUSY; 5573 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5574 return; 5575 } 5576 5577 clone->locked_operation_in_progress = true; 5578 5579 _spdk_blob_freeze_io(clone, _spdk_delete_snapshot_freeze_io_cb, ctx); 5580 } 5581 5582 static void 5583 _spdk_update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx) 5584 { 5585 struct spdk_blob_list *snapshot_entry = NULL; 5586 struct spdk_blob_list *clone_entry = NULL; 5587 struct spdk_blob_list *snapshot_clone_entry = NULL; 5588 5589 /* Get snapshot entry for the snapshot we want to remove */ 5590 snapshot_entry = _spdk_bs_get_snapshot_entry(snapshot->bs, snapshot->id); 5591 5592 assert(snapshot_entry != NULL); 5593 5594 /* Get clone of the snapshot (at this point there can be only one clone) */ 5595 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5596 assert(snapshot_entry->clone_count == 1); 5597 assert(clone_entry != NULL); 5598 5599 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5600 * snapshot that we are removing */ 5601 _spdk_blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry, 5602 &snapshot_clone_entry); 5603 5604 spdk_bs_open_blob(snapshot->bs, clone_entry->id, _spdk_delete_snapshot_open_clone_cb, ctx); 5605 } 5606 5607 static void 5608 _spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno) 5609 { 5610 spdk_bs_sequence_t *seq = cb_arg; 5611 struct spdk_blob_list *snapshot_entry = NULL; 5612 uint32_t page_num; 5613 5614 if (bserrno) { 5615 SPDK_ERRLOG("Failed to remove blob\n"); 5616 spdk_bs_sequence_finish(seq, bserrno); 5617 return; 5618 } 5619 5620 /* Remove snapshot from the list */ 5621 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5622 if (snapshot_entry != NULL) { 5623 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link); 5624 free(snapshot_entry); 5625 } 5626 5627 page_num = _spdk_bs_blobid_to_page(blob->id); 5628 spdk_bit_array_clear(blob->bs->used_blobids, page_num); 5629 blob->state = SPDK_BLOB_STATE_DIRTY; 5630 blob->active.num_pages = 0; 5631 _spdk_blob_resize(blob, 0); 5632 5633 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob); 5634 } 5635 5636 static int 5637 _spdk_bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone) 5638 { 5639 struct spdk_blob_list *snapshot_entry = NULL; 5640 struct spdk_blob_list *clone_entry = NULL; 5641 struct spdk_blob *clone = NULL; 5642 bool has_one_clone = false; 5643 5644 /* Check if this is a snapshot with clones */ 5645 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5646 if (snapshot_entry != NULL) { 5647 if (snapshot_entry->clone_count > 1) { 5648 SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n"); 5649 return -EBUSY; 5650 } else if (snapshot_entry->clone_count == 1) { 5651 has_one_clone = true; 5652 } 5653 } 5654 5655 /* Check if someone has this blob open (besides this delete context): 5656 * - open_ref = 1 - only this context opened blob, so it is ok to remove it 5657 * - open_ref <= 2 && has_one_clone = true - clone is holding snapshot 5658 * and that is ok, because we will update it accordingly */ 5659 if (blob->open_ref <= 2 && has_one_clone) { 5660 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5661 assert(clone_entry != NULL); 5662 clone = _spdk_blob_lookup(blob->bs, clone_entry->id); 5663 5664 if (blob->open_ref == 2 && clone == NULL) { 5665 /* Clone is closed and someone else opened this blob */ 5666 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5667 return -EBUSY; 5668 } 5669 5670 *update_clone = true; 5671 return 0; 5672 } 5673 5674 if (blob->open_ref > 1) { 5675 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5676 return -EBUSY; 5677 } 5678 5679 assert(has_one_clone == false); 5680 *update_clone = false; 5681 return 0; 5682 } 5683 5684 static void 5685 _spdk_bs_delete_enomem_close_cpl(void *cb_arg, int bserrno) 5686 { 5687 spdk_bs_sequence_t *seq = cb_arg; 5688 5689 spdk_bs_sequence_finish(seq, -ENOMEM); 5690 } 5691 5692 static void 5693 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 5694 { 5695 spdk_bs_sequence_t *seq = cb_arg; 5696 struct delete_snapshot_ctx *ctx; 5697 bool update_clone = false; 5698 5699 if (bserrno != 0) { 5700 spdk_bs_sequence_finish(seq, bserrno); 5701 return; 5702 } 5703 5704 _spdk_blob_verify_md_op(blob); 5705 5706 ctx = calloc(1, sizeof(*ctx)); 5707 if (ctx == NULL) { 5708 spdk_blob_close(blob, _spdk_bs_delete_enomem_close_cpl, seq); 5709 return; 5710 } 5711 5712 ctx->snapshot = blob; 5713 ctx->cb_fn = _spdk_bs_delete_blob_finish; 5714 ctx->cb_arg = seq; 5715 5716 /* Check if blob can be removed and if it is a snapshot with clone on top of it */ 5717 ctx->bserrno = _spdk_bs_is_blob_deletable(blob, &update_clone); 5718 if (ctx->bserrno) { 5719 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5720 return; 5721 } 5722 5723 if (blob->locked_operation_in_progress) { 5724 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n"); 5725 ctx->bserrno = -EBUSY; 5726 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5727 return; 5728 } 5729 5730 blob->locked_operation_in_progress = true; 5731 5732 /* 5733 * Remove the blob from the blob_store list now, to ensure it does not 5734 * get returned after this point by _spdk_blob_lookup(). 5735 */ 5736 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 5737 5738 if (update_clone) { 5739 /* This blob is a snapshot with active clone - update clone first */ 5740 _spdk_update_clone_on_snapshot_deletion(blob, ctx); 5741 } else { 5742 /* This blob does not have any clones - just remove it */ 5743 _spdk_bs_blob_list_remove(blob); 5744 _spdk_bs_delete_blob_finish(seq, blob, 0); 5745 free(ctx); 5746 } 5747 } 5748 5749 void 5750 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5751 spdk_blob_op_complete cb_fn, void *cb_arg) 5752 { 5753 struct spdk_bs_cpl cpl; 5754 spdk_bs_sequence_t *seq; 5755 5756 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 5757 5758 assert(spdk_get_thread() == bs->md_thread); 5759 5760 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5761 cpl.u.blob_basic.cb_fn = cb_fn; 5762 cpl.u.blob_basic.cb_arg = cb_arg; 5763 5764 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 5765 if (!seq) { 5766 cb_fn(cb_arg, -ENOMEM); 5767 return; 5768 } 5769 5770 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq); 5771 } 5772 5773 /* END spdk_bs_delete_blob */ 5774 5775 /* START spdk_bs_open_blob */ 5776 5777 static void 5778 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5779 { 5780 struct spdk_blob *blob = cb_arg; 5781 5782 if (bserrno != 0) { 5783 _spdk_blob_free(blob); 5784 seq->cpl.u.blob_handle.blob = NULL; 5785 spdk_bs_sequence_finish(seq, bserrno); 5786 return; 5787 } 5788 5789 blob->open_ref++; 5790 5791 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 5792 5793 spdk_bs_sequence_finish(seq, bserrno); 5794 } 5795 5796 static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5797 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5798 { 5799 struct spdk_blob *blob; 5800 struct spdk_bs_cpl cpl; 5801 struct spdk_blob_open_opts opts_default; 5802 spdk_bs_sequence_t *seq; 5803 uint32_t page_num; 5804 5805 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 5806 assert(spdk_get_thread() == bs->md_thread); 5807 5808 page_num = _spdk_bs_blobid_to_page(blobid); 5809 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) { 5810 /* Invalid blobid */ 5811 cb_fn(cb_arg, NULL, -ENOENT); 5812 return; 5813 } 5814 5815 blob = _spdk_blob_lookup(bs, blobid); 5816 if (blob) { 5817 blob->open_ref++; 5818 cb_fn(cb_arg, blob, 0); 5819 return; 5820 } 5821 5822 blob = _spdk_blob_alloc(bs, blobid); 5823 if (!blob) { 5824 cb_fn(cb_arg, NULL, -ENOMEM); 5825 return; 5826 } 5827 5828 if (!opts) { 5829 spdk_blob_open_opts_init(&opts_default); 5830 opts = &opts_default; 5831 } 5832 5833 blob->clear_method = opts->clear_method; 5834 5835 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 5836 cpl.u.blob_handle.cb_fn = cb_fn; 5837 cpl.u.blob_handle.cb_arg = cb_arg; 5838 cpl.u.blob_handle.blob = blob; 5839 5840 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 5841 if (!seq) { 5842 _spdk_blob_free(blob); 5843 cb_fn(cb_arg, NULL, -ENOMEM); 5844 return; 5845 } 5846 5847 _spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob); 5848 } 5849 5850 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5851 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5852 { 5853 _spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg); 5854 } 5855 5856 void spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid, 5857 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5858 { 5859 _spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg); 5860 } 5861 5862 /* END spdk_bs_open_blob */ 5863 5864 /* START spdk_blob_set_read_only */ 5865 int spdk_blob_set_read_only(struct spdk_blob *blob) 5866 { 5867 _spdk_blob_verify_md_op(blob); 5868 5869 blob->data_ro_flags |= SPDK_BLOB_READ_ONLY; 5870 5871 blob->state = SPDK_BLOB_STATE_DIRTY; 5872 return 0; 5873 } 5874 /* END spdk_blob_set_read_only */ 5875 5876 /* START spdk_blob_sync_md */ 5877 5878 static void 5879 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5880 { 5881 struct spdk_blob *blob = cb_arg; 5882 5883 if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) { 5884 blob->data_ro = true; 5885 blob->md_ro = true; 5886 } 5887 5888 spdk_bs_sequence_finish(seq, bserrno); 5889 } 5890 5891 static void 5892 _spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 5893 { 5894 struct spdk_bs_cpl cpl; 5895 spdk_bs_sequence_t *seq; 5896 5897 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5898 cpl.u.blob_basic.cb_fn = cb_fn; 5899 cpl.u.blob_basic.cb_arg = cb_arg; 5900 5901 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 5902 if (!seq) { 5903 cb_fn(cb_arg, -ENOMEM); 5904 return; 5905 } 5906 5907 _spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob); 5908 } 5909 5910 void 5911 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 5912 { 5913 _spdk_blob_verify_md_op(blob); 5914 5915 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id); 5916 5917 if (blob->md_ro) { 5918 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 5919 cb_fn(cb_arg, 0); 5920 return; 5921 } 5922 5923 _spdk_blob_sync_md(blob, cb_fn, cb_arg); 5924 } 5925 5926 /* END spdk_blob_sync_md */ 5927 5928 struct spdk_blob_insert_cluster_ctx { 5929 struct spdk_thread *thread; 5930 struct spdk_blob *blob; 5931 uint32_t cluster_num; /* cluster index in blob */ 5932 uint32_t cluster; /* cluster on disk */ 5933 int rc; 5934 spdk_blob_op_complete cb_fn; 5935 void *cb_arg; 5936 }; 5937 5938 static void 5939 _spdk_blob_insert_cluster_msg_cpl(void *arg) 5940 { 5941 struct spdk_blob_insert_cluster_ctx *ctx = arg; 5942 5943 ctx->cb_fn(ctx->cb_arg, ctx->rc); 5944 free(ctx); 5945 } 5946 5947 static void 5948 _spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno) 5949 { 5950 struct spdk_blob_insert_cluster_ctx *ctx = arg; 5951 5952 ctx->rc = bserrno; 5953 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx); 5954 } 5955 5956 static void 5957 _spdk_blob_insert_cluster_msg(void *arg) 5958 { 5959 struct spdk_blob_insert_cluster_ctx *ctx = arg; 5960 5961 ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster); 5962 if (ctx->rc != 0) { 5963 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx); 5964 return; 5965 } 5966 5967 ctx->blob->state = SPDK_BLOB_STATE_DIRTY; 5968 _spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx); 5969 } 5970 5971 static void 5972 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num, 5973 uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg) 5974 { 5975 struct spdk_blob_insert_cluster_ctx *ctx; 5976 5977 ctx = calloc(1, sizeof(*ctx)); 5978 if (ctx == NULL) { 5979 cb_fn(cb_arg, -ENOMEM); 5980 return; 5981 } 5982 5983 ctx->thread = spdk_get_thread(); 5984 ctx->blob = blob; 5985 ctx->cluster_num = cluster_num; 5986 ctx->cluster = cluster; 5987 ctx->cb_fn = cb_fn; 5988 ctx->cb_arg = cb_arg; 5989 5990 spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx); 5991 } 5992 5993 /* START spdk_blob_close */ 5994 5995 static void 5996 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5997 { 5998 struct spdk_blob *blob = cb_arg; 5999 6000 if (bserrno == 0) { 6001 blob->open_ref--; 6002 if (blob->open_ref == 0) { 6003 /* 6004 * Blobs with active.num_pages == 0 are deleted blobs. 6005 * these blobs are removed from the blob_store list 6006 * when the deletion process starts - so don't try to 6007 * remove them again. 6008 */ 6009 if (blob->active.num_pages > 0) { 6010 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 6011 } 6012 _spdk_blob_free(blob); 6013 } 6014 } 6015 6016 spdk_bs_sequence_finish(seq, bserrno); 6017 } 6018 6019 void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 6020 { 6021 struct spdk_bs_cpl cpl; 6022 spdk_bs_sequence_t *seq; 6023 6024 _spdk_blob_verify_md_op(blob); 6025 6026 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id); 6027 6028 if (blob->open_ref == 0) { 6029 cb_fn(cb_arg, -EBADF); 6030 return; 6031 } 6032 6033 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 6034 cpl.u.blob_basic.cb_fn = cb_fn; 6035 cpl.u.blob_basic.cb_arg = cb_arg; 6036 6037 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 6038 if (!seq) { 6039 cb_fn(cb_arg, -ENOMEM); 6040 return; 6041 } 6042 6043 /* Sync metadata */ 6044 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob); 6045 } 6046 6047 /* END spdk_blob_close */ 6048 6049 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 6050 { 6051 return spdk_get_io_channel(bs); 6052 } 6053 6054 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 6055 { 6056 spdk_put_io_channel(channel); 6057 } 6058 6059 void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel, 6060 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 6061 { 6062 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 6063 SPDK_BLOB_UNMAP); 6064 } 6065 6066 void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel, 6067 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 6068 { 6069 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 6070 SPDK_BLOB_WRITE_ZEROES); 6071 } 6072 6073 void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel, 6074 void *payload, uint64_t offset, uint64_t length, 6075 spdk_blob_op_complete cb_fn, void *cb_arg) 6076 { 6077 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 6078 SPDK_BLOB_WRITE); 6079 } 6080 6081 void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel, 6082 void *payload, uint64_t offset, uint64_t length, 6083 spdk_blob_op_complete cb_fn, void *cb_arg) 6084 { 6085 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 6086 SPDK_BLOB_READ); 6087 } 6088 6089 void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel, 6090 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 6091 spdk_blob_op_complete cb_fn, void *cb_arg) 6092 { 6093 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 6094 } 6095 6096 void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel, 6097 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 6098 spdk_blob_op_complete cb_fn, void *cb_arg) 6099 { 6100 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 6101 } 6102 6103 struct spdk_bs_iter_ctx { 6104 int64_t page_num; 6105 struct spdk_blob_store *bs; 6106 6107 spdk_blob_op_with_handle_complete cb_fn; 6108 void *cb_arg; 6109 }; 6110 6111 static void 6112 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 6113 { 6114 struct spdk_bs_iter_ctx *ctx = cb_arg; 6115 struct spdk_blob_store *bs = ctx->bs; 6116 spdk_blob_id id; 6117 6118 if (bserrno == 0) { 6119 ctx->cb_fn(ctx->cb_arg, _blob, bserrno); 6120 free(ctx); 6121 return; 6122 } 6123 6124 ctx->page_num++; 6125 ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num); 6126 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) { 6127 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 6128 free(ctx); 6129 return; 6130 } 6131 6132 id = _spdk_bs_page_to_blobid(ctx->page_num); 6133 6134 spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 6135 } 6136 6137 void 6138 spdk_bs_iter_first(struct spdk_blob_store *bs, 6139 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 6140 { 6141 struct spdk_bs_iter_ctx *ctx; 6142 6143 ctx = calloc(1, sizeof(*ctx)); 6144 if (!ctx) { 6145 cb_fn(cb_arg, NULL, -ENOMEM); 6146 return; 6147 } 6148 6149 ctx->page_num = -1; 6150 ctx->bs = bs; 6151 ctx->cb_fn = cb_fn; 6152 ctx->cb_arg = cb_arg; 6153 6154 _spdk_bs_iter_cpl(ctx, NULL, -1); 6155 } 6156 6157 static void 6158 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 6159 { 6160 struct spdk_bs_iter_ctx *ctx = cb_arg; 6161 6162 _spdk_bs_iter_cpl(ctx, NULL, -1); 6163 } 6164 6165 void 6166 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob, 6167 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 6168 { 6169 struct spdk_bs_iter_ctx *ctx; 6170 6171 assert(blob != NULL); 6172 6173 ctx = calloc(1, sizeof(*ctx)); 6174 if (!ctx) { 6175 cb_fn(cb_arg, NULL, -ENOMEM); 6176 return; 6177 } 6178 6179 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 6180 ctx->bs = bs; 6181 ctx->cb_fn = cb_fn; 6182 ctx->cb_arg = cb_arg; 6183 6184 /* Close the existing blob */ 6185 spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx); 6186 } 6187 6188 static int 6189 _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 6190 uint16_t value_len, bool internal) 6191 { 6192 struct spdk_xattr_tailq *xattrs; 6193 struct spdk_xattr *xattr; 6194 size_t desc_size; 6195 6196 _spdk_blob_verify_md_op(blob); 6197 6198 if (blob->md_ro) { 6199 return -EPERM; 6200 } 6201 6202 desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len; 6203 if (desc_size > SPDK_BS_MAX_DESC_SIZE) { 6204 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %ld does not fix into single page %ld\n", name, 6205 desc_size, SPDK_BS_MAX_DESC_SIZE); 6206 return -ENOMEM; 6207 } 6208 6209 if (internal) { 6210 xattrs = &blob->xattrs_internal; 6211 blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR; 6212 } else { 6213 xattrs = &blob->xattrs; 6214 } 6215 6216 TAILQ_FOREACH(xattr, xattrs, link) { 6217 if (!strcmp(name, xattr->name)) { 6218 free(xattr->value); 6219 xattr->value_len = value_len; 6220 xattr->value = malloc(value_len); 6221 memcpy(xattr->value, value, value_len); 6222 6223 blob->state = SPDK_BLOB_STATE_DIRTY; 6224 6225 return 0; 6226 } 6227 } 6228 6229 xattr = calloc(1, sizeof(*xattr)); 6230 if (!xattr) { 6231 return -ENOMEM; 6232 } 6233 xattr->name = strdup(name); 6234 xattr->value_len = value_len; 6235 xattr->value = malloc(value_len); 6236 memcpy(xattr->value, value, value_len); 6237 TAILQ_INSERT_TAIL(xattrs, xattr, link); 6238 6239 blob->state = SPDK_BLOB_STATE_DIRTY; 6240 6241 return 0; 6242 } 6243 6244 int 6245 spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 6246 uint16_t value_len) 6247 { 6248 return _spdk_blob_set_xattr(blob, name, value, value_len, false); 6249 } 6250 6251 static int 6252 _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal) 6253 { 6254 struct spdk_xattr_tailq *xattrs; 6255 struct spdk_xattr *xattr; 6256 6257 _spdk_blob_verify_md_op(blob); 6258 6259 if (blob->md_ro) { 6260 return -EPERM; 6261 } 6262 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs; 6263 6264 TAILQ_FOREACH(xattr, xattrs, link) { 6265 if (!strcmp(name, xattr->name)) { 6266 TAILQ_REMOVE(xattrs, xattr, link); 6267 free(xattr->value); 6268 free(xattr->name); 6269 free(xattr); 6270 6271 if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) { 6272 blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR; 6273 } 6274 blob->state = SPDK_BLOB_STATE_DIRTY; 6275 6276 return 0; 6277 } 6278 } 6279 6280 return -ENOENT; 6281 } 6282 6283 int 6284 spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name) 6285 { 6286 return _spdk_blob_remove_xattr(blob, name, false); 6287 } 6288 6289 static int 6290 _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name, 6291 const void **value, size_t *value_len, bool internal) 6292 { 6293 struct spdk_xattr *xattr; 6294 struct spdk_xattr_tailq *xattrs; 6295 6296 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs; 6297 6298 TAILQ_FOREACH(xattr, xattrs, link) { 6299 if (!strcmp(name, xattr->name)) { 6300 *value = xattr->value; 6301 *value_len = xattr->value_len; 6302 return 0; 6303 } 6304 } 6305 return -ENOENT; 6306 } 6307 6308 int 6309 spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name, 6310 const void **value, size_t *value_len) 6311 { 6312 _spdk_blob_verify_md_op(blob); 6313 6314 return _spdk_blob_get_xattr_value(blob, name, value, value_len, false); 6315 } 6316 6317 struct spdk_xattr_names { 6318 uint32_t count; 6319 const char *names[0]; 6320 }; 6321 6322 static int 6323 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names) 6324 { 6325 struct spdk_xattr *xattr; 6326 int count = 0; 6327 6328 TAILQ_FOREACH(xattr, xattrs, link) { 6329 count++; 6330 } 6331 6332 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 6333 if (*names == NULL) { 6334 return -ENOMEM; 6335 } 6336 6337 TAILQ_FOREACH(xattr, xattrs, link) { 6338 (*names)->names[(*names)->count++] = xattr->name; 6339 } 6340 6341 return 0; 6342 } 6343 6344 int 6345 spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names) 6346 { 6347 _spdk_blob_verify_md_op(blob); 6348 6349 return _spdk_blob_get_xattr_names(&blob->xattrs, names); 6350 } 6351 6352 uint32_t 6353 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 6354 { 6355 assert(names != NULL); 6356 6357 return names->count; 6358 } 6359 6360 const char * 6361 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 6362 { 6363 if (index >= names->count) { 6364 return NULL; 6365 } 6366 6367 return names->names[index]; 6368 } 6369 6370 void 6371 spdk_xattr_names_free(struct spdk_xattr_names *names) 6372 { 6373 free(names); 6374 } 6375 6376 struct spdk_bs_type 6377 spdk_bs_get_bstype(struct spdk_blob_store *bs) 6378 { 6379 return bs->bstype; 6380 } 6381 6382 void 6383 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype) 6384 { 6385 memcpy(&bs->bstype, &bstype, sizeof(bstype)); 6386 } 6387 6388 bool 6389 spdk_blob_is_read_only(struct spdk_blob *blob) 6390 { 6391 assert(blob != NULL); 6392 return (blob->data_ro || blob->md_ro); 6393 } 6394 6395 bool 6396 spdk_blob_is_snapshot(struct spdk_blob *blob) 6397 { 6398 struct spdk_blob_list *snapshot_entry; 6399 6400 assert(blob != NULL); 6401 6402 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 6403 if (snapshot_entry == NULL) { 6404 return false; 6405 } 6406 6407 return true; 6408 } 6409 6410 bool 6411 spdk_blob_is_clone(struct spdk_blob *blob) 6412 { 6413 assert(blob != NULL); 6414 6415 if (blob->parent_id != SPDK_BLOBID_INVALID) { 6416 assert(spdk_blob_is_thin_provisioned(blob)); 6417 return true; 6418 } 6419 6420 return false; 6421 } 6422 6423 bool 6424 spdk_blob_is_thin_provisioned(struct spdk_blob *blob) 6425 { 6426 assert(blob != NULL); 6427 return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV); 6428 } 6429 6430 static void 6431 _spdk_blob_update_clear_method(struct spdk_blob *blob) 6432 { 6433 enum blob_clear_method stored_cm; 6434 6435 assert(blob != NULL); 6436 6437 /* If BLOB_CLEAR_WITH_DEFAULT was passed in, use the setting stored 6438 * in metadata previously. If something other than the default was 6439 * specified, ignore stored value and used what was passed in. 6440 */ 6441 stored_cm = ((blob->md_ro_flags & SPDK_BLOB_CLEAR_METHOD) >> SPDK_BLOB_CLEAR_METHOD_SHIFT); 6442 6443 if (blob->clear_method == BLOB_CLEAR_WITH_DEFAULT) { 6444 blob->clear_method = stored_cm; 6445 } else if (blob->clear_method != stored_cm) { 6446 SPDK_WARNLOG("Using passed in clear method 0x%x instead of stored value of 0x%x\n", 6447 blob->clear_method, stored_cm); 6448 } 6449 } 6450 6451 spdk_blob_id 6452 spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id) 6453 { 6454 struct spdk_blob_list *snapshot_entry = NULL; 6455 struct spdk_blob_list *clone_entry = NULL; 6456 6457 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) { 6458 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 6459 if (clone_entry->id == blob_id) { 6460 return snapshot_entry->id; 6461 } 6462 } 6463 } 6464 6465 return SPDK_BLOBID_INVALID; 6466 } 6467 6468 int 6469 spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids, 6470 size_t *count) 6471 { 6472 struct spdk_blob_list *snapshot_entry, *clone_entry; 6473 size_t n; 6474 6475 snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid); 6476 if (snapshot_entry == NULL) { 6477 *count = 0; 6478 return 0; 6479 } 6480 6481 if (ids == NULL || *count < snapshot_entry->clone_count) { 6482 *count = snapshot_entry->clone_count; 6483 return -ENOMEM; 6484 } 6485 *count = snapshot_entry->clone_count; 6486 6487 n = 0; 6488 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 6489 ids[n++] = clone_entry->id; 6490 } 6491 6492 return 0; 6493 } 6494 6495 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB) 6496