/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"
#include "spdk/util.h"
#include "spdk/string.h"

#include "spdk_internal/assert.h"
#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL	0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);

static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
				uint16_t value_len, bool internal);
static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
				      const void **value, size_t *value_len, bool internal);
static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);

static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
{
	assert(blob != NULL);
	assert(spdk_get_thread() == blob->bs->md_thread);
	assert(blob->state != SPDK_BLOB_STATE_LOADING);
}

static struct spdk_blob_list *
_spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_list *snapshot_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		if (snapshot_entry->id == blobid) {
			break;
		}
	}

	return snapshot_entry;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static int
_spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
{
	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];

	_spdk_blob_verify_md_op(blob);

	if (*cluster_lba != 0) {
		return -EEXIST;
	}

	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
	return 0;
}

static int
_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
			  uint64_t *lowest_free_cluster, bool update_map)
{
	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
			       *lowest_free_cluster);
	if (*lowest_free_cluster == UINT32_MAX) {
		/* No more free clusters. Cannot satisfy the request */
		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
		return -ENOSPC;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);

	if (update_map) {
		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
	}

	return 0;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	pthread_mutex_lock(&bs->used_clusters_mutex);
	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
	pthread_mutex_unlock(&bs->used_clusters_mutex);
}

static void
_spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
{
	xattrs->count = 0;
	xattrs->names = NULL;
	xattrs->ctx = NULL;
	xattrs->get_value = NULL;
}

void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->thin_provision = false;
	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
	_spdk_blob_xattrs_init(&opts->xattrs);
}

void
spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts)
{
	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
}

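/*
 * Illustrative usage sketch, not part of this file: a typical caller
 * initializes the options with the helper above and then overrides only
 * the fields it cares about before creating a blob. The callback name
 * below is hypothetical; spdk_bs_create_blob_ext() is the public API.
 *
 *	static void
 *	create_cb(void *cb_arg, spdk_blob_id blobid, int bserrno)
 *	{
 *		assert(bserrno == 0);
 *		// blobid identifies the newly created blob
 *	}
 *
 *	struct spdk_blob_opts opts;
 *
 *	spdk_blob_opts_init(&opts);
 *	opts.num_clusters = 16;
 *	opts.thin_provision = true;
 *	spdk_bs_create_blob_ext(bs, &opts, create_cb, NULL);
 */
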
static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->parent_id = SPDK_BLOBID_INVALID;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);
	TAILQ_INIT(&blob->xattrs_internal);

	return blob;
}

static void
_spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	_spdk_xattrs_free(&blob->xattrs);
	_spdk_xattrs_free(&blob->xattrs_internal);

	if (blob->back_bs_dev) {
		blob->back_bs_dev->destroy(blob->back_bs_dev);
	}

	free(blob);
}

struct freeze_io_ctx {
	struct spdk_bs_cpl cpl;
	struct spdk_blob *blob;
};

static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_bs_request_set *set;
	struct spdk_bs_user_op_args *args;
	spdk_bs_user_op_t *op, *tmp;

	TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
		set = (struct spdk_bs_request_set *)op;
		args = &set->u.user_op;

		if (args->blob == ctx->blob) {
			TAILQ_REMOVE(&ch->queued_io, op, link);
			spdk_bs_user_op_execute(op);
		}
	}

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
{
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);

	free(ctx);
}

static void
_spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	/* Freeze I/O on blob */
	blob->frozen_refcnt++;

	if (blob->frozen_refcnt == 1) {
		spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}

static void
_spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	assert(blob->frozen_refcnt > 0);

	blob->frozen_refcnt--;

	if (blob->frozen_refcnt == 0) {
		spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}

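/*
 * A sketch of the freeze/unfreeze contract implemented above, for
 * illustration only: freezes nest via frozen_refcnt, and only the
 * outermost freeze/unfreeze pays for a spdk_for_each_channel() pass.
 *
 *	_spdk_blob_freeze_io(blob, cb1, arg1);    // refcnt 0 -> 1, syncs all channels
 *	_spdk_blob_freeze_io(blob, cb2, arg2);    // refcnt 1 -> 2, completes immediately
 *	_spdk_blob_unfreeze_io(blob, cb3, arg3);  // refcnt 2 -> 1, completes immediately
 *	_spdk_blob_unfreeze_io(blob, cb4, arg4);  // refcnt 1 -> 0, replays queued I/O
 */
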
static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -ENOMEM;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -ENOMEM;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	/* If the metadata was dirtied again while the metadata was being written to disk,
	 * we do not want to revert the DIRTY state back to CLEAN here.
	 */
	if (blob->state == SPDK_BLOB_STATE_LOADING) {
		blob->state = SPDK_BLOB_STATE_CLEAN;
	}

	return 0;
}

static int
_spdk_blob_deserialize_xattr(struct spdk_blob *blob,
			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
{
	struct spdk_xattr *xattr;

	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
	    sizeof(desc_xattr->value_length) +
	    desc_xattr->name_length + desc_xattr->value_length) {
		return -EINVAL;
	}

	xattr = calloc(1, sizeof(*xattr));
	if (xattr == NULL) {
		return -ENOMEM;
	}

	xattr->name = malloc(desc_xattr->name_length + 1);
	if (xattr->name == NULL) {
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
	xattr->name[desc_xattr->name_length] = '\0';

	xattr->value = malloc(desc_xattr->value_length);
	if (xattr->value == NULL) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = desc_xattr->value_length;
	memcpy(xattr->value,
	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
	       desc_xattr->value_length);

	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);

	return 0;
}

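/*
 * Worked example of the xattr descriptor layout validated above, stated
 * as an assumption about the structures in blobstore.h: for the name
 * "foo" and a 4-byte value, the descriptor payload is
 *
 *	desc->length = sizeof(name_length) + sizeof(value_length) + 3 + 4
 *
 * with the name bytes immediately followed by the value bytes, which is
 * why the value is copied from (uintptr_t)desc_xattr->name + name_length.
 */
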
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags *desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
			struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;

			if (desc_extent_rle->length == 0 ||
			    (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						if (!spdk_bit_array_get(blob->bs->used_clusters,
									desc_extent_rle->extents[i].cluster_idx + j)) {
							return -EINVAL;
						}
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
								desc_extent_rle->extents[i].cluster_idx + j);
					} else if (spdk_blob_is_thin_provisioned(blob)) {
						blob->active.clusters[blob->active.num_clusters++] = 0;
					} else {
						return -EINVAL;
					}
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
			if (rc != 0) {
				return rc;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
			if (rc != 0) {
				return rc;
			}
		} else {
			/* Unrecognized descriptor type. Do not fail - just continue to the
			 * next descriptor. If this descriptor is associated with some feature
			 * defined in a newer version of blobstore, that version of blobstore
			 * should create and set an associated feature flag to specify if this
			 * blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}

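/*
 * Worked example of the EXTENT_RLE decoding above, for illustration: an
 * extent {cluster_idx = 5, length = 3} expands into three consecutive
 * entries of blob->active.clusters holding the LBAs of clusters 5, 6 and
 * 7, while {cluster_idx = 0, length = 2} expands into two zero entries,
 * which is only legal for thin-provisioned blobs (enforced above).
 */
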
static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD, this can
	 * happen for example if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
				     NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_realloc(*pages,
				      SPDK_BS_PAGE_SIZE * (*page_count),
				      SPDK_BS_PAGE_SIZE);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz, bool internal)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

static void
_spdk_blob_serialize_extent_rle(const struct spdk_blob *blob,
				uint64_t start_cluster, uint64_t *next_cluster,
				uint8_t **buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]);
	if (*buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf;
	desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) {
			/* Run-length encode sequential non-zero LBA */
			lba_count += lba_per_cluster;
			continue;
		} else if (lba == 0 && blob->active.clusters[i] == 0) {
			/* Run-length encode unallocated clusters */
			lba_count += lba_per_cluster;
			continue;
		}
		desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc_extent_rle->extents[extent_idx]);

		if (*buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			*next_cluster = i;
			goto finish;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	*next_cluster = blob->active.num_clusters;

finish:
	desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx;
	*buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;
	*buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;

	return;
}

static int
_spdk_blob_serialize_extents_rle(const struct spdk_blob *blob,
				 struct spdk_blob_md_page **pages,
				 struct spdk_blob_md_page *cur_page,
				 uint32_t *page_count, uint8_t **buf,
				 size_t *remaining_sz)
{
	uint64_t last_cluster;
	int rc;

	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
		if (rc < 0) {
			return rc;
		}

		*buf = (uint8_t *)cur_page->descriptors;
		*remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

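/*
 * Worked example of the run-length encoding above, for illustration:
 * with one LBA per cluster, an active cluster array of
 *
 *	{ lba(5), lba(6), lba(7), 0, 0, lba(9) }
 *
 * serializes into three extents: {cluster_idx = 5, length = 3},
 * {cluster_idx = 0, length = 2}, and {cluster_idx = 9, length = 1}.
 */
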
static void
_spdk_blob_serialize_flags(const struct spdk_blob *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 * descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
			    const struct spdk_xattr_tailq *xattrs, bool internal,
			    struct spdk_blob_md_page **pages,
			    struct spdk_blob_md_page *cur_page,
			    uint32_t *page_count, uint8_t **buf,
			    size_t *remaining_sz)
{
	const struct spdk_xattr *xattr;
	int rc;

	TAILQ_FOREACH(xattr, xattrs, link) {
		size_t required_sz = 0;

		rc = _spdk_blob_serialize_xattr(xattr,
						*buf, *remaining_sz,
						&required_sz, internal);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			*buf = (uint8_t *)cur_page->descriptors;
			*remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							*buf, *remaining_sz,
							&required_sz, internal);

			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}
		}

		*remaining_sz -= required_sz;
		*buf += required_sz;
	}

	return 0;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_DIRTY);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize internal xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize extents */
	rc = _spdk_blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz);

	return rc;
}

struct spdk_blob_load_ctx {
	struct spdk_blob		*blob;

	struct spdk_blob_md_page	*pages;
	uint32_t			num_pages;
	spdk_bs_sequence_t		*seq;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_final(void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);
		if (blob->back_bs_dev == NULL) {
			bserrno = -ENOMEM;
		}
	}
	if (bserrno != 0) {
		SPDK_ERRLOG("Snapshot open failed\n");
	}

	_spdk_blob_load_final(ctx, bserrno);
}

static void _spdk_blob_update_clear_method(struct spdk_blob *blob);

static void
_spdk_blob_load_backing_dev(void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	const void *value;
	size_t len;
	int rc;

	if (spdk_blob_is_thin_provisioned(blob)) {
		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
		if (rc == 0) {
			if (len != sizeof(spdk_blob_id)) {
				_spdk_blob_load_final(ctx, -EINVAL);
				return;
			}
			/* open snapshot blob and continue in the callback function */
			blob->parent_id = *(spdk_blob_id *)value;
			spdk_bs_open_blob(blob->bs, blob->parent_id,
					  _spdk_blob_load_snapshot_cpl, ctx);
			return;
		} else {
			/* add zeroes_dev for thin provisioned blob */
			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
		}
	} else {
		/* standard blob */
		blob->back_bs_dev = NULL;
	}
	_spdk_blob_load_final(ctx, 0);
}

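/*
 * A note on _spdk_blob_md_page_calc_crc() above, stated as an assumption
 * about the page layout in blobstore.h: the crc field occupies the last
 * 4 bytes of the page, so hashing SPDK_BS_PAGE_SIZE - 4 bytes covers
 * everything except the stored checksum. Writers and readers then use it
 * symmetrically:
 *
 *	page->crc = _spdk_blob_md_page_calc_crc(page);          // serialize
 *	ok = (page->crc == _spdk_blob_md_page_calc_crc(page));  // load
 */
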
static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;
	uint32_t crc;

	if (bserrno) {
		SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno);
		_spdk_blob_load_final(ctx, bserrno);
		return;
	}

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_load_final(ctx, -EINVAL);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_md_page_to_lba(blob->bs, next_page);

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					  sizeof(*page));
		if (ctx->pages == NULL) {
			_spdk_blob_load_final(ctx, -ENOMEM);
			return;
		}

		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
					  next_lba,
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
					  _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_load_final(ctx, rc);
		return;
	}
	ctx->seq = seq;

	/* Check the clear_method stored in metadata vs what may have been passed
	 * via spdk_bs_open_blob_ext() and update accordingly.
	 */
	_spdk_blob_update_clear_method(blob);

	_spdk_blob_load_backing_dev(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	_spdk_blob_verify_md_op(blob);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->seq = seq;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_md_page_to_lba(blob->bs, page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
				  _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob		*blob;

	struct spdk_bs_super_block	*super;

	struct spdk_blob_md_page	*pages;

	spdk_bs_sequence_t		*seq;
	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba,
			uint32_t lba_count)
{
	switch (ctx->blob->clear_method) {
	case BLOB_CLEAR_WITH_DEFAULT:
	case BLOB_CLEAR_WITH_UNMAP:
		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		break;
	case BLOB_CLEAR_WITH_WRITE_ZEROES:
		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
		break;
	case BLOB_CLEAR_WITH_NONE:
	default:
		break;
	}
}

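/*
 * The mapping applied by spdk_bs_batch_clear_dev() above, summarized for
 * illustration: BLOB_CLEAR_WITH_DEFAULT and BLOB_CLEAR_WITH_UNMAP issue an
 * unmap, BLOB_CLEAR_WITH_WRITE_ZEROES issues a write-zeroes, and
 * BLOB_CLEAR_WITH_NONE leaves the old data on the device; only the
 * metadata references to the clusters are dropped.
 */
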
static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		/* Nothing to release if it was not allocated */
		if (blob->active.clusters[i] != 0) {
			_spdk_bs_release_cluster(bs, cluster_num);
		}
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else if (blob->active.num_clusters != blob->active.cluster_array_size) {
#ifndef __clang_analyzer__
		void *tmp;

		/* scan-build really can't figure reallocs, workaround it */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
#endif
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx);

	/* Clear all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if (next_lba > 0 && (lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, clear them now */
		if (lba_count > 0) {
			spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		if (next_lba > 0) {
			lba_count = next_lba_count;
		} else {
			lba_count = 0;
		}
	}

	/* If we ended with a contiguous set of LBAs, clear them now */
	if (lba_count > 0) {
		spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to clearing clusters */
	_spdk_blob_persist_clear_clusters(seq, ctx, 0);
}

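/*
 * Worked example of the run merging in _spdk_blob_persist_clear_clusters()
 * above, for illustration: with 256 LBAs per cluster, truncated cluster
 * LBAs of { 1024, 1280, 0, 4096 } produce two device operations, one
 * clear of 512 LBAs at 1024 and one clear of 256 LBAs at 4096.
 * Unallocated (zero) entries only break up runs; they are never cleared.
 */
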
static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_md_page_to_lba(bs, blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_md_page_to_lba(bs, page_num);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_md_page_to_lba(bs, _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
				   _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_md_page_to_lba(bs, blob->active.pages[i]);

		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

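/*
 * A note on the write ordering established above, for illustration: every
 * page except the root is written first in one batch
 * (_spdk_blob_persist_write_page_chain), and only then is the root page
 * written at the LBA fixed by the blobid (_spdk_blob_persist_write_page_root).
 * Until the root write lands, a reader following the blobid still finds
 * the old metadata chain, so the update is effectively atomic.
 */
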
static int
_spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	uint64_t num_clusters;
	struct spdk_blob_store *bs;

	bs = blob->bs;

	_spdk_blob_verify_md_op(blob);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		num_clusters = spdk_min(blob->active.cluster_array_size,
					sz);
	} else {
		num_clusters = blob->active.num_clusters;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters. Cannot satisfy the request */
				return -ENOSPC;
			}
			lfc++;
		}
	}

	if (sz > num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz);
		if (sz > 0 && tmp == NULL) {
			return -ENOMEM;
		}
		memset(tmp + blob->active.cluster_array_size, 0,
		       sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size));
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
			lfc++;
		}
	}

	blob->active.num_clusters = sz;

	return 0;
}

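/*
 * Illustrative public-API counterpart of _spdk_blob_resize() above
 * (callback names are hypothetical): resizing happens in memory on the
 * metadata thread and only reaches the disk on the next metadata sync.
 *
 *	spdk_blob_resize(blob, 32, resize_done, NULL);	// grow to 32 clusters
 *	spdk_blob_sync_md(blob, sync_done, NULL);	// persist the new size
 */
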
static void
_spdk_blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t i;
	uint32_t page_num;
	void *tmp;
	int rc;

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		_spdk_blob_persist_complete(seq, ctx, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp) {
		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
		return;
	}
	blob->active.pages = tmp;

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num == UINT32_MAX) {
			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	blob->state = SPDK_BLOB_STATE_CLEAN;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		blob->state = SPDK_BLOB_STATE_CLEAN;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	_spdk_blob_persist_generate_new_md(ctx);
}

static void
_spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->blob->bs->clean = 0;

	spdk_free(ctx->super);

	_spdk_blob_persist_start(ctx);
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg);


static void
_spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->super->clean = 0;
	if (ctx->super->size == 0) {
		ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen;
	}

	_spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx);
}


/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;

	_spdk_blob_verify_md_op(blob);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->seq = seq;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	if (blob->bs->clean) {
		ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->super) {
			cb_fn(seq, cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0),
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)),
					  _spdk_blob_persist_dirty, ctx);
	} else {
		_spdk_blob_persist_start(ctx);
	}
}

struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
	uint8_t *buf;
	uint64_t page;
	uint64_t new_cluster;
	spdk_bs_sequence_t *seq;
};

static void
_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
	TAILQ_HEAD(, spdk_bs_request_set) requests;
	spdk_bs_user_op_t *op;

	TAILQ_INIT(&requests);
	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);

	while (!TAILQ_EMPTY(&requests)) {
		op = TAILQ_FIRST(&requests);
		TAILQ_REMOVE(&requests, op, link);
		if (bserrno == 0) {
			spdk_bs_user_op_execute(op);
		} else {
			spdk_bs_user_op_abort(op);
		}
	}

	spdk_free(ctx->buf);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno) {
		if (bserrno == -EEXIST) {
			/* The metadata insert failed because another thread
			 * allocated the cluster first. Free our cluster
			 * but continue without error. */
			bserrno = 0;
		}
		_spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
	}

	spdk_bs_sequence_finish(ctx->seq, bserrno);
}

static void
_spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	uint32_t cluster_number;

	if (bserrno) {
		/* The write failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);

	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
					       _spdk_blob_insert_cluster_cpl, ctx);
}

static void
_spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		/* The read failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Write whole cluster */
	spdk_bs_sequence_write_dev(seq, ctx->buf,
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
				   _spdk_blob_write_copy_cpl, ctx);
}

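/*
 * The callbacks above, plus _spdk_bs_allocate_and_copy_cluster() below,
 * form the copy-on-write pipeline for one cluster; an illustrative
 * summary of the flow:
 *
 *	claim a free cluster                     (_spdk_bs_allocate_cluster)
 *	  -> read the old data from back_bs_dev  (issued in ..._allocate_and_copy_cluster)
 *	  -> write it to the new cluster         (_spdk_blob_write_copy)
 *	  -> insert the cluster into the map on the md thread
 *	                                         (_spdk_blob_write_copy_cpl)
 *	  -> on -EEXIST, another thread raced us: release our cluster and
 *	     succeed anyway                      (_spdk_blob_insert_cluster_cpl)
 *	  -> re-execute the user ops queued on need_cluster_alloc
 *	                                         (..._allocate_and_copy_cluster_cpl)
 */
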
static void
_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
				   struct spdk_io_channel *_ch,
				   uint64_t io_unit, spdk_bs_user_op_t *op)
{
	struct spdk_bs_cpl cpl;
	struct spdk_bs_channel *ch;
	struct spdk_blob_copy_cluster_ctx *ctx;
	uint32_t cluster_start_page;
	uint32_t cluster_number;
	int rc;

	ch = spdk_io_channel_get_ctx(_ch);

	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
		/* There are already operations pending. Queue this user op
		 * and return because it will be re-executed when the outstanding
		 * cluster allocation completes. */
		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
		return;
	}

	/* Round the io_unit offset down to the first page in the cluster */
	cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit);

	/* Calculate which index in the metadata cluster array the corresponding
	 * cluster is supposed to be at. */
	cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		spdk_bs_user_op_abort(op);
		return;
	}

	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);

	ctx->blob = blob;
	ctx->page = cluster_start_page;

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen,
				       NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->buf) {
			SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
				    blob->bs->cluster_sz);
			free(ctx);
			spdk_bs_user_op_abort(op);
			return;
		}
	}

	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
	if (rc != 0) {
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
	cpl.u.blob_basic.cb_arg = ctx;

	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
	if (!ctx->seq) {
		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	/* Queue the user op to block other incoming operations */
	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		/* Read cluster from backing device */
		spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
					     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
					     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
					     _spdk_blob_write_copy, ctx);
	} else {
		_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
						       _spdk_blob_insert_cluster_cpl, ctx);
	}
}

static void
_spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length,
				       uint64_t *lba, uint32_t *lba_count)
{
	*lba_count = length;

	if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) {
		assert(blob->back_bs_dev != NULL);
		*lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit);
		*lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count);
	} else {
		*lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit);
	}
}

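/*
 * Worked example for _spdk_blob_calculate_lba_and_lba_count() above,
 * assuming the io_unit size equals the device block size: an allocated
 * io_unit maps to the blob's own device LBA via the cluster map, while an
 * unallocated io_unit maps to the equivalent LBA on back_bs_dev, with the
 * length rescaled to backing-device blocks by the same conversion helper.
 */
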
struct op_split_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	uint64_t io_unit_offset;
	uint64_t io_units_remaining;
	void *curr_payload;
	enum spdk_blob_op_type op_type;
	spdk_bs_sequence_t *seq;
};

static void
_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
{
	struct op_split_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_io_channel *ch = ctx->channel;
	enum spdk_blob_op_type op_type = ctx->op_type;
	uint8_t *buf = ctx->curr_payload;
	uint64_t offset = ctx->io_unit_offset;
	uint64_t length = ctx->io_units_remaining;
	uint64_t op_length;

	if (bserrno != 0 || ctx->io_units_remaining == 0) {
		spdk_bs_sequence_finish(ctx->seq, bserrno);
		free(ctx);
		return;
	}

	op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob,
			     offset));

	/* Update length and payload for next operation */
	ctx->io_units_remaining -= op_length;
	ctx->io_unit_offset += op_length;
	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
		ctx->curr_payload += op_length * blob->bs->io_unit_size;
	}

	switch (op_type) {
	case SPDK_BLOB_READ:
		spdk_blob_io_read(blob, ch, buf, offset, op_length,
				  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE:
		spdk_blob_io_write(blob, ch, buf, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_blob_io_unmap(blob, ch, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
					  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid\n");
		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
		free(ctx);
		break;
	}
}

static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct op_split_ctx *ctx;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(struct op_split_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(ch, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->channel = ch;
	ctx->curr_payload = payload;
	ctx->io_unit_offset = offset;
	ctx->io_units_remaining = length;
	ctx->op_type = op_type;
	ctx->seq = seq;

	_spdk_blob_request_submit_op_split_next(ctx, 0);
}

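/*
 * Worked example of the splitting above, for illustration: with 256
 * io_units per cluster, a write of 300 io_units at offset 200 crosses a
 * cluster boundary and is issued as two chained sub-operations, 56
 * io_units at offset 200 followed by 244 io_units at offset 256, each
 * sized by _spdk_bs_num_io_units_to_cluster_boundary().
 */
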
break; 1927 } 1928 case SPDK_BLOB_WRITE: 1929 case SPDK_BLOB_WRITE_ZEROES: { 1930 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1931 /* Write to the blob */ 1932 spdk_bs_batch_t *batch; 1933 1934 if (lba_count == 0) { 1935 cb_fn(cb_arg, 0); 1936 return; 1937 } 1938 1939 batch = spdk_bs_batch_open(_ch, &cpl); 1940 if (!batch) { 1941 cb_fn(cb_arg, -ENOMEM); 1942 return; 1943 } 1944 1945 if (op_type == SPDK_BLOB_WRITE) { 1946 spdk_bs_batch_write_dev(batch, payload, lba, lba_count); 1947 } else { 1948 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1949 } 1950 1951 spdk_bs_batch_close(batch); 1952 } else { 1953 /* Queue this operation and allocate the cluster */ 1954 spdk_bs_user_op_t *op; 1955 1956 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 1957 if (!op) { 1958 cb_fn(cb_arg, -ENOMEM); 1959 return; 1960 } 1961 1962 _spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op); 1963 } 1964 break; 1965 } 1966 case SPDK_BLOB_UNMAP: { 1967 spdk_bs_batch_t *batch; 1968 1969 batch = spdk_bs_batch_open(_ch, &cpl); 1970 if (!batch) { 1971 cb_fn(cb_arg, -ENOMEM); 1972 return; 1973 } 1974 1975 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1976 spdk_bs_batch_unmap_dev(batch, lba, lba_count); 1977 } 1978 1979 spdk_bs_batch_close(batch); 1980 break; 1981 } 1982 case SPDK_BLOB_READV: 1983 case SPDK_BLOB_WRITEV: 1984 SPDK_ERRLOG("readv/writev not valid\n"); 1985 cb_fn(cb_arg, -EINVAL); 1986 break; 1987 } 1988 } 1989 1990 static void 1991 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1992 void *payload, uint64_t offset, uint64_t length, 1993 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1994 { 1995 assert(blob != NULL); 1996 1997 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 1998 cb_fn(cb_arg, -EPERM); 1999 return; 2000 } 2001 2002 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2003 cb_fn(cb_arg, -EINVAL); 2004 return; 2005 } 2006 if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) { 2007 _spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length, 2008 cb_fn, cb_arg, op_type); 2009 } else { 2010 _spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length, 2011 cb_fn, cb_arg, op_type); 2012 } 2013 } 2014 2015 struct rw_iov_ctx { 2016 struct spdk_blob *blob; 2017 struct spdk_io_channel *channel; 2018 spdk_blob_op_complete cb_fn; 2019 void *cb_arg; 2020 bool read; 2021 int iovcnt; 2022 struct iovec *orig_iov; 2023 uint64_t io_unit_offset; 2024 uint64_t io_units_remaining; 2025 uint64_t io_units_done; 2026 struct iovec iov[0]; 2027 }; 2028 2029 static void 2030 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2031 { 2032 assert(cb_arg == NULL); 2033 spdk_bs_sequence_finish(seq, bserrno); 2034 } 2035 2036 static void 2037 _spdk_rw_iov_split_next(void *cb_arg, int bserrno) 2038 { 2039 struct rw_iov_ctx *ctx = cb_arg; 2040 struct spdk_blob *blob = ctx->blob; 2041 struct iovec *iov, *orig_iov; 2042 int iovcnt; 2043 size_t orig_iovoff; 2044 uint64_t io_units_count, io_units_to_boundary, io_unit_offset; 2045 uint64_t byte_count; 2046 2047 if (bserrno != 0 || ctx->io_units_remaining == 0) { 2048 ctx->cb_fn(ctx->cb_arg, bserrno); 2049 free(ctx); 2050 return; 2051 } 2052 2053 io_unit_offset = ctx->io_unit_offset; 2054 io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset); 2055 io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary); 2056 /*
2057 * Get index and offset into the original iov array for our current position in the I/O sequence. 2058 * byte_count will keep track of how many bytes remain until orig_iov and orig_iovoff 2059 * point to the current position in the I/O sequence. 2060 */ 2061 byte_count = ctx->io_units_done * blob->bs->io_unit_size; 2062 orig_iov = &ctx->orig_iov[0]; 2063 orig_iovoff = 0; 2064 while (byte_count > 0) { 2065 if (byte_count >= orig_iov->iov_len) { 2066 byte_count -= orig_iov->iov_len; 2067 orig_iov++; 2068 } else { 2069 orig_iovoff = byte_count; 2070 byte_count = 0; 2071 } 2072 } 2073 2074 /* 2075 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 2076 * bytes of this next I/O remain to be accounted for in the new iov array. 2077 */ 2078 byte_count = io_units_count * blob->bs->io_unit_size; 2079 iov = &ctx->iov[0]; 2080 iovcnt = 0; 2081 while (byte_count > 0) { 2082 assert(iovcnt < ctx->iovcnt); 2083 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 2084 iov->iov_base = orig_iov->iov_base + orig_iovoff; 2085 byte_count -= iov->iov_len; 2086 orig_iovoff = 0; 2087 orig_iov++; 2088 iov++; 2089 iovcnt++; 2090 } 2091 2092 ctx->io_unit_offset += io_units_count; 2093 ctx->io_units_remaining -= io_units_count; 2094 ctx->io_units_done += io_units_count; 2095 iov = &ctx->iov[0]; 2096 2097 if (ctx->read) { 2098 spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2099 io_units_count, _spdk_rw_iov_split_next, ctx); 2100 } else { 2101 spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2102 io_units_count, _spdk_rw_iov_split_next, ctx); 2103 } 2104 } 2105 2106 static void 2107 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 2108 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2109 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 2110 { 2111 struct spdk_bs_cpl cpl; 2112 2113 assert(blob != NULL); 2114 2115 if (!read && blob->data_ro) { 2116 cb_fn(cb_arg, -EPERM); 2117 return; 2118 } 2119 2120 if (length == 0) { 2121 cb_fn(cb_arg, 0); 2122 return; 2123 } 2124 2125 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2126 cb_fn(cb_arg, -EINVAL); 2127 return; 2128 } 2129 2130 /* 2131 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 2132 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 2133 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 2134 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 2135 * to allocate a separate iov array and split the I/O such that none of the resulting 2136 * smaller I/Os crosses a cluster boundary. These smaller I/Os will be issued in sequence (not in parallel) 2137 * but since this case happens very infrequently, any performance impact will be negligible. 2138 * 2139 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 2140 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 2141 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 2142 * when the batch was completed, to allow for freeing the memory for the iov arrays.
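 * As a concrete illustration (numbers assumed for the example, not taken from
 * this code): with a 4 KiB io_unit_size and a 1 MiB cluster, a writev of 768
 * io units starting on a cluster boundary is split into three 256-io-unit
 * children, each one issued from the previous child's completion callback
 * (_spdk_rw_iov_split_next).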
2143 */ 2144 if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) { 2145 uint32_t lba_count; 2146 uint64_t lba; 2147 2148 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2149 cpl.u.blob_basic.cb_fn = cb_fn; 2150 cpl.u.blob_basic.cb_arg = cb_arg; 2151 2152 if (blob->frozen_refcnt) { 2153 /* This blob I/O is frozen */ 2154 enum spdk_blob_op_type op_type; 2155 spdk_bs_user_op_t *op; 2156 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel); 2157 2158 op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV; 2159 op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length); 2160 if (!op) { 2161 cb_fn(cb_arg, -ENOMEM); 2162 return; 2163 } 2164 2165 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 2166 2167 return; 2168 } 2169 2170 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 2171 2172 if (read) { 2173 spdk_bs_sequence_t *seq; 2174 2175 seq = spdk_bs_sequence_start(_channel, &cpl); 2176 if (!seq) { 2177 cb_fn(cb_arg, -ENOMEM); 2178 return; 2179 } 2180 2181 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2182 spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2183 } else { 2184 spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count, 2185 _spdk_rw_iov_done, NULL); 2186 } 2187 } else { 2188 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2189 spdk_bs_sequence_t *seq; 2190 2191 seq = spdk_bs_sequence_start(_channel, &cpl); 2192 if (!seq) { 2193 cb_fn(cb_arg, -ENOMEM); 2194 return; 2195 } 2196 2197 spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2198 } else { 2199 /* Queue this operation and allocate the cluster */ 2200 spdk_bs_user_op_t *op; 2201 2202 op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, 2203 length); 2204 if (!op) { 2205 cb_fn(cb_arg, -ENOMEM); 2206 return; 2207 } 2208 2209 _spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op); 2210 } 2211 } 2212 } else { 2213 struct rw_iov_ctx *ctx; 2214 2215 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 2216 if (ctx == NULL) { 2217 cb_fn(cb_arg, -ENOMEM); 2218 return; 2219 } 2220 2221 ctx->blob = blob; 2222 ctx->channel = _channel; 2223 ctx->cb_fn = cb_fn; 2224 ctx->cb_arg = cb_arg; 2225 ctx->read = read; 2226 ctx->orig_iov = iov; 2227 ctx->iovcnt = iovcnt; 2228 ctx->io_unit_offset = offset; 2229 ctx->io_units_remaining = length; 2230 ctx->io_units_done = 0; 2231 2232 _spdk_rw_iov_split_next(ctx, 0); 2233 } 2234 } 2235 2236 static struct spdk_blob * 2237 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 2238 { 2239 struct spdk_blob *blob; 2240 2241 TAILQ_FOREACH(blob, &bs->blobs, link) { 2242 if (blob->id == blobid) { 2243 return blob; 2244 } 2245 } 2246 2247 return NULL; 2248 } 2249 2250 static void 2251 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob, 2252 struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry) 2253 { 2254 assert(blob != NULL); 2255 *snapshot_entry = NULL; 2256 *clone_entry = NULL; 2257 2258 if (blob->parent_id == SPDK_BLOBID_INVALID) { 2259 return; 2260 } 2261 2262 TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) { 2263 if ((*snapshot_entry)->id == blob->parent_id) { 2264 break; 2265 } 2266 } 2267 2268 if (*snapshot_entry != NULL) { 2269 TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) { 2270 if ((*clone_entry)->id == blob->id) { 2271 break; 2272 } 2273 } 2274 2275 
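/* A blob with a valid parent must already be registered on its snapshot's
 * clone list, so the lookup above is expected to succeed. */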
assert(*clone_entry != NULL); 2276 } 2277 } 2278 2279 static int 2280 _spdk_bs_channel_create(void *io_device, void *ctx_buf) 2281 { 2282 struct spdk_blob_store *bs = io_device; 2283 struct spdk_bs_channel *channel = ctx_buf; 2284 struct spdk_bs_dev *dev; 2285 uint32_t max_ops = bs->max_channel_ops; 2286 uint32_t i; 2287 2288 dev = bs->dev; 2289 2290 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 2291 if (!channel->req_mem) { 2292 return -1; 2293 } 2294 2295 TAILQ_INIT(&channel->reqs); 2296 2297 for (i = 0; i < max_ops; i++) { 2298 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 2299 } 2300 2301 channel->bs = bs; 2302 channel->dev = dev; 2303 channel->dev_channel = dev->create_channel(dev); 2304 2305 if (!channel->dev_channel) { 2306 SPDK_ERRLOG("Failed to create device channel.\n"); 2307 free(channel->req_mem); 2308 return -1; 2309 } 2310 2311 TAILQ_INIT(&channel->need_cluster_alloc); 2312 TAILQ_INIT(&channel->queued_io); 2313 2314 return 0; 2315 } 2316 2317 static void 2318 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 2319 { 2320 struct spdk_bs_channel *channel = ctx_buf; 2321 spdk_bs_user_op_t *op; 2322 2323 while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) { 2324 op = TAILQ_FIRST(&channel->need_cluster_alloc); 2325 TAILQ_REMOVE(&channel->need_cluster_alloc, op, link); 2326 spdk_bs_user_op_abort(op); 2327 } 2328 2329 while (!TAILQ_EMPTY(&channel->queued_io)) { 2330 op = TAILQ_FIRST(&channel->queued_io); 2331 TAILQ_REMOVE(&channel->queued_io, op, link); 2332 spdk_bs_user_op_abort(op); 2333 } 2334 2335 free(channel->req_mem); 2336 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 2337 } 2338 2339 static void 2340 _spdk_bs_dev_destroy(void *io_device) 2341 { 2342 struct spdk_blob_store *bs = io_device; 2343 struct spdk_blob *blob, *blob_tmp; 2344 2345 bs->dev->destroy(bs->dev); 2346 2347 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 2348 TAILQ_REMOVE(&bs->blobs, blob, link); 2349 _spdk_blob_free(blob); 2350 } 2351 2352 pthread_mutex_destroy(&bs->used_clusters_mutex); 2353 2354 spdk_bit_array_free(&bs->used_blobids); 2355 spdk_bit_array_free(&bs->used_md_pages); 2356 spdk_bit_array_free(&bs->used_clusters); 2357 /* 2358 * If this function is called for any reason except a successful unload, 2359 * the unload_cpl type will be NONE and this will be a nop.
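 * (The successful unload and dump paths stash the pending sequence completion
 * in bs->unload_cpl before destroying the dev; see _spdk_bs_dump_finish()
 * further down for the pattern.)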
2360 */ 2361 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 2362 2363 free(bs); 2364 } 2365 2366 static int 2367 _spdk_bs_blob_list_add(struct spdk_blob *blob) 2368 { 2369 spdk_blob_id snapshot_id; 2370 struct spdk_blob_list *snapshot_entry = NULL; 2371 struct spdk_blob_list *clone_entry = NULL; 2372 2373 assert(blob != NULL); 2374 2375 snapshot_id = blob->parent_id; 2376 if (snapshot_id == SPDK_BLOBID_INVALID) { 2377 return 0; 2378 } 2379 2380 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id); 2381 if (snapshot_entry == NULL) { 2382 /* Snapshot not found */ 2383 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list)); 2384 if (snapshot_entry == NULL) { 2385 return -ENOMEM; 2386 } 2387 snapshot_entry->id = snapshot_id; 2388 TAILQ_INIT(&snapshot_entry->clones); 2389 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link); 2390 } else { 2391 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 2392 if (clone_entry->id == blob->id) { 2393 break; 2394 } 2395 } 2396 } 2397 2398 if (clone_entry == NULL) { 2399 /* Clone not found */ 2400 clone_entry = calloc(1, sizeof(struct spdk_blob_list)); 2401 if (clone_entry == NULL) { 2402 return -ENOMEM; 2403 } 2404 clone_entry->id = blob->id; 2405 TAILQ_INIT(&clone_entry->clones); 2406 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link); 2407 snapshot_entry->clone_count++; 2408 } 2409 2410 return 0; 2411 } 2412 2413 static void 2414 _spdk_bs_blob_list_remove(struct spdk_blob *blob) 2415 { 2416 struct spdk_blob_list *snapshot_entry = NULL; 2417 struct spdk_blob_list *clone_entry = NULL; 2418 2419 _spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry); 2420 2421 if (snapshot_entry == NULL) { 2422 return; 2423 } 2424 2425 blob->parent_id = SPDK_BLOBID_INVALID; 2426 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2427 free(clone_entry); 2428 2429 snapshot_entry->clone_count--; 2430 } 2431 2432 static int 2433 _spdk_bs_blob_list_free(struct spdk_blob_store *bs) 2434 { 2435 struct spdk_blob_list *snapshot_entry; 2436 struct spdk_blob_list *snapshot_entry_tmp; 2437 struct spdk_blob_list *clone_entry; 2438 struct spdk_blob_list *clone_entry_tmp; 2439 2440 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) { 2441 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) { 2442 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2443 free(clone_entry); 2444 } 2445 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link); 2446 free(snapshot_entry); 2447 } 2448 2449 return 0; 2450 } 2451 2452 static void 2453 _spdk_bs_free(struct spdk_blob_store *bs) 2454 { 2455 _spdk_bs_blob_list_free(bs); 2456 2457 spdk_bs_unregister_md_thread(bs); 2458 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy); 2459 } 2460 2461 void 2462 spdk_bs_opts_init(struct spdk_bs_opts *opts) 2463 { 2464 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 2465 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 2466 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 2467 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS; 2468 opts->clear_method = BS_CLEAR_WITH_UNMAP; 2469 memset(&opts->bstype, 0, sizeof(opts->bstype)); 2470 opts->iter_cb_fn = NULL; 2471 opts->iter_cb_arg = NULL; 2472 } 2473 2474 static int 2475 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 2476 { 2477 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 2478 opts->max_channel_ops == 0) { 2479 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 2480 return -1; 2481 } 2482 2483 
return 0; } 2484 2485 2486 static int 2487 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs) 2488 { 2489 struct spdk_blob_store *bs; 2490 uint64_t dev_size; 2491 int rc; 2492 2493 dev_size = dev->blocklen * dev->blockcnt; 2494 if (dev_size < opts->cluster_sz) { 2495 /* Device size cannot be smaller than cluster size of blobstore */ 2496 SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n", 2497 dev_size, opts->cluster_sz); 2498 return -ENOSPC; 2499 } 2500 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 2501 /* Cluster size cannot be smaller than page size */ 2502 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n", 2503 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 2504 return -EINVAL; 2505 } 2506 bs = calloc(1, sizeof(struct spdk_blob_store)); 2507 if (!bs) { 2508 return -ENOMEM; 2509 } 2510 2511 TAILQ_INIT(&bs->blobs); 2512 TAILQ_INIT(&bs->snapshots); 2513 bs->dev = dev; 2514 bs->md_thread = spdk_get_thread(); 2515 assert(bs->md_thread != NULL); 2516 2517 /* 2518 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 2519 * even multiple of the cluster size. 2520 */ 2521 bs->cluster_sz = opts->cluster_sz; 2522 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 2523 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 2524 bs->num_free_clusters = bs->total_clusters; 2525 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 2526 bs->io_unit_size = dev->blocklen; 2527 if (bs->used_clusters == NULL) { 2528 free(bs); 2529 return -ENOMEM; 2530 } 2531 2532 bs->max_channel_ops = opts->max_channel_ops; 2533 bs->super_blob = SPDK_BLOBID_INVALID; 2534 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 2535 2536 /* The metadata is assumed to be at least 1 page */ 2537 bs->used_md_pages = spdk_bit_array_create(1); 2538 bs->used_blobids = spdk_bit_array_create(0); 2539 2540 pthread_mutex_init(&bs->used_clusters_mutex, NULL); 2541 2542 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy, 2543 sizeof(struct spdk_bs_channel), "blobstore"); 2544 rc = spdk_bs_register_md_thread(bs); 2545 if (rc == -1) { 2546 spdk_io_device_unregister(bs, NULL); 2547 pthread_mutex_destroy(&bs->used_clusters_mutex); 2548 spdk_bit_array_free(&bs->used_blobids); 2549 spdk_bit_array_free(&bs->used_md_pages); 2550 spdk_bit_array_free(&bs->used_clusters); 2551 free(bs); 2552 /* FIXME: this is a lie but don't know how to get a proper error code here */ 2553 return -ENOMEM; 2554 } 2555 2556 *_bs = bs; 2557 return 0; 2558 } 2559 2560 /* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload.
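 * The ctx carries the super block and mask buffers through the chained
 * completion callbacks, which lets the load and unload paths share the
 * _spdk_bs_write_used_* helpers defined below.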
*/ 2561 2562 struct spdk_bs_load_ctx { 2563 struct spdk_blob_store *bs; 2564 struct spdk_bs_super_block *super; 2565 2566 struct spdk_bs_md_mask *mask; 2567 bool in_page_chain; 2568 uint32_t page_index; 2569 uint32_t cur_page; 2570 struct spdk_blob_md_page *page; 2571 2572 spdk_bs_sequence_t *seq; 2573 spdk_blob_op_with_handle_complete iter_cb_fn; 2574 void *iter_cb_arg; 2575 struct spdk_blob *blob; 2576 spdk_blob_id blobid; 2577 }; 2578 2579 static void 2580 _spdk_bs_load_ctx_fail(struct spdk_bs_load_ctx *ctx, int bserrno) 2581 { 2582 assert(bserrno != 0); 2583 2584 spdk_free(ctx->super); 2585 spdk_bs_sequence_finish(ctx->seq, bserrno); 2586 _spdk_bs_free(ctx->bs); 2587 free(ctx); 2588 } 2589 2590 static void 2591 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 2592 { 2593 uint32_t i = 0; 2594 2595 while (true) { 2596 i = spdk_bit_array_find_first_set(array, i); 2597 if (i >= mask->length) { 2598 break; 2599 } 2600 mask->mask[i / 8] |= 1U << (i % 8); 2601 i++; 2602 } 2603 } 2604 2605 static int 2606 _spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask) 2607 { 2608 struct spdk_bit_array *array; 2609 uint32_t i; 2610 2611 if (spdk_bit_array_resize(array_ptr, mask->length) < 0) { 2612 return -ENOMEM; 2613 } 2614 2615 array = *array_ptr; 2616 for (i = 0; i < mask->length; i++) { 2617 if (mask->mask[i / 8] & (1U << (i % 8))) { 2618 spdk_bit_array_set(array, i); 2619 } 2620 } 2621 2622 return 0; 2623 } 2624 2625 static void 2626 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 2627 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 2628 { 2629 /* Update the values in the super block */ 2630 super->super_blob = bs->super_blob; 2631 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 2632 super->crc = _spdk_blob_md_page_calc_crc(super); 2633 spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0), 2634 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 2635 cb_fn, cb_arg); 2636 } 2637 2638 static void 2639 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2640 { 2641 struct spdk_bs_load_ctx *ctx = arg; 2642 uint64_t mask_size, lba, lba_count; 2643 2644 /* Write out the used clusters mask */ 2645 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 2646 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2647 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2648 if (!ctx->mask) { 2649 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2650 return; 2651 } 2652 2653 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 2654 ctx->mask->length = ctx->bs->total_clusters; 2655 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 2656 2657 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 2658 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 2659 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 2660 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2661 } 2662 2663 static void 2664 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2665 { 2666 struct spdk_bs_load_ctx *ctx = arg; 2667 uint64_t mask_size, lba, lba_count; 2668 2669 if (seq->bserrno) { 2670 _spdk_bs_load_ctx_fail(ctx, seq->bserrno); 2671 return; 2672 } 2673 2674 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 2675 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2676 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2677 if (!ctx->mask) { 2678 
_spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2679 return; 2680 } 2681 2682 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 2683 ctx->mask->length = ctx->super->md_len; 2684 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 2685 2686 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 2687 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 2688 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 2689 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2690 } 2691 2692 static void 2693 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2694 { 2695 struct spdk_bs_load_ctx *ctx = arg; 2696 uint64_t mask_size, lba, lba_count; 2697 2698 if (ctx->super->used_blobid_mask_len == 0) { 2699 /* 2700 * This is a pre-v3 on-disk format where the blobid mask does not get 2701 * written to disk. 2702 */ 2703 cb_fn(seq, arg, 0); 2704 return; 2705 } 2706 2707 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2708 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2709 SPDK_MALLOC_DMA); 2710 if (!ctx->mask) { 2711 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2712 return; 2713 } 2714 2715 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS; 2716 ctx->mask->length = ctx->super->md_len; 2717 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids)); 2718 2719 _spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask); 2720 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2721 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2722 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2723 } 2724 2725 static void 2726 _spdk_blob_set_thin_provision(struct spdk_blob *blob) 2727 { 2728 _spdk_blob_verify_md_op(blob); 2729 blob->invalid_flags |= SPDK_BLOB_THIN_PROV; 2730 blob->state = SPDK_BLOB_STATE_DIRTY; 2731 } 2732 2733 static void 2734 _spdk_blob_set_clear_method(struct spdk_blob *blob, enum blob_clear_method clear_method) 2735 { 2736 _spdk_blob_verify_md_op(blob); 2737 blob->clear_method = clear_method; 2738 blob->md_ro_flags |= (clear_method << SPDK_BLOB_CLEAR_METHOD_SHIFT); 2739 blob->state = SPDK_BLOB_STATE_DIRTY; 2740 } 2741 2742 static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno); 2743 2744 static void 2745 _spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno) 2746 { 2747 struct spdk_bs_load_ctx *ctx = cb_arg; 2748 spdk_blob_id id; 2749 int64_t page_num; 2750 2751 /* Iterate to the next blob (we can't use the spdk_bs_iter_next() function, as our 2752 * last blob has been removed) */ 2753 page_num = _spdk_bs_blobid_to_page(ctx->blobid); 2754 page_num++; 2755 page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num); 2756 if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) { 2757 _spdk_bs_load_iter(ctx, NULL, -ENOENT); 2758 return; 2759 } 2760 2761 id = _spdk_bs_page_to_blobid(page_num); 2762 2763 spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx); 2764 } 2765 2766 static void 2767 _spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno) 2768 { 2769 struct spdk_bs_load_ctx *ctx = cb_arg; 2770 2771 if (bserrno != 0) { 2772 SPDK_ERRLOG("Failed to close corrupted blob\n"); 2773 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2774 return; 2775 } 2776 2777 spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx); 2778 } 2779 2780 static void 2781 _spdk_bs_delete_corrupted_blob(void
*cb_arg, int bserrno) 2782 { 2783 struct spdk_bs_load_ctx *ctx = cb_arg; 2784 uint64_t i; 2785 2786 if (bserrno != 0) { 2787 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2788 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2789 return; 2790 } 2791 2792 /* Snapshot and clone have the same copy of the cluster map at this point. 2793 * Let's clear the cluster map for the snapshot now so that it won't be cleared 2794 * for the clone later when we remove the snapshot. Also set thin provision to 2795 * pass the data corruption check */ 2796 for (i = 0; i < ctx->blob->active.num_clusters; i++) { 2797 ctx->blob->active.clusters[i] = 0; 2798 } 2799 2800 ctx->blob->md_ro = false; 2801 2802 _spdk_blob_set_thin_provision(ctx->blob); 2803 2804 ctx->blobid = ctx->blob->id; 2805 2806 spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx); 2807 } 2808 2809 static void 2810 _spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno) 2811 { 2812 struct spdk_bs_load_ctx *ctx = cb_arg; 2813 2814 if (bserrno != 0) { 2815 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2816 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2817 return; 2818 } 2819 2820 ctx->blob->md_ro = false; 2821 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true); 2822 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true); 2823 spdk_blob_set_read_only(ctx->blob); 2824 2825 if (ctx->iter_cb_fn) { 2826 ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0); 2827 } 2828 _spdk_bs_blob_list_add(ctx->blob); 2829 2830 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2831 } 2832 2833 static void 2834 _spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno) 2835 { 2836 struct spdk_bs_load_ctx *ctx = cb_arg; 2837 2838 if (bserrno != 0) { 2839 SPDK_ERRLOG("Failed to open clone of a corrupted blob\n"); 2840 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2841 return; 2842 } 2843 2844 if (blob->parent_id == ctx->blob->id) { 2845 /* Power failure occurred before updating clone (snapshot delete case) 2846 * or after updating clone (creating snapshot case) - keep snapshot */ 2847 spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx); 2848 } else { 2849 /* Power failure occurred after updating clone (snapshot delete case) 2850 * or before updating clone (creating snapshot case) - remove snapshot */ 2851 spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx); 2852 } 2853 } 2854 2855 static void 2856 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno) 2857 { 2858 struct spdk_bs_load_ctx *ctx = arg; 2859 const void *value; 2860 size_t len; 2861 int rc = 0; 2862 2863 if (bserrno == 0) { 2864 /* Examine blob if it is corrupted after power failure. Fix 2865 * the ones that can be fixed and remove any other corrupted 2866 * ones.
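 * A blob carrying the internal SNAPSHOT_PENDING_REMOVAL or SNAPSHOT_IN_PROGRESS
 * xattr is a snapshot whose delete or create was interrupted by the power
 * failure; its clone is opened to decide whether the snapshot is repaired
 * (_spdk_bs_update_corrupted_blob) or removed (_spdk_bs_delete_corrupted_blob).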
If it is not corrupted just process it */ 2867 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true); 2868 if (rc != 0) { 2869 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true); 2870 if (rc != 0) { 2871 /* Not corrupted - process it and continue with iterating through blobs */ 2872 if (ctx->iter_cb_fn) { 2873 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0); 2874 } 2875 _spdk_bs_blob_list_add(blob); 2876 spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx); 2877 return; 2878 } 2879 2880 } 2881 2882 assert(len == sizeof(spdk_blob_id)); 2883 2884 ctx->blob = blob; 2885 2886 /* Open clone to check if we are able to fix this blob or should we remove it */ 2887 spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx); 2888 return; 2889 } else if (bserrno == -ENOENT) { 2890 bserrno = 0; 2891 } else { 2892 /* 2893 * This case needs to be looked at further. Same problem 2894 * exists with applications that rely on explicit blob 2895 * iteration. We should just skip the blob that failed 2896 * to load and continue on to the next one. 2897 */ 2898 SPDK_ERRLOG("Error in iterating blobs\n"); 2899 } 2900 2901 ctx->iter_cb_fn = NULL; 2902 2903 spdk_free(ctx->super); 2904 spdk_free(ctx->mask); 2905 spdk_bs_sequence_finish(ctx->seq, bserrno); 2906 free(ctx); 2907 } 2908 2909 static void 2910 _spdk_bs_load_complete(struct spdk_bs_load_ctx *ctx) 2911 { 2912 spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx); 2913 } 2914 2915 static void 2916 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2917 { 2918 struct spdk_bs_load_ctx *ctx = cb_arg; 2919 int rc; 2920 2921 /* The type must be correct */ 2922 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS); 2923 2924 /* The length of the mask (in bits) must not be greater than 2925 * the length of the buffer (converted to bits) */ 2926 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8)); 2927 2928 /* The length of the mask must be exactly equal to the size 2929 * (in pages) of the metadata region */ 2930 assert(ctx->mask->length == ctx->super->md_len); 2931 2932 rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask); 2933 if (rc < 0) { 2934 spdk_free(ctx->mask); 2935 _spdk_bs_load_ctx_fail(ctx, rc); 2936 return; 2937 } 2938 2939 _spdk_bs_load_complete(ctx); 2940 } 2941 2942 static void 2943 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2944 { 2945 struct spdk_bs_load_ctx *ctx = cb_arg; 2946 uint64_t lba, lba_count, mask_size; 2947 int rc; 2948 2949 if (bserrno != 0) { 2950 _spdk_bs_load_ctx_fail(ctx, bserrno); 2951 return; 2952 } 2953 2954 /* The type must be correct */ 2955 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 2956 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2957 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 2958 struct spdk_blob_md_page) * 8)); 2959 /* The length of the mask must be exactly equal to the total number of clusters */ 2960 assert(ctx->mask->length == ctx->bs->total_clusters); 2961 2962 rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask); 2963 if (rc < 0) { 2964 spdk_free(ctx->mask); 2965 _spdk_bs_load_ctx_fail(ctx, rc); 2966 return; 2967 } 2968 2969 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters); 2970 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters); 2971 2972 spdk_free(ctx->mask); 2973 
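/* The masks are read back in a fixed chain - used md pages, then used
 * clusters, then used blobids - with each read issued from the previous
 * read's completion callback. */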
2974 /* Read the used blobids mask */ 2975 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2976 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2977 SPDK_MALLOC_DMA); 2978 if (!ctx->mask) { 2979 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2980 return; 2981 } 2982 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2983 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2984 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 2985 _spdk_bs_load_used_blobids_cpl, ctx); 2986 } 2987 2988 static void 2989 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2990 { 2991 struct spdk_bs_load_ctx *ctx = cb_arg; 2992 uint64_t lba, lba_count, mask_size; 2993 int rc; 2994 2995 if (bserrno != 0) { 2996 _spdk_bs_load_ctx_fail(ctx, bserrno); 2997 return; 2998 } 2999 3000 /* The type must be correct */ 3001 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 3002 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 3003 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 3004 8)); 3005 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 3006 assert(ctx->mask->length == ctx->super->md_len); 3007 3008 rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask); 3009 if (rc < 0) { 3010 spdk_free(ctx->mask); 3011 _spdk_bs_load_ctx_fail(ctx, rc); 3012 return; 3013 } 3014 3015 spdk_free(ctx->mask); 3016 3017 /* Read the used clusters mask */ 3018 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 3019 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3020 SPDK_MALLOC_DMA); 3021 if (!ctx->mask) { 3022 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3023 return; 3024 } 3025 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 3026 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 3027 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3028 _spdk_bs_load_used_clusters_cpl, ctx); 3029 } 3030 3031 static void 3032 _spdk_bs_load_read_used_pages(struct spdk_bs_load_ctx *ctx) 3033 { 3034 uint64_t lba, lba_count, mask_size; 3035 3036 /* Read the used pages mask */ 3037 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 3038 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3039 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3040 if (!ctx->mask) { 3041 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3042 return; 3043 } 3044 3045 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 3046 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 3047 spdk_bs_sequence_read_dev(ctx->seq, ctx->mask, lba, lba_count, 3048 _spdk_bs_load_used_pages_cpl, ctx); 3049 } 3050 3051 static int 3052 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 3053 { 3054 struct spdk_blob_md_descriptor *desc; 3055 size_t cur_desc = 0; 3056 3057 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3058 while (cur_desc < sizeof(page->descriptors)) { 3059 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3060 if (desc->length == 0) { 3061 /* If padding and length are 0, this terminates the page */ 3062 break; 3063 } 3064 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3065 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3066 unsigned int i, j; 3067 unsigned int cluster_count = 0; 3068 uint32_t 
cluster_idx; 3069 3070 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3071 3072 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3073 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 3074 cluster_idx = desc_extent_rle->extents[i].cluster_idx; 3075 /* 3076 * cluster_idx = 0 means an unallocated cluster - don't mark that 3077 * in the used cluster map. 3078 */ 3079 if (cluster_idx != 0) { 3080 spdk_bit_array_set(bs->used_clusters, cluster_idx + j); 3081 if (bs->num_free_clusters == 0) { 3082 return -ENOSPC; 3083 } 3084 bs->num_free_clusters--; 3085 } 3086 cluster_count++; 3087 } 3088 } 3089 if (cluster_count == 0) { 3090 return -EINVAL; 3091 } 3092 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3093 /* Skip this item */ 3094 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3095 /* Skip this item */ 3096 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3097 /* Skip this item */ 3098 } else { 3099 /* Error */ 3100 return -EINVAL; 3101 } 3102 /* Advance to the next descriptor */ 3103 cur_desc += sizeof(*desc) + desc->length; 3104 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3105 break; 3106 } 3107 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3108 } 3109 return 0; 3110 } 3111 3112 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 3113 { 3114 uint32_t crc; 3115 3116 crc = _spdk_blob_md_page_calc_crc(ctx->page); 3117 if (crc != ctx->page->crc) { 3118 return false; 3119 } 3120 3121 if (ctx->page->sequence_num == 0 && 3122 _spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 3123 return false; 3124 } 3125 return true; 3126 } 3127 3128 static void 3129 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx); 3130 3131 static void 3132 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3133 { 3134 struct spdk_bs_load_ctx *ctx = cb_arg; 3135 3136 if (bserrno != 0) { 3137 _spdk_bs_load_ctx_fail(ctx, bserrno); 3138 return; 3139 } 3140 3141 _spdk_bs_load_complete(ctx); 3142 } 3143 3144 static void 3145 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3146 { 3147 struct spdk_bs_load_ctx *ctx = cb_arg; 3148 3149 spdk_free(ctx->mask); 3150 ctx->mask = NULL; 3151 3152 if (bserrno != 0) { 3153 _spdk_bs_load_ctx_fail(ctx, bserrno); 3154 return; 3155 } 3156 3157 _spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_load_write_used_clusters_cpl); 3158 } 3159 3160 static void 3161 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3162 { 3163 struct spdk_bs_load_ctx *ctx = cb_arg; 3164 3165 spdk_free(ctx->mask); 3166 ctx->mask = NULL; 3167 3168 if (bserrno != 0) { 3169 _spdk_bs_load_ctx_fail(ctx, bserrno); 3170 return; 3171 } 3172 3173 _spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_load_write_used_blobids_cpl); 3174 } 3175 3176 static void 3177 _spdk_bs_load_write_used_md(struct spdk_bs_load_ctx *ctx) 3178 { 3179 _spdk_bs_write_used_md(ctx->seq, ctx, _spdk_bs_load_write_used_pages_cpl); 3180 } 3181 3182 static void 3183 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3184 { 3185 struct spdk_bs_load_ctx *ctx = cb_arg; 3186 uint64_t num_md_clusters; 3187 uint64_t i; 3188 uint32_t page_num; 3189 3190 if (bserrno != 0) { 3191 _spdk_bs_load_ctx_fail(ctx, bserrno); 3192 return; 3193 } 3194 3195 page_num = ctx->cur_page; 3196 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 3197 if 
(ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 3198 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 3199 if (ctx->page->sequence_num == 0) { 3200 spdk_bit_array_set(ctx->bs->used_blobids, page_num); 3201 } 3202 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 3203 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3204 return; 3205 } 3206 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 3207 ctx->in_page_chain = true; 3208 ctx->cur_page = ctx->page->next; 3209 _spdk_bs_load_replay_cur_md_page(ctx); 3210 return; 3211 } 3212 } 3213 } 3214 3215 ctx->in_page_chain = false; 3216 3217 do { 3218 ctx->page_index++; 3219 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 3220 3221 if (ctx->page_index < ctx->super->md_len) { 3222 ctx->cur_page = ctx->page_index; 3223 _spdk_bs_load_replay_cur_md_page(ctx); 3224 } else { 3225 /* Claim all of the clusters used by the metadata */ 3226 num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster); 3227 for (i = 0; i < num_md_clusters; i++) { 3228 _spdk_bs_claim_cluster(ctx->bs, i); 3229 } 3230 spdk_free(ctx->page); 3231 _spdk_bs_load_write_used_md(ctx); 3232 } 3233 } 3234 3235 static void 3236 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx) 3237 { 3238 uint64_t lba; 3239 3240 assert(ctx->cur_page < ctx->super->md_len); 3241 lba = _spdk_bs_md_page_to_lba(ctx->bs, ctx->cur_page); 3242 spdk_bs_sequence_read_dev(ctx->seq, ctx->page, lba, 3243 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3244 _spdk_bs_load_replay_md_cpl, ctx); 3245 } 3246 3247 static void 3248 _spdk_bs_load_replay_md(struct spdk_bs_load_ctx *ctx) 3249 { 3250 ctx->page_index = 0; 3251 ctx->cur_page = 0; 3252 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3253 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3254 if (!ctx->page) { 3255 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3256 return; 3257 } 3258 _spdk_bs_load_replay_cur_md_page(ctx); 3259 } 3260 3261 static void 3262 _spdk_bs_recover(struct spdk_bs_load_ctx *ctx) 3263 { 3264 int rc; 3265 3266 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 3267 if (rc < 0) { 3268 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3269 return; 3270 } 3271 3272 rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len); 3273 if (rc < 0) { 3274 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3275 return; 3276 } 3277 3278 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3279 if (rc < 0) { 3280 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3281 return; 3282 } 3283 3284 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 3285 _spdk_bs_load_replay_md(ctx); 3286 } 3287 3288 static void 3289 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3290 { 3291 struct spdk_bs_load_ctx *ctx = cb_arg; 3292 uint32_t crc; 3293 int rc; 3294 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 3295 3296 if (ctx->super->version > SPDK_BS_VERSION || 3297 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 3298 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3299 return; 3300 } 3301 3302 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3303 sizeof(ctx->super->signature)) != 0) { 3304 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3305 return; 3306 } 3307 3308 crc = _spdk_blob_md_page_calc_crc(ctx->super); 3309 if (crc != ctx->super->crc) { 3310 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3311 return; 3312 } 3313 3314 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3315 SPDK_DEBUGLOG(SPDK_LOG_BLOB, 
"Bstype matched - loading blobstore\n"); 3316 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3317 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 3318 } else { 3319 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 3320 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3321 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3322 _spdk_bs_load_ctx_fail(ctx, -ENXIO); 3323 return; 3324 } 3325 3326 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) { 3327 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n", 3328 ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size); 3329 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3330 return; 3331 } 3332 3333 if (ctx->super->size == 0) { 3334 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen; 3335 } 3336 3337 if (ctx->super->io_unit_size == 0) { 3338 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE; 3339 } 3340 3341 /* Parse the super block */ 3342 ctx->bs->clean = 1; 3343 ctx->bs->cluster_sz = ctx->super->cluster_size; 3344 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size; 3345 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 3346 ctx->bs->io_unit_size = ctx->super->io_unit_size; 3347 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3348 if (rc < 0) { 3349 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3350 return; 3351 } 3352 ctx->bs->md_start = ctx->super->md_start; 3353 ctx->bs->md_len = ctx->super->md_len; 3354 ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up( 3355 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 3356 ctx->bs->super_blob = ctx->super->super_blob; 3357 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 3358 3359 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) { 3360 _spdk_bs_recover(ctx); 3361 } else { 3362 _spdk_bs_load_read_used_pages(ctx); 3363 } 3364 } 3365 3366 void 3367 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3368 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3369 { 3370 struct spdk_blob_store *bs; 3371 struct spdk_bs_cpl cpl; 3372 struct spdk_bs_load_ctx *ctx; 3373 struct spdk_bs_opts opts = {}; 3374 int err; 3375 3376 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 3377 3378 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3379 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen); 3380 dev->destroy(dev); 3381 cb_fn(cb_arg, NULL, -EINVAL); 3382 return; 3383 } 3384 3385 if (o) { 3386 opts = *o; 3387 } else { 3388 spdk_bs_opts_init(&opts); 3389 } 3390 3391 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 3392 dev->destroy(dev); 3393 cb_fn(cb_arg, NULL, -EINVAL); 3394 return; 3395 } 3396 3397 err = _spdk_bs_alloc(dev, &opts, &bs); 3398 if (err) { 3399 dev->destroy(dev); 3400 cb_fn(cb_arg, NULL, err); 3401 return; 3402 } 3403 3404 ctx = calloc(1, sizeof(*ctx)); 3405 if (!ctx) { 3406 _spdk_bs_free(bs); 3407 cb_fn(cb_arg, NULL, -ENOMEM); 3408 return; 3409 } 3410 3411 ctx->bs = bs; 3412 ctx->iter_cb_fn = opts.iter_cb_fn; 3413 ctx->iter_cb_arg = opts.iter_cb_arg; 3414 3415 /* Allocate memory for the super block */ 3416 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3417 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3418 if (!ctx->super) { 3419 free(ctx); 3420 
_spdk_bs_free(bs); 3421 cb_fn(cb_arg, NULL, -ENOMEM); 3422 return; 3423 } 3424 3425 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3426 cpl.u.bs_handle.cb_fn = cb_fn; 3427 cpl.u.bs_handle.cb_arg = cb_arg; 3428 cpl.u.bs_handle.bs = bs; 3429 3430 ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3431 if (!ctx->seq) { 3432 spdk_free(ctx->super); 3433 free(ctx); 3434 _spdk_bs_free(bs); 3435 cb_fn(cb_arg, NULL, -ENOMEM); 3436 return; 3437 } 3438 3439 /* Read the super block */ 3440 spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3441 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3442 _spdk_bs_load_super_cpl, ctx); 3443 } 3444 3445 /* END spdk_bs_load */ 3446 3447 /* START spdk_bs_dump */ 3448 3449 struct spdk_bs_dump_ctx { 3450 struct spdk_blob_store *bs; 3451 struct spdk_bs_super_block *super; 3452 uint32_t cur_page; 3453 struct spdk_blob_md_page *page; 3454 spdk_bs_sequence_t *seq; 3455 FILE *fp; 3456 spdk_bs_dump_print_xattr print_xattr_fn; 3457 char xattr_name[4096]; 3458 }; 3459 3460 static void 3461 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno) 3462 { 3463 spdk_free(ctx->super); 3464 3465 /* 3466 * We need to defer calling spdk_bs_call_cpl() until after 3467 * dev destruction, so tuck these away for later use. 3468 */ 3469 ctx->bs->unload_err = bserrno; 3470 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3471 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3472 3473 spdk_bs_sequence_finish(seq, 0); 3474 _spdk_bs_free(ctx->bs); 3475 free(ctx); 3476 } 3477 3478 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 3479 3480 static void 3481 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx) 3482 { 3483 uint32_t page_idx = ctx->cur_page; 3484 struct spdk_blob_md_page *page = ctx->page; 3485 struct spdk_blob_md_descriptor *desc; 3486 size_t cur_desc = 0; 3487 uint32_t crc; 3488 3489 fprintf(ctx->fp, "=========\n"); 3490 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx); 3491 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id); 3492 3493 crc = _spdk_blob_md_page_calc_crc(page); 3494 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? 
"OK" : "Mismatch"); 3495 3496 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3497 while (cur_desc < sizeof(page->descriptors)) { 3498 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3499 if (desc->length == 0) { 3500 /* If padding and length are 0, this terminates the page */ 3501 break; 3502 } 3503 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3504 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3505 unsigned int i; 3506 3507 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3508 3509 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3510 if (desc_extent_rle->extents[i].cluster_idx != 0) { 3511 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32, 3512 desc_extent_rle->extents[i].cluster_idx); 3513 } else { 3514 fprintf(ctx->fp, "Unallocated Extent - "); 3515 } 3516 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length); 3517 fprintf(ctx->fp, "\n"); 3518 } 3519 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3520 struct spdk_blob_md_descriptor_xattr *desc_xattr; 3521 uint32_t i; 3522 3523 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 3524 3525 if (desc_xattr->length != 3526 sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) + 3527 desc_xattr->name_length + desc_xattr->value_length) { 3528 } 3529 3530 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length); 3531 ctx->xattr_name[desc_xattr->name_length] = '\0'; 3532 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name); 3533 fprintf(ctx->fp, " value = \""); 3534 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name, 3535 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 3536 desc_xattr->value_length); 3537 fprintf(ctx->fp, "\"\n"); 3538 for (i = 0; i < desc_xattr->value_length; i++) { 3539 if (i % 16 == 0) { 3540 fprintf(ctx->fp, " "); 3541 } 3542 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i)); 3543 if ((i + 1) % 16 == 0) { 3544 fprintf(ctx->fp, "\n"); 3545 } 3546 } 3547 if (i % 16 != 0) { 3548 fprintf(ctx->fp, "\n"); 3549 } 3550 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3551 /* TODO */ 3552 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3553 /* TODO */ 3554 } else { 3555 /* Error */ 3556 } 3557 /* Advance to the next descriptor */ 3558 cur_desc += sizeof(*desc) + desc->length; 3559 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3560 break; 3561 } 3562 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3563 } 3564 } 3565 3566 static void 3567 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3568 { 3569 struct spdk_bs_dump_ctx *ctx = cb_arg; 3570 3571 if (bserrno != 0) { 3572 _spdk_bs_dump_finish(seq, ctx, bserrno); 3573 return; 3574 } 3575 3576 if (ctx->page->id != 0) { 3577 _spdk_bs_dump_print_md_page(ctx); 3578 } 3579 3580 ctx->cur_page++; 3581 3582 if (ctx->cur_page < ctx->super->md_len) { 3583 _spdk_bs_dump_read_md_page(seq, ctx); 3584 } else { 3585 spdk_free(ctx->page); 3586 _spdk_bs_dump_finish(seq, ctx, 0); 3587 } 3588 } 3589 3590 static void 3591 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 3592 { 3593 struct spdk_bs_dump_ctx *ctx = cb_arg; 3594 uint64_t lba; 3595 3596 assert(ctx->cur_page < ctx->super->md_len); 3597 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 3598 spdk_bs_sequence_read_dev(seq, ctx->page, lba, 
3599 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3600 _spdk_bs_dump_read_md_page_cpl, ctx); 3601 } 3602 3603 static void 3604 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3605 { 3606 struct spdk_bs_dump_ctx *ctx = cb_arg; 3607 3608 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature); 3609 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3610 sizeof(ctx->super->signature)) != 0) { 3611 fprintf(ctx->fp, "(Mismatch)\n"); 3612 _spdk_bs_dump_finish(seq, ctx, bserrno); 3613 return; 3614 } else { 3615 fprintf(ctx->fp, "(OK)\n"); 3616 } 3617 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version); 3618 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc, 3619 (ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch"); 3620 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype); 3621 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size); 3622 fprintf(ctx->fp, "Super Blob ID: "); 3623 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) { 3624 fprintf(ctx->fp, "(None)\n"); 3625 } else { 3626 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob); 3627 } 3628 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean); 3629 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start); 3630 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len); 3631 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start); 3632 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len); 3633 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start); 3634 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len); 3635 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start); 3636 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len); 3637 3638 ctx->cur_page = 0; 3639 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3640 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3641 if (!ctx->page) { 3642 _spdk_bs_dump_finish(seq, ctx, -ENOMEM); 3643 return; 3644 } 3645 _spdk_bs_dump_read_md_page(seq, ctx); 3646 } 3647 3648 void 3649 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn, 3650 spdk_bs_op_complete cb_fn, void *cb_arg) 3651 { 3652 struct spdk_blob_store *bs; 3653 struct spdk_bs_cpl cpl; 3654 spdk_bs_sequence_t *seq; 3655 struct spdk_bs_dump_ctx *ctx; 3656 struct spdk_bs_opts opts = {}; 3657 int err; 3658 3659 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev); 3660 3661 spdk_bs_opts_init(&opts); 3662 3663 err = _spdk_bs_alloc(dev, &opts, &bs); 3664 if (err) { 3665 dev->destroy(dev); 3666 cb_fn(cb_arg, err); 3667 return; 3668 } 3669 3670 ctx = calloc(1, sizeof(*ctx)); 3671 if (!ctx) { 3672 _spdk_bs_free(bs); 3673 cb_fn(cb_arg, -ENOMEM); 3674 return; 3675 } 3676 3677 ctx->bs = bs; 3678 ctx->fp = fp; 3679 ctx->print_xattr_fn = print_xattr_fn; 3680 3681 /* Allocate memory for the super block */ 3682 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3683 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3684 if (!ctx->super) { 3685 free(ctx); 3686 _spdk_bs_free(bs); 3687 cb_fn(cb_arg, -ENOMEM); 3688 return; 3689 } 3690 3691 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3692 cpl.u.bs_basic.cb_fn = cb_fn; 3693 cpl.u.bs_basic.cb_arg = cb_arg; 3694 3695 
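/* The dump reads the super block first, then _spdk_bs_dump_read_md_page()
 * above walks every metadata page in turn on the metadata channel. */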
seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3696 if (!seq) { 3697 spdk_free(ctx->super); 3698 free(ctx); 3699 _spdk_bs_free(bs); 3700 cb_fn(cb_arg, -ENOMEM); 3701 return; 3702 } 3703 3704 /* Read the super block */ 3705 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3706 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3707 _spdk_bs_dump_super_cpl, ctx); 3708 } 3709 3710 /* END spdk_bs_dump */ 3711 3712 /* START spdk_bs_init */ 3713 3714 struct spdk_bs_init_ctx { 3715 struct spdk_blob_store *bs; 3716 struct spdk_bs_super_block *super; 3717 }; 3718 3719 static void 3720 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3721 { 3722 struct spdk_bs_init_ctx *ctx = cb_arg; 3723 3724 spdk_free(ctx->super); 3725 free(ctx); 3726 3727 spdk_bs_sequence_finish(seq, bserrno); 3728 } 3729 3730 static void 3731 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3732 { 3733 struct spdk_bs_init_ctx *ctx = cb_arg; 3734 3735 /* Write super block */ 3736 spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 3737 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 3738 _spdk_bs_init_persist_super_cpl, ctx); 3739 } 3740 3741 void 3742 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3743 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3744 { 3745 struct spdk_bs_init_ctx *ctx; 3746 struct spdk_blob_store *bs; 3747 struct spdk_bs_cpl cpl; 3748 spdk_bs_sequence_t *seq; 3749 spdk_bs_batch_t *batch; 3750 uint64_t num_md_lba; 3751 uint64_t num_md_pages; 3752 uint64_t num_md_clusters; 3753 uint32_t i; 3754 struct spdk_bs_opts opts = {}; 3755 int rc; 3756 3757 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 3758 3759 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3760 SPDK_ERRLOG("unsupported dev block length of %d\n", 3761 dev->blocklen); 3762 dev->destroy(dev); 3763 cb_fn(cb_arg, NULL, -EINVAL); 3764 return; 3765 } 3766 3767 if (o) { 3768 opts = *o; 3769 } else { 3770 spdk_bs_opts_init(&opts); 3771 } 3772 3773 if (_spdk_bs_opts_verify(&opts) != 0) { 3774 dev->destroy(dev); 3775 cb_fn(cb_arg, NULL, -EINVAL); 3776 return; 3777 } 3778 3779 rc = _spdk_bs_alloc(dev, &opts, &bs); 3780 if (rc) { 3781 dev->destroy(dev); 3782 cb_fn(cb_arg, NULL, rc); 3783 return; 3784 } 3785 3786 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 3787 /* By default, allocate 1 page per cluster. 3788 * Technically, this over-allocates metadata 3789 * because more metadata will reduce the number 3790 * of usable clusters. This can be addressed with 3791 * more complex math in the future. 
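 * (Illustrative arithmetic with assumed sizes: a 10 GiB device with the
 * default 1 MiB cluster size yields 10240 clusters, so 10240 metadata
 * pages are reserved - about 40 MiB at 4 KiB per page.)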
3792 */ 3793 bs->md_len = bs->total_clusters; 3794 } else { 3795 bs->md_len = opts.num_md_pages; 3796 } 3797 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 3798 if (rc < 0) { 3799 _spdk_bs_free(bs); 3800 cb_fn(cb_arg, NULL, -ENOMEM); 3801 return; 3802 } 3803 3804 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len); 3805 if (rc < 0) { 3806 _spdk_bs_free(bs); 3807 cb_fn(cb_arg, NULL, -ENOMEM); 3808 return; 3809 } 3810 3811 ctx = calloc(1, sizeof(*ctx)); 3812 if (!ctx) { 3813 _spdk_bs_free(bs); 3814 cb_fn(cb_arg, NULL, -ENOMEM); 3815 return; 3816 } 3817 3818 ctx->bs = bs; 3819 3820 /* Allocate memory for the super block */ 3821 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3822 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3823 if (!ctx->super) { 3824 free(ctx); 3825 _spdk_bs_free(bs); 3826 cb_fn(cb_arg, NULL, -ENOMEM); 3827 return; 3828 } 3829 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3830 sizeof(ctx->super->signature)); 3831 ctx->super->version = SPDK_BS_VERSION; 3832 ctx->super->length = sizeof(*ctx->super); 3833 ctx->super->super_blob = bs->super_blob; 3834 ctx->super->clean = 0; 3835 ctx->super->cluster_size = bs->cluster_sz; 3836 ctx->super->io_unit_size = bs->io_unit_size; 3837 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 3838 3839 /* Calculate how many pages the metadata consumes at the front 3840 * of the disk. 3841 */ 3842 3843 /* The super block uses 1 page */ 3844 num_md_pages = 1; 3845 3846 /* The used_md_pages mask requires 1 bit per metadata page, rounded 3847 * up to the nearest page, plus a header. 3848 */ 3849 ctx->super->used_page_mask_start = num_md_pages; 3850 ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3851 spdk_divide_round_up(bs->md_len, 8), 3852 SPDK_BS_PAGE_SIZE); 3853 num_md_pages += ctx->super->used_page_mask_len; 3854 3855 /* The used_clusters mask requires 1 bit per cluster, rounded 3856 * up to the nearest page, plus a header. 3857 */ 3858 ctx->super->used_cluster_mask_start = num_md_pages; 3859 ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3860 spdk_divide_round_up(bs->total_clusters, 8), 3861 SPDK_BS_PAGE_SIZE); 3862 num_md_pages += ctx->super->used_cluster_mask_len; 3863 3864 /* The used_blobids mask requires 1 bit per metadata page, rounded 3865 * up to the nearest page, plus a header. 
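	 *
	 * Continuing the example above (md_len == 10240): the mask needs a
	 * small spdk_bs_md_mask header plus 10240 / 8 == 1280 bytes of bit
	 * data, which rounds up to a single 4 KiB page.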
*/
	ctx->super->used_blobid_mask_start = num_md_pages;
	ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) +
					   spdk_divide_round_up(bs->md_len, 8),
					   SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_blobid_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;

	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);

	ctx->super->size = dev->blockcnt * dev->blocklen;

	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);

	num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster);
	if (num_md_clusters > bs->total_clusters) {
		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available, "
			    "please decrease the number of pages reserved for metadata "
			    "or increase the cluster size.\n");
		spdk_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < num_md_clusters; i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	bs->total_data_clusters = bs->num_free_clusters;

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);

	/* Clear metadata space */
	spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba);

	switch (opts.clear_method) {
	case BS_CLEAR_WITH_UNMAP:
		/* Trim data clusters */
		spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
		break;
	case BS_CLEAR_WITH_WRITE_ZEROES:
		/* Write_zeroes to data clusters */
		spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);
		break;
	case BS_CLEAR_WITH_NONE:
	default:
		break;
	}

	spdk_bs_batch_close(batch);
}

/* END spdk_bs_init */

/* START spdk_bs_destroy */

static void
_spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
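	 * Clearing seq->cpl.type below keeps spdk_bs_sequence_finish() from
	 * firing the user completion early; the stashed unload_cpl/unload_err
	 * pair is invoked after _spdk_bs_free() has torn down the dev.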
3951 */ 3952 bs->unload_err = bserrno; 3953 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3954 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3955 3956 spdk_bs_sequence_finish(seq, bserrno); 3957 3958 _spdk_bs_free(bs); 3959 free(ctx); 3960 } 3961 3962 void 3963 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 3964 void *cb_arg) 3965 { 3966 struct spdk_bs_cpl cpl; 3967 spdk_bs_sequence_t *seq; 3968 struct spdk_bs_init_ctx *ctx; 3969 3970 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 3971 3972 if (!TAILQ_EMPTY(&bs->blobs)) { 3973 SPDK_ERRLOG("Blobstore still has open blobs\n"); 3974 cb_fn(cb_arg, -EBUSY); 3975 return; 3976 } 3977 3978 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3979 cpl.u.bs_basic.cb_fn = cb_fn; 3980 cpl.u.bs_basic.cb_arg = cb_arg; 3981 3982 ctx = calloc(1, sizeof(*ctx)); 3983 if (!ctx) { 3984 cb_fn(cb_arg, -ENOMEM); 3985 return; 3986 } 3987 3988 ctx->bs = bs; 3989 3990 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3991 if (!seq) { 3992 free(ctx); 3993 cb_fn(cb_arg, -ENOMEM); 3994 return; 3995 } 3996 3997 /* Write zeroes to the super block */ 3998 spdk_bs_sequence_write_zeroes_dev(seq, 3999 _spdk_bs_page_to_lba(bs, 0), 4000 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 4001 _spdk_bs_destroy_trim_cpl, ctx); 4002 } 4003 4004 /* END spdk_bs_destroy */ 4005 4006 /* START spdk_bs_unload */ 4007 4008 static void 4009 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4010 { 4011 struct spdk_bs_load_ctx *ctx = cb_arg; 4012 4013 spdk_free(ctx->super); 4014 4015 /* 4016 * We need to defer calling spdk_bs_call_cpl() until after 4017 * dev destruction, so tuck these away for later use. 4018 */ 4019 ctx->bs->unload_err = bserrno; 4020 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 4021 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 4022 4023 spdk_bs_sequence_finish(seq, bserrno); 4024 4025 _spdk_bs_free(ctx->bs); 4026 free(ctx); 4027 } 4028 4029 static void 4030 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4031 { 4032 struct spdk_bs_load_ctx *ctx = cb_arg; 4033 4034 spdk_free(ctx->mask); 4035 ctx->super->clean = 1; 4036 4037 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 4038 } 4039 4040 static void 4041 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4042 { 4043 struct spdk_bs_load_ctx *ctx = cb_arg; 4044 4045 spdk_free(ctx->mask); 4046 ctx->mask = NULL; 4047 4048 _spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_unload_write_used_clusters_cpl); 4049 } 4050 4051 static void 4052 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4053 { 4054 struct spdk_bs_load_ctx *ctx = cb_arg; 4055 4056 spdk_free(ctx->mask); 4057 ctx->mask = NULL; 4058 4059 _spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_unload_write_used_blobids_cpl); 4060 } 4061 4062 static void 4063 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4064 { 4065 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 4066 } 4067 4068 void 4069 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 4070 { 4071 struct spdk_bs_cpl cpl; 4072 struct spdk_bs_load_ctx *ctx; 4073 4074 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 4075 4076 if (!TAILQ_EMPTY(&bs->blobs)) { 4077 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4078 cb_fn(cb_arg, -EBUSY); 4079 return; 4080 } 4081 4082 ctx = 
calloc(1, sizeof(*ctx)); 4083 if (!ctx) { 4084 cb_fn(cb_arg, -ENOMEM); 4085 return; 4086 } 4087 4088 ctx->bs = bs; 4089 4090 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4091 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4092 if (!ctx->super) { 4093 free(ctx); 4094 cb_fn(cb_arg, -ENOMEM); 4095 return; 4096 } 4097 4098 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4099 cpl.u.bs_basic.cb_fn = cb_fn; 4100 cpl.u.bs_basic.cb_arg = cb_arg; 4101 4102 ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4103 if (!ctx->seq) { 4104 spdk_free(ctx->super); 4105 free(ctx); 4106 cb_fn(cb_arg, -ENOMEM); 4107 return; 4108 } 4109 4110 /* Read super block */ 4111 spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4112 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4113 _spdk_bs_unload_read_super_cpl, ctx); 4114 } 4115 4116 /* END spdk_bs_unload */ 4117 4118 /* START spdk_bs_set_super */ 4119 4120 struct spdk_bs_set_super_ctx { 4121 struct spdk_blob_store *bs; 4122 struct spdk_bs_super_block *super; 4123 }; 4124 4125 static void 4126 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4127 { 4128 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4129 4130 if (bserrno != 0) { 4131 SPDK_ERRLOG("Unable to write to super block of blobstore\n"); 4132 } 4133 4134 spdk_free(ctx->super); 4135 4136 spdk_bs_sequence_finish(seq, bserrno); 4137 4138 free(ctx); 4139 } 4140 4141 static void 4142 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4143 { 4144 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4145 4146 if (bserrno != 0) { 4147 SPDK_ERRLOG("Unable to read super block of blobstore\n"); 4148 spdk_free(ctx->super); 4149 spdk_bs_sequence_finish(seq, bserrno); 4150 free(ctx); 4151 return; 4152 } 4153 4154 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx); 4155 } 4156 4157 void 4158 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 4159 spdk_bs_op_complete cb_fn, void *cb_arg) 4160 { 4161 struct spdk_bs_cpl cpl; 4162 spdk_bs_sequence_t *seq; 4163 struct spdk_bs_set_super_ctx *ctx; 4164 4165 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n"); 4166 4167 ctx = calloc(1, sizeof(*ctx)); 4168 if (!ctx) { 4169 cb_fn(cb_arg, -ENOMEM); 4170 return; 4171 } 4172 4173 ctx->bs = bs; 4174 4175 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4176 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4177 if (!ctx->super) { 4178 free(ctx); 4179 cb_fn(cb_arg, -ENOMEM); 4180 return; 4181 } 4182 4183 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4184 cpl.u.bs_basic.cb_fn = cb_fn; 4185 cpl.u.bs_basic.cb_arg = cb_arg; 4186 4187 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4188 if (!seq) { 4189 spdk_free(ctx->super); 4190 free(ctx); 4191 cb_fn(cb_arg, -ENOMEM); 4192 return; 4193 } 4194 4195 bs->super_blob = blobid; 4196 4197 /* Read super block */ 4198 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4199 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4200 _spdk_bs_set_super_read_cpl, ctx); 4201 } 4202 4203 /* END spdk_bs_set_super */ 4204 4205 void 4206 spdk_bs_get_super(struct spdk_blob_store *bs, 4207 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4208 { 4209 if (bs->super_blob == SPDK_BLOBID_INVALID) { 4210 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 4211 } else { 4212 cb_fn(cb_arg, bs->super_blob, 0); 4213 } 4214 } 4215 4216 uint64_t 4217 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 4218 { 4219 return bs->cluster_sz; 4220 } 4221 4222 uint64_t 4223 
spdk_bs_get_page_size(struct spdk_blob_store *bs) 4224 { 4225 return SPDK_BS_PAGE_SIZE; 4226 } 4227 4228 uint64_t 4229 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs) 4230 { 4231 return bs->io_unit_size; 4232 } 4233 4234 uint64_t 4235 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 4236 { 4237 return bs->num_free_clusters; 4238 } 4239 4240 uint64_t 4241 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 4242 { 4243 return bs->total_data_clusters; 4244 } 4245 4246 static int 4247 spdk_bs_register_md_thread(struct spdk_blob_store *bs) 4248 { 4249 bs->md_channel = spdk_get_io_channel(bs); 4250 if (!bs->md_channel) { 4251 SPDK_ERRLOG("Failed to get IO channel.\n"); 4252 return -1; 4253 } 4254 4255 return 0; 4256 } 4257 4258 static int 4259 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 4260 { 4261 spdk_put_io_channel(bs->md_channel); 4262 4263 return 0; 4264 } 4265 4266 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 4267 { 4268 assert(blob != NULL); 4269 4270 return blob->id; 4271 } 4272 4273 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 4274 { 4275 assert(blob != NULL); 4276 4277 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 4278 } 4279 4280 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob) 4281 { 4282 assert(blob != NULL); 4283 4284 return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs); 4285 } 4286 4287 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 4288 { 4289 assert(blob != NULL); 4290 4291 return blob->active.num_clusters; 4292 } 4293 4294 /* START spdk_bs_create_blob */ 4295 4296 static void 4297 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4298 { 4299 struct spdk_blob *blob = cb_arg; 4300 4301 _spdk_blob_free(blob); 4302 4303 spdk_bs_sequence_finish(seq, bserrno); 4304 } 4305 4306 static int 4307 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs, 4308 bool internal) 4309 { 4310 uint64_t i; 4311 size_t value_len = 0; 4312 int rc; 4313 const void *value = NULL; 4314 if (xattrs->count > 0 && xattrs->get_value == NULL) { 4315 return -EINVAL; 4316 } 4317 for (i = 0; i < xattrs->count; i++) { 4318 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len); 4319 if (value == NULL || value_len == 0) { 4320 return -EINVAL; 4321 } 4322 rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal); 4323 if (rc < 0) { 4324 return rc; 4325 } 4326 } 4327 return 0; 4328 } 4329 4330 static void 4331 _spdk_bs_create_blob(struct spdk_blob_store *bs, 4332 const struct spdk_blob_opts *opts, 4333 const struct spdk_blob_xattr_opts *internal_xattrs, 4334 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4335 { 4336 struct spdk_blob *blob; 4337 uint32_t page_idx; 4338 struct spdk_bs_cpl cpl; 4339 struct spdk_blob_opts opts_default; 4340 struct spdk_blob_xattr_opts internal_xattrs_default; 4341 spdk_bs_sequence_t *seq; 4342 spdk_blob_id id; 4343 int rc; 4344 4345 assert(spdk_get_thread() == bs->md_thread); 4346 4347 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 4348 if (page_idx == UINT32_MAX) { 4349 cb_fn(cb_arg, 0, -ENOMEM); 4350 return; 4351 } 4352 spdk_bit_array_set(bs->used_blobids, page_idx); 4353 spdk_bit_array_set(bs->used_md_pages, page_idx); 4354 4355 id = _spdk_bs_page_to_blobid(page_idx); 4356 4357 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 4358 4359 blob = _spdk_blob_alloc(bs, id); 4360 if (!blob) { 4361 cb_fn(cb_arg, 0, 
-ENOMEM);
		return;
	}

	if (!opts) {
		spdk_blob_opts_init(&opts_default);
		opts = &opts_default;
	}
	if (!internal_xattrs) {
		_spdk_blob_xattrs_init(&internal_xattrs_default);
		internal_xattrs = &internal_xattrs_default;
	}

	rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false);
	if (rc < 0) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}

	rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true);
	if (rc < 0) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}

	if (opts->thin_provision) {
		_spdk_blob_set_thin_provision(blob);
	}

	_spdk_blob_set_clear_method(blob, opts->clear_method);

	rc = _spdk_blob_resize(blob, opts->num_clusters);
	if (rc < 0) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}
	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
}

void spdk_bs_create_blob(struct spdk_blob_store *bs,
			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	_spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg);
}

void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	_spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg);
}

/* END spdk_bs_create_blob */

/* START blob_cleanup */

struct spdk_clone_snapshot_ctx {
	struct spdk_bs_cpl cpl;
	int bserrno;
	bool frozen;

	struct spdk_io_channel *channel;

	/* Current cluster for inflate operation */
	uint64_t cluster;

	/* For inflation, force allocation of all unallocated clusters and remove
	 * thin-provisioning. Otherwise only decouple parent and keep clone thin. */
	bool allocate_all;

	struct {
		spdk_blob_id id;
		struct spdk_blob *blob;
	} original;
	struct {
		spdk_blob_id id;
		struct spdk_blob *blob;
	} new;

	/* xattrs specified for snapshot/clones only. They have no impact on
	 * the original blob's xattrs.
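	 * The pointer is borrowed from the caller, so it must stay valid
	 * for the duration of the operation.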
*/ 4456 const struct spdk_blob_xattr_opts *xattrs; 4457 }; 4458 4459 static void 4460 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno) 4461 { 4462 struct spdk_clone_snapshot_ctx *ctx = cb_arg; 4463 struct spdk_bs_cpl *cpl = &ctx->cpl; 4464 4465 if (bserrno != 0) { 4466 if (ctx->bserrno != 0) { 4467 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4468 } else { 4469 ctx->bserrno = bserrno; 4470 } 4471 } 4472 4473 switch (cpl->type) { 4474 case SPDK_BS_CPL_TYPE_BLOBID: 4475 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno); 4476 break; 4477 case SPDK_BS_CPL_TYPE_BLOB_BASIC: 4478 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno); 4479 break; 4480 default: 4481 SPDK_UNREACHABLE(); 4482 break; 4483 } 4484 4485 free(ctx); 4486 } 4487 4488 static void 4489 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 4490 { 4491 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4492 struct spdk_blob *origblob = ctx->original.blob; 4493 4494 if (bserrno != 0) { 4495 if (ctx->bserrno != 0) { 4496 SPDK_ERRLOG("Unfreeze error %d\n", bserrno); 4497 } else { 4498 ctx->bserrno = bserrno; 4499 } 4500 } 4501 4502 ctx->original.id = origblob->id; 4503 origblob->locked_operation_in_progress = false; 4504 4505 spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4506 } 4507 4508 static void 4509 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno) 4510 { 4511 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4512 struct spdk_blob *origblob = ctx->original.blob; 4513 4514 if (bserrno != 0) { 4515 if (ctx->bserrno != 0) { 4516 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4517 } else { 4518 ctx->bserrno = bserrno; 4519 } 4520 } 4521 4522 if (ctx->frozen) { 4523 /* Unfreeze any outstanding I/O */ 4524 _spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx); 4525 } else { 4526 _spdk_bs_snapshot_unfreeze_cpl(ctx, 0); 4527 } 4528 4529 } 4530 4531 static void 4532 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno) 4533 { 4534 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4535 struct spdk_blob *newblob = ctx->new.blob; 4536 4537 if (bserrno != 0) { 4538 if (ctx->bserrno != 0) { 4539 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4540 } else { 4541 ctx->bserrno = bserrno; 4542 } 4543 } 4544 4545 ctx->new.id = newblob->id; 4546 spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4547 } 4548 4549 /* END blob_cleanup */ 4550 4551 /* START spdk_bs_create_snapshot */ 4552 4553 static void 4554 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2) 4555 { 4556 uint64_t *cluster_temp; 4557 4558 cluster_temp = blob1->active.clusters; 4559 blob1->active.clusters = blob2->active.clusters; 4560 blob2->active.clusters = cluster_temp; 4561 } 4562 4563 static void 4564 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno) 4565 { 4566 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4567 struct spdk_blob *origblob = ctx->original.blob; 4568 struct spdk_blob *newblob = ctx->new.blob; 4569 4570 if (bserrno != 0) { 4571 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4572 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4573 return; 4574 } 4575 4576 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */ 4577 bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true); 4578 if (bserrno != 0) { 4579 
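		/* Removing the in-progress marker failed; unwind through the
		 * original-blob cleanup path so the error reaches the caller. */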
_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4580 return; 4581 } 4582 4583 _spdk_bs_blob_list_add(ctx->original.blob); 4584 4585 spdk_blob_set_read_only(newblob); 4586 4587 /* sync snapshot metadata */ 4588 spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4589 } 4590 4591 static void 4592 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno) 4593 { 4594 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4595 struct spdk_blob *origblob = ctx->original.blob; 4596 struct spdk_blob *newblob = ctx->new.blob; 4597 4598 if (bserrno != 0) { 4599 /* return cluster map back to original */ 4600 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4601 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4602 return; 4603 } 4604 4605 /* Set internal xattr for snapshot id */ 4606 bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true); 4607 if (bserrno != 0) { 4608 /* return cluster map back to original */ 4609 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4610 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4611 return; 4612 } 4613 4614 _spdk_bs_blob_list_remove(origblob); 4615 origblob->parent_id = newblob->id; 4616 4617 /* Create new back_bs_dev for snapshot */ 4618 origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob); 4619 if (origblob->back_bs_dev == NULL) { 4620 /* return cluster map back to original */ 4621 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4622 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL); 4623 return; 4624 } 4625 4626 /* set clone blob as thin provisioned */ 4627 _spdk_blob_set_thin_provision(origblob); 4628 4629 _spdk_bs_blob_list_add(newblob); 4630 4631 /* sync clone metadata */ 4632 spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx); 4633 } 4634 4635 static void 4636 _spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc) 4637 { 4638 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4639 struct spdk_blob *origblob = ctx->original.blob; 4640 struct spdk_blob *newblob = ctx->new.blob; 4641 int bserrno; 4642 4643 if (rc != 0) { 4644 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc); 4645 return; 4646 } 4647 4648 ctx->frozen = true; 4649 4650 /* set new back_bs_dev for snapshot */ 4651 newblob->back_bs_dev = origblob->back_bs_dev; 4652 /* Set invalid flags from origblob */ 4653 newblob->invalid_flags = origblob->invalid_flags; 4654 4655 /* inherit parent from original blob if set */ 4656 newblob->parent_id = origblob->parent_id; 4657 if (origblob->parent_id != SPDK_BLOBID_INVALID) { 4658 /* Set internal xattr for snapshot id */ 4659 bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT, 4660 &origblob->parent_id, sizeof(spdk_blob_id), true); 4661 if (bserrno != 0) { 4662 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4663 return; 4664 } 4665 } 4666 4667 /* swap cluster maps */ 4668 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4669 4670 /* Set the clear method on the new blob to match the original. 
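	 * The cluster maps were swapped above, so the snapshot now owns the
	 * clusters the original blob had allocated; matching clear methods
	 * keeps those clusters scrubbed consistently when they are released.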
*/ 4671 _spdk_blob_set_clear_method(newblob, origblob->clear_method); 4672 4673 /* sync snapshot metadata */ 4674 spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx); 4675 } 4676 4677 static void 4678 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4679 { 4680 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4681 struct spdk_blob *origblob = ctx->original.blob; 4682 struct spdk_blob *newblob = _blob; 4683 4684 if (bserrno != 0) { 4685 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4686 return; 4687 } 4688 4689 ctx->new.blob = newblob; 4690 assert(spdk_blob_is_thin_provisioned(newblob)); 4691 assert(spdk_mem_all_zero(newblob->active.clusters, 4692 newblob->active.num_clusters * sizeof(*newblob->active.clusters))); 4693 4694 _spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx); 4695 } 4696 4697 static void 4698 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4699 { 4700 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4701 struct spdk_blob *origblob = ctx->original.blob; 4702 4703 if (bserrno != 0) { 4704 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4705 return; 4706 } 4707 4708 ctx->new.id = blobid; 4709 ctx->cpl.u.blobid.blobid = blobid; 4710 4711 spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx); 4712 } 4713 4714 4715 static void 4716 _spdk_bs_xattr_snapshot(void *arg, const char *name, 4717 const void **value, size_t *value_len) 4718 { 4719 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0); 4720 4721 struct spdk_blob *blob = (struct spdk_blob *)arg; 4722 *value = &blob->id; 4723 *value_len = sizeof(blob->id); 4724 } 4725 4726 static void 4727 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4728 { 4729 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4730 struct spdk_blob_opts opts; 4731 struct spdk_blob_xattr_opts internal_xattrs; 4732 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS }; 4733 4734 if (bserrno != 0) { 4735 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4736 return; 4737 } 4738 4739 ctx->original.blob = _blob; 4740 4741 if (_blob->data_ro || _blob->md_ro) { 4742 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n", 4743 _blob->id); 4744 ctx->bserrno = -EINVAL; 4745 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4746 return; 4747 } 4748 4749 if (_blob->locked_operation_in_progress) { 4750 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n"); 4751 ctx->bserrno = -EBUSY; 4752 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4753 return; 4754 } 4755 4756 _blob->locked_operation_in_progress = true; 4757 4758 spdk_blob_opts_init(&opts); 4759 _spdk_blob_xattrs_init(&internal_xattrs); 4760 4761 /* Change the size of new blob to the same as in original blob, 4762 * but do not allocate clusters */ 4763 opts.thin_provision = true; 4764 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4765 4766 /* If there are any xattrs specified for snapshot, set them now */ 4767 if (ctx->xattrs) { 4768 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4769 } 4770 /* Set internal xattr SNAPSHOT_IN_PROGRESS */ 4771 internal_xattrs.count = 1; 4772 internal_xattrs.ctx = _blob; 4773 internal_xattrs.names = xattrs_names; 4774 internal_xattrs.get_value = 
_spdk_bs_xattr_snapshot;

	_spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs,
			     _spdk_bs_snapshot_newblob_create_cpl, ctx);
}

void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid,
			     const struct spdk_blob_xattr_opts *snapshot_xattrs,
			     spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx));

	if (!ctx) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM);
		return;
	}
	ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	ctx->cpl.u.blobid.cb_fn = cb_fn;
	ctx->cpl.u.blobid.cb_arg = cb_arg;
	ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID;
	ctx->bserrno = 0;
	ctx->frozen = false;
	ctx->original.id = blobid;
	ctx->xattrs = snapshot_xattrs;

	spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx);
}
/* END spdk_bs_create_snapshot */

/* START spdk_bs_create_clone */

static void
_spdk_bs_xattr_clone(void *arg, const char *name,
		     const void **value, size_t *value_len)
{
	assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0);

	struct spdk_blob *blob = (struct spdk_blob *)arg;
	*value = &blob->id;
	*value_len = sizeof(blob->id);
}

static void
_spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *clone = _blob;

	if (bserrno != 0) {
		/* Opening the new clone failed; do not touch the NULL blob
		 * pointer, just unwind through the original blob. */
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	ctx->new.blob = clone;
	_spdk_bs_blob_list_add(clone);

	spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx);
}

static void
_spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;

	if (bserrno != 0) {
		/* Creating the clone failed; report the error instead of
		 * trying to open an invalid blob id. */
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	ctx->cpl.u.blobid.blobid = blobid;
	spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx);
}

static void
_spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob_opts opts;
	struct spdk_blob_xattr_opts internal_xattrs;
	char *xattr_names[] = { BLOB_SNAPSHOT };

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
		return;
	}

	ctx->original.blob = _blob;

	if (!_blob->data_ro || !_blob->md_ro) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone from a blob that is not read-only\n");
		ctx->bserrno = -EINVAL;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	if (_blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	_blob->locked_operation_in_progress = true;

	spdk_blob_opts_init(&opts);
	_spdk_blob_xattrs_init(&internal_xattrs);

	opts.thin_provision = true;
	opts.num_clusters = spdk_blob_get_num_clusters(_blob);
	if (ctx->xattrs) {
		memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs));
	}

	/* Set internal xattr BLOB_SNAPSHOT */
	internal_xattrs.count = 1;
	internal_xattrs.ctx = _blob;
	internal_xattrs.names = xattr_names;
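	/* _spdk_bs_xattr_clone() hands back the snapshot's blob id as the
	 * value, so the new clone persistently records which blob it was
	 * created from. */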
internal_xattrs.get_value = _spdk_bs_xattr_clone; 4882 4883 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4884 _spdk_bs_clone_newblob_create_cpl, ctx); 4885 } 4886 4887 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid, 4888 const struct spdk_blob_xattr_opts *clone_xattrs, 4889 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4890 { 4891 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4892 4893 if (!ctx) { 4894 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4895 return; 4896 } 4897 4898 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4899 ctx->cpl.u.blobid.cb_fn = cb_fn; 4900 ctx->cpl.u.blobid.cb_arg = cb_arg; 4901 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4902 ctx->bserrno = 0; 4903 ctx->xattrs = clone_xattrs; 4904 ctx->original.id = blobid; 4905 4906 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx); 4907 } 4908 4909 /* END spdk_bs_create_clone */ 4910 4911 /* START spdk_bs_inflate_blob */ 4912 4913 static void 4914 _spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno) 4915 { 4916 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4917 struct spdk_blob *_blob = ctx->original.blob; 4918 4919 if (bserrno != 0) { 4920 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4921 return; 4922 } 4923 4924 assert(_parent != NULL); 4925 4926 _spdk_bs_blob_list_remove(_blob); 4927 _blob->parent_id = _parent->id; 4928 _spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id, 4929 sizeof(spdk_blob_id), true); 4930 4931 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4932 _blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent); 4933 _spdk_bs_blob_list_add(_blob); 4934 4935 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4936 } 4937 4938 static void 4939 _spdk_bs_inflate_blob_done(void *cb_arg, int bserrno) 4940 { 4941 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4942 struct spdk_blob *_blob = ctx->original.blob; 4943 struct spdk_blob *_parent; 4944 4945 if (bserrno != 0) { 4946 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4947 return; 4948 } 4949 4950 if (ctx->allocate_all) { 4951 /* remove thin provisioning */ 4952 _spdk_bs_blob_list_remove(_blob); 4953 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4954 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV; 4955 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4956 _blob->back_bs_dev = NULL; 4957 _blob->parent_id = SPDK_BLOBID_INVALID; 4958 } else { 4959 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob; 4960 if (_parent->parent_id != SPDK_BLOBID_INVALID) { 4961 /* We must change the parent of the inflated blob */ 4962 spdk_bs_open_blob(_blob->bs, _parent->parent_id, 4963 _spdk_bs_inflate_blob_set_parent_cpl, ctx); 4964 return; 4965 } 4966 4967 _spdk_bs_blob_list_remove(_blob); 4968 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4969 _blob->parent_id = SPDK_BLOBID_INVALID; 4970 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4971 _blob->back_bs_dev = spdk_bs_create_zeroes_dev(); 4972 } 4973 4974 _blob->state = SPDK_BLOB_STATE_DIRTY; 4975 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4976 } 4977 4978 /* Check if cluster needs allocation */ 4979 static inline bool 4980 _spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all) 4981 { 4982 struct spdk_blob_bs_dev *b; 4983 4984 assert(blob != NULL); 4985 4986 if 
(blob->active.clusters[cluster] != 0) {
		/* Cluster is already allocated */
		return false;
	}

	if (blob->parent_id == SPDK_BLOBID_INVALID) {
		/* Blob has no parent blob */
		return allocate_all;
	}

	b = (struct spdk_blob_bs_dev *)blob->back_bs_dev;
	return (allocate_all || b->blob->active.clusters[cluster] != 0);
}

static void
_spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	struct spdk_blob *_blob = ctx->original.blob;
	uint64_t offset;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno);
		return;
	}

	for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) {
		if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) {
			break;
		}
	}

	if (ctx->cluster < _blob->active.num_clusters) {
		offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster);

		/* It is safe to advance the cluster index before issuing the write */
		ctx->cluster++;

		/* Use a zero-length write to touch a cluster */
		spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0,
				   _spdk_bs_inflate_blob_touch_next, ctx);
	} else {
		_spdk_bs_inflate_blob_done(cb_arg, bserrno);
	}
}

static void
_spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg;
	uint64_t lfc; /* lowest free cluster */
	uint64_t i;

	if (bserrno != 0) {
		_spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno);
		return;
	}

	ctx->original.blob = _blob;

	if (_blob->locked_operation_in_progress) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n");
		ctx->bserrno = -EBUSY;
		spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx);
		return;
	}

	_blob->locked_operation_in_progress = true;

	if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) {
		/* This blob has no parent, so we cannot decouple it. */
		SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n");
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL);
		return;
	}

	if (spdk_blob_is_thin_provisioned(_blob) == false) {
		/* This is not a thin-provisioned blob. No need to inflate. */
		_spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0);
		return;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */
	lfc = 0;
	for (i = 0; i < _blob->active.num_clusters; i++) {
		if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) {
			lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters.
Cannot satisfy the request */ 5077 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC); 5078 return; 5079 } 5080 lfc++; 5081 } 5082 } 5083 5084 ctx->cluster = 0; 5085 _spdk_bs_inflate_blob_touch_next(ctx, 0); 5086 } 5087 5088 static void 5089 _spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5090 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg) 5091 { 5092 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5093 5094 if (!ctx) { 5095 cb_fn(cb_arg, -ENOMEM); 5096 return; 5097 } 5098 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5099 ctx->cpl.u.bs_basic.cb_fn = cb_fn; 5100 ctx->cpl.u.bs_basic.cb_arg = cb_arg; 5101 ctx->bserrno = 0; 5102 ctx->original.id = blobid; 5103 ctx->channel = channel; 5104 ctx->allocate_all = allocate_all; 5105 5106 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx); 5107 } 5108 5109 void 5110 spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5111 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5112 { 5113 _spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg); 5114 } 5115 5116 void 5117 spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5118 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5119 { 5120 _spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg); 5121 } 5122 /* END spdk_bs_inflate_blob */ 5123 5124 /* START spdk_blob_resize */ 5125 struct spdk_bs_resize_ctx { 5126 spdk_blob_op_complete cb_fn; 5127 void *cb_arg; 5128 struct spdk_blob *blob; 5129 uint64_t sz; 5130 int rc; 5131 }; 5132 5133 static void 5134 _spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc) 5135 { 5136 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5137 5138 if (rc != 0) { 5139 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc); 5140 } 5141 5142 if (ctx->rc != 0) { 5143 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc); 5144 rc = ctx->rc; 5145 } 5146 5147 ctx->blob->locked_operation_in_progress = false; 5148 5149 ctx->cb_fn(ctx->cb_arg, rc); 5150 free(ctx); 5151 } 5152 5153 static void 5154 _spdk_bs_resize_freeze_cpl(void *cb_arg, int rc) 5155 { 5156 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5157 5158 if (rc != 0) { 5159 ctx->blob->locked_operation_in_progress = false; 5160 ctx->cb_fn(ctx->cb_arg, rc); 5161 free(ctx); 5162 return; 5163 } 5164 5165 ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz); 5166 5167 _spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx); 5168 } 5169 5170 void 5171 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg) 5172 { 5173 struct spdk_bs_resize_ctx *ctx; 5174 5175 _spdk_blob_verify_md_op(blob); 5176 5177 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 5178 5179 if (blob->md_ro) { 5180 cb_fn(cb_arg, -EPERM); 5181 return; 5182 } 5183 5184 if (sz == blob->active.num_clusters) { 5185 cb_fn(cb_arg, 0); 5186 return; 5187 } 5188 5189 if (blob->locked_operation_in_progress) { 5190 cb_fn(cb_arg, -EBUSY); 5191 return; 5192 } 5193 5194 ctx = calloc(1, sizeof(*ctx)); 5195 if (!ctx) { 5196 cb_fn(cb_arg, -ENOMEM); 5197 return; 5198 } 5199 5200 blob->locked_operation_in_progress = true; 5201 ctx->cb_fn = cb_fn; 5202 ctx->cb_arg = cb_arg; 5203 ctx->blob = blob; 5204 ctx->sz = sz; 5205 _spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx); 5206 } 5207 5208 /* END spdk_blob_resize */ 5209 5210 5211 /* START 
spdk_bs_delete_blob */

static void
_spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		/*
		 * We already removed this blob from the blobstore tailq, so
		 * we need to free it here since this is the last reference
		 * to it.
		 */
		_spdk_blob_free(blob);
		_spdk_bs_delete_close_cpl(seq, bserrno);
		return;
	}

	/*
	 * This will immediately decrement the ref_count and call
	 * the completion routine since the metadata state is clean.
	 * By calling spdk_blob_close, we reduce the number of call
	 * points into code that touches the blob->open_ref count
	 * and the blobstore's blob list.
	 */
	spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq);
}

struct delete_snapshot_ctx {
	struct spdk_blob_list *parent_snapshot_entry;
	struct spdk_blob *snapshot;
	bool snapshot_md_ro;
	struct spdk_blob *clone;
	bool clone_md_ro;
	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
	int bserrno;
};

static void
_spdk_delete_blob_cleanup_finish(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno);
	}

	assert(ctx != NULL);

	if (bserrno != 0 && ctx->bserrno == 0) {
		ctx->bserrno = bserrno;
	}

	ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno);
	free(ctx);
}

static void
_spdk_delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		ctx->bserrno = bserrno;
		SPDK_ERRLOG("Clone cleanup error %d\n", bserrno);
	}

	/* open_ref == 1 means that only the deletion context has opened this snapshot;
	 * open_ref == 2 means that the clone has opened this snapshot as well,
	 * so we have to add it back to the blobs list */
	if (ctx->snapshot->open_ref == 2) {
		TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link);
	}

	ctx->snapshot->locked_operation_in_progress = false;
	ctx->snapshot->md_ro = ctx->snapshot_md_ro;

	spdk_blob_close(ctx->snapshot, _spdk_delete_blob_cleanup_finish, ctx);
}

static void
_spdk_delete_snapshot_cleanup_clone(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	ctx->clone->locked_operation_in_progress = false;
	ctx->clone->md_ro = ctx->clone_md_ro;

	spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx);
}

static void
_spdk_delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;

	if (bserrno) {
		ctx->bserrno = bserrno;
		_spdk_delete_snapshot_cleanup_clone(ctx, 0);
		return;
	}

	ctx->clone->locked_operation_in_progress = false;
	spdk_blob_close(ctx->clone, _spdk_delete_blob_cleanup_finish, ctx);
}

static void
_spdk_delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno)
{
	struct delete_snapshot_ctx *ctx = cb_arg;
	struct spdk_blob_list *parent_snapshot_entry = NULL;
	struct spdk_blob_list *snapshot_entry = NULL;
5332 struct spdk_blob_list *clone_entry = NULL; 5333 struct spdk_blob_list *snapshot_clone_entry = NULL; 5334 5335 if (bserrno) { 5336 SPDK_ERRLOG("Failed to sync MD on blob\n"); 5337 ctx->bserrno = bserrno; 5338 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5339 return; 5340 } 5341 5342 /* Get snapshot entry for the snapshot we want to remove */ 5343 snapshot_entry = _spdk_bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id); 5344 5345 assert(snapshot_entry != NULL); 5346 5347 /* Remove clone entry in this snapshot (at this point there can be only one clone) */ 5348 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5349 assert(clone_entry != NULL); 5350 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 5351 snapshot_entry->clone_count--; 5352 assert(TAILQ_EMPTY(&snapshot_entry->clones)); 5353 5354 if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) { 5355 /* This snapshot is at the same time a clone of another snapshot - we need to 5356 * update parent snapshot (remove current clone, add new one inherited from 5357 * the snapshot that is being removed) */ 5358 5359 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5360 * snapshot that we are removing */ 5361 _spdk_blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry, 5362 &snapshot_clone_entry); 5363 5364 /* Switch clone entry in parent snapshot */ 5365 TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link); 5366 TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link); 5367 free(snapshot_clone_entry); 5368 } else { 5369 /* No parent snapshot - just remove clone entry */ 5370 free(clone_entry); 5371 } 5372 5373 /* Restore md_ro flags */ 5374 ctx->clone->md_ro = ctx->clone_md_ro; 5375 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5376 5377 _spdk_blob_unfreeze_io(ctx->clone, _spdk_delete_snapshot_unfreeze_cpl, ctx); 5378 } 5379 5380 static void 5381 _spdk_delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno) 5382 { 5383 struct delete_snapshot_ctx *ctx = cb_arg; 5384 uint64_t i; 5385 5386 ctx->snapshot->md_ro = false; 5387 5388 if (bserrno) { 5389 SPDK_ERRLOG("Failed to sync MD on clone\n"); 5390 ctx->bserrno = bserrno; 5391 5392 /* Restore snapshot to previous state */ 5393 bserrno = _spdk_blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true); 5394 if (bserrno != 0) { 5395 _spdk_delete_snapshot_cleanup_clone(ctx, bserrno); 5396 return; 5397 } 5398 5399 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_cleanup_clone, ctx); 5400 return; 5401 } 5402 5403 /* Clear cluster map entries for snapshot */ 5404 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5405 if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) { 5406 ctx->snapshot->active.clusters[i] = 0; 5407 } 5408 } 5409 5410 ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY; 5411 5412 if (ctx->parent_snapshot_entry != NULL) { 5413 ctx->snapshot->back_bs_dev = NULL; 5414 } 5415 5416 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_cpl, ctx); 5417 } 5418 5419 static void 5420 _spdk_delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno) 5421 { 5422 struct delete_snapshot_ctx *ctx = cb_arg; 5423 uint64_t i; 5424 5425 /* Temporarily override md_ro flag for clone for MD modification */ 5426 ctx->clone_md_ro = ctx->clone->md_ro; 5427 ctx->clone->md_ro = false; 5428 5429 if (bserrno) { 5430 SPDK_ERRLOG("Failed to sync MD with xattr on blob\n"); 5431 ctx->bserrno = bserrno; 5432 
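		/* Pass 0 so the cleanup chain reports the sync failure already
		 * recorded in ctx->bserrno instead of overwriting it. */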
_spdk_delete_snapshot_cleanup_clone(ctx, 0); 5433 return; 5434 } 5435 5436 /* Copy snapshot map to clone map (only unallocated clusters in clone) */ 5437 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5438 if (ctx->clone->active.clusters[i] == 0) { 5439 ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i]; 5440 } 5441 } 5442 5443 /* Delete old backing bs_dev from clone (related to snapshot that will be removed) */ 5444 ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev); 5445 5446 /* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */ 5447 if (ctx->parent_snapshot_entry != NULL) { 5448 /* ...to parent snapshot */ 5449 ctx->clone->parent_id = ctx->parent_snapshot_entry->id; 5450 ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev; 5451 _spdk_blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id, 5452 sizeof(spdk_blob_id), 5453 true); 5454 } else { 5455 /* ...to blobid invalid and zeroes dev */ 5456 ctx->clone->parent_id = SPDK_BLOBID_INVALID; 5457 ctx->clone->back_bs_dev = spdk_bs_create_zeroes_dev(); 5458 _spdk_blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true); 5459 } 5460 5461 spdk_blob_sync_md(ctx->clone, _spdk_delete_snapshot_sync_clone_cpl, ctx); 5462 } 5463 5464 static void 5465 _spdk_delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno) 5466 { 5467 struct delete_snapshot_ctx *ctx = cb_arg; 5468 5469 if (bserrno) { 5470 SPDK_ERRLOG("Failed to freeze I/O on clone\n"); 5471 ctx->bserrno = bserrno; 5472 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5473 return; 5474 } 5475 5476 /* Temporarily override md_ro flag for snapshot for MD modification */ 5477 ctx->snapshot_md_ro = ctx->snapshot->md_ro; 5478 ctx->snapshot->md_ro = false; 5479 5480 /* Mark blob as pending for removal for power failure safety, use clone id for recovery */ 5481 ctx->bserrno = _spdk_blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id, 5482 sizeof(spdk_blob_id), true); 5483 if (ctx->bserrno != 0) { 5484 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5485 return; 5486 } 5487 5488 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_xattr_cpl, ctx); 5489 } 5490 5491 static void 5492 _spdk_delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno) 5493 { 5494 struct delete_snapshot_ctx *ctx = cb_arg; 5495 5496 if (bserrno) { 5497 SPDK_ERRLOG("Failed to open clone\n"); 5498 ctx->bserrno = bserrno; 5499 _spdk_delete_snapshot_cleanup_snapshot(ctx, 0); 5500 return; 5501 } 5502 5503 ctx->clone = clone; 5504 5505 if (clone->locked_operation_in_progress) { 5506 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n"); 5507 ctx->bserrno = -EBUSY; 5508 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5509 return; 5510 } 5511 5512 clone->locked_operation_in_progress = true; 5513 5514 _spdk_blob_freeze_io(clone, _spdk_delete_snapshot_freeze_io_cb, ctx); 5515 } 5516 5517 static void 5518 _spdk_update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx) 5519 { 5520 struct spdk_blob_list *snapshot_entry = NULL; 5521 struct spdk_blob_list *clone_entry = NULL; 5522 struct spdk_blob_list *snapshot_clone_entry = NULL; 5523 5524 /* Get snapshot entry for the snapshot we want to remove */ 5525 snapshot_entry = _spdk_bs_get_snapshot_entry(snapshot->bs, snapshot->id); 5526 5527 assert(snapshot_entry != NULL); 5528 5529 /* Get clone of the snapshot (at this point 
there can be only one clone) */ 5530 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5531 assert(snapshot_entry->clone_count == 1); 5532 assert(clone_entry != NULL); 5533 5534 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5535 * snapshot that we are removing */ 5536 _spdk_blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry, 5537 &snapshot_clone_entry); 5538 5539 spdk_bs_open_blob(snapshot->bs, clone_entry->id, _spdk_delete_snapshot_open_clone_cb, ctx); 5540 } 5541 5542 static void 5543 _spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno) 5544 { 5545 spdk_bs_sequence_t *seq = cb_arg; 5546 struct spdk_blob_list *snapshot_entry = NULL; 5547 uint32_t page_num; 5548 5549 if (bserrno) { 5550 SPDK_ERRLOG("Failed to remove blob\n"); 5551 spdk_bs_sequence_finish(seq, bserrno); 5552 return; 5553 } 5554 5555 /* Remove snapshot from the list */ 5556 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5557 if (snapshot_entry != NULL) { 5558 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link); 5559 free(snapshot_entry); 5560 } 5561 5562 page_num = _spdk_bs_blobid_to_page(blob->id); 5563 spdk_bit_array_clear(blob->bs->used_blobids, page_num); 5564 blob->state = SPDK_BLOB_STATE_DIRTY; 5565 blob->active.num_pages = 0; 5566 _spdk_blob_resize(blob, 0); 5567 5568 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob); 5569 } 5570 5571 static int 5572 _spdk_bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone) 5573 { 5574 struct spdk_blob_list *snapshot_entry = NULL; 5575 struct spdk_blob_list *clone_entry = NULL; 5576 struct spdk_blob *clone = NULL; 5577 bool has_one_clone = false; 5578 5579 /* Check if this is a snapshot with clones */ 5580 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5581 if (snapshot_entry != NULL) { 5582 if (snapshot_entry->clone_count > 1) { 5583 SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n"); 5584 return -EBUSY; 5585 } else if (snapshot_entry->clone_count == 1) { 5586 has_one_clone = true; 5587 } 5588 } 5589 5590 /* Check if someone has this blob open (besides this delete context): 5591 * - open_ref = 1 - only this context opened blob, so it is ok to remove it 5592 * - open_ref <= 2 && has_one_clone = true - clone is holding snapshot 5593 * and that is ok, because we will update it accordingly */ 5594 if (blob->open_ref <= 2 && has_one_clone) { 5595 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5596 assert(clone_entry != NULL); 5597 clone = _spdk_blob_lookup(blob->bs, clone_entry->id); 5598 5599 if (blob->open_ref == 2 && clone == NULL) { 5600 /* Clone is closed and someone else opened this blob */ 5601 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5602 return -EBUSY; 5603 } 5604 5605 *update_clone = true; 5606 return 0; 5607 } 5608 5609 if (blob->open_ref > 1) { 5610 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5611 return -EBUSY; 5612 } 5613 5614 assert(has_one_clone == false); 5615 *update_clone = false; 5616 return 0; 5617 } 5618 5619 static void 5620 _spdk_bs_delete_enomem_close_cpl(void *cb_arg, int bserrno) 5621 { 5622 spdk_bs_sequence_t *seq = cb_arg; 5623 5624 spdk_bs_sequence_finish(seq, -ENOMEM); 5625 } 5626 5627 static void 5628 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 5629 { 5630 spdk_bs_sequence_t *seq = cb_arg; 5631 struct delete_snapshot_ctx *ctx; 5632 bool update_clone = false; 5633 5634 if (bserrno != 0) { 5635 
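		/* The blob could not be opened, so nothing was locked or removed
		 * from the blobstore lists; just finish the sequence with the
		 * error. */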
spdk_bs_sequence_finish(seq, bserrno); 5636 return; 5637 } 5638 5639 _spdk_blob_verify_md_op(blob); 5640 5641 ctx = calloc(1, sizeof(*ctx)); 5642 if (ctx == NULL) { 5643 spdk_blob_close(blob, _spdk_bs_delete_enomem_close_cpl, seq); 5644 return; 5645 } 5646 5647 ctx->snapshot = blob; 5648 ctx->cb_fn = _spdk_bs_delete_blob_finish; 5649 ctx->cb_arg = seq; 5650 5651 /* Check if blob can be removed and if it is a snapshot with clone on top of it */ 5652 ctx->bserrno = _spdk_bs_is_blob_deletable(blob, &update_clone); 5653 if (ctx->bserrno) { 5654 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5655 return; 5656 } 5657 5658 if (blob->locked_operation_in_progress) { 5659 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n"); 5660 ctx->bserrno = -EBUSY; 5661 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5662 return; 5663 } 5664 5665 blob->locked_operation_in_progress = true; 5666 5667 /* 5668 * Remove the blob from the blob_store list now, to ensure it does not 5669 * get returned after this point by _spdk_blob_lookup(). 5670 */ 5671 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 5672 5673 if (update_clone) { 5674 /* This blob is a snapshot with active clone - update clone first */ 5675 _spdk_update_clone_on_snapshot_deletion(blob, ctx); 5676 } else { 5677 /* This blob does not have any clones - just remove it */ 5678 _spdk_bs_blob_list_remove(blob); 5679 _spdk_bs_delete_blob_finish(seq, blob, 0); 5680 free(ctx); 5681 } 5682 } 5683 5684 void 5685 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5686 spdk_blob_op_complete cb_fn, void *cb_arg) 5687 { 5688 struct spdk_bs_cpl cpl; 5689 spdk_bs_sequence_t *seq; 5690 5691 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 5692 5693 assert(spdk_get_thread() == bs->md_thread); 5694 5695 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5696 cpl.u.blob_basic.cb_fn = cb_fn; 5697 cpl.u.blob_basic.cb_arg = cb_arg; 5698 5699 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 5700 if (!seq) { 5701 cb_fn(cb_arg, -ENOMEM); 5702 return; 5703 } 5704 5705 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq); 5706 } 5707 5708 /* END spdk_bs_delete_blob */ 5709 5710 /* START spdk_bs_open_blob */ 5711 5712 static void 5713 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5714 { 5715 struct spdk_blob *blob = cb_arg; 5716 5717 if (bserrno != 0) { 5718 _spdk_blob_free(blob); 5719 seq->cpl.u.blob_handle.blob = NULL; 5720 spdk_bs_sequence_finish(seq, bserrno); 5721 return; 5722 } 5723 5724 blob->open_ref++; 5725 5726 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 5727 5728 spdk_bs_sequence_finish(seq, bserrno); 5729 } 5730 5731 static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5732 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5733 { 5734 struct spdk_blob *blob; 5735 struct spdk_bs_cpl cpl; 5736 struct spdk_blob_open_opts opts_default; 5737 spdk_bs_sequence_t *seq; 5738 uint32_t page_num; 5739 5740 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 5741 assert(spdk_get_thread() == bs->md_thread); 5742 5743 page_num = _spdk_bs_blobid_to_page(blobid); 5744 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) { 5745 /* Invalid blobid */ 5746 cb_fn(cb_arg, NULL, -ENOENT); 5747 return; 5748 } 5749 5750 blob = _spdk_blob_lookup(bs, blobid); 5751 if (blob) { 5752 blob->open_ref++; 5753 cb_fn(cb_arg, blob, 0); 5754 return; 5755 } 5756 5757 blob = _spdk_blob_alloc(bs, blobid); 5758 
	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (!opts) {
		spdk_blob_open_opts_init(&opts_default);
		opts = &opts_default;
	}

	blob->clear_method = opts->clear_method;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
}

void
spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg);
}

void
spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid,
		      struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg);
}

/* END spdk_bs_open_blob */

/* START spdk_blob_set_read_only */
int
spdk_blob_set_read_only(struct spdk_blob *blob)
{
	_spdk_blob_verify_md_op(blob);

	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	return 0;
}
/* END spdk_blob_set_read_only */

/* START spdk_blob_sync_md */

static void
_spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
		blob->data_ro = true;
		blob->md_ro = true;
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
}

void
spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);

	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		cb_fn(cb_arg, 0);
		return;
	}

	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
}

/* END spdk_blob_sync_md */

struct spdk_blob_insert_cluster_ctx {
	struct spdk_thread	*thread;
	struct spdk_blob	*blob;
	uint32_t		cluster_num;	/* cluster index in blob */
	uint64_t		cluster;	/* cluster on disk; uint64_t to match
						 * _spdk_blob_insert_cluster() and avoid
						 * truncating large cluster numbers */
	int			rc;
	spdk_blob_op_complete	cb_fn;
	void			*cb_arg;
};

static void
_spdk_blob_insert_cluster_msg_cpl(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->cb_fn(ctx->cb_arg, ctx->rc);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = bserrno;
	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
}
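/*
 * Cluster map updates must run on the metadata thread. The message handler
 * below inserts the cluster into the blob's in-memory cluster map, syncs the
 * metadata to disk, and then messages the originating thread so that the
 * completion callback runs where the request was issued.
 */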
static void
_spdk_blob_insert_cluster_msg(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
	if (ctx->rc != 0) {
		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
		return;
	}

	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
}

static void
_spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->thread = spdk_get_thread();
	ctx->blob = blob;
	ctx->cluster_num = cluster_num;
	ctx->cluster = cluster;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
}

/* START spdk_blob_close */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0) {
		blob->open_ref--;
		if (blob->open_ref == 0) {
			/*
			 * Blobs with active.num_pages == 0 are deleted blobs.
			 * These blobs are removed from the blob_store list
			 * when the deletion process starts - so don't try to
			 * remove them again.
			 */
			if (blob->active.num_pages > 0) {
				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
			}
			_spdk_blob_free(blob);
		}
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
}

/* END spdk_blob_close */

struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}

void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void
spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void
spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}
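/*
 * Illustrative usage sketch: blob I/O is submitted on a channel allocated
 * from the blob store. 'buf' and 'write_done' below are hypothetical and
 * error handling is elided.
 *
 *	struct spdk_io_channel *ch = spdk_bs_alloc_io_channel(bs);
 *
 *	spdk_blob_io_write(blob, ch, buf, 0, 1, write_done, NULL);
 *	...
 *	spdk_bs_free_io_channel(ch);
 */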
void
spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   void *payload, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void
spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
		  void *payload, uint64_t offset, uint64_t length,
		  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void
spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
		    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void
spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}

struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_iter_first(struct spdk_blob_store *bs,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
}
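/*
 * Illustrative sketch of blob iteration ('iter_cb' and 'g_bs' are
 * hypothetical): spdk_bs_iter_next() closes the blob it is handed, and the
 * callback receives -ENOENT once every allocated blob has been visited.
 *
 *	static void
 *	iter_cb(void *arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			return;  // -ENOENT marks the end of the enumeration
 *		}
 *		// ...inspect blob...
 *		spdk_bs_iter_next(g_bs, blob, iter_cb, arg);
 *	}
 *
 *	spdk_bs_iter_first(g_bs, iter_cb, NULL);
 */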
static int
_spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		     uint16_t value_len, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;
	size_t			desc_size;
	void			*tmp;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}

	desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len;
	if (desc_size > SPDK_BS_MAX_DESC_SIZE) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %zu does not fit into single page %zu\n",
			      name, desc_size, (size_t)SPDK_BS_MAX_DESC_SIZE);
		return -ENOMEM;
	}

	if (internal) {
		xattrs = &blob->xattrs_internal;
		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
	} else {
		xattrs = &blob->xattrs;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			/* Allocate the new value first so the old one is kept on failure */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}

			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}

	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}

	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		    uint16_t value_len)
{
	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
}

static int
_spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}
	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
			}
			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
{
	return _spdk_blob_remove_xattr(blob, name, false);
}
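/*
 * Illustrative sketch (hypothetical values): xattrs are modified in memory
 * on the metadata thread and only reach the disk on the next metadata sync,
 * e.g. spdk_blob_sync_md() or spdk_blob_close().
 *
 *	const void *val;
 *	size_t len;
 *
 *	spdk_blob_set_xattr(blob, "name", "lvol0", sizeof("lvol0"));
 *	spdk_blob_get_xattr_value(blob, "name", &val, &len);
 *	spdk_blob_remove_xattr(blob, "name");
 */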
static int
_spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len, bool internal)
{
	struct spdk_xattr	*xattr;
	struct spdk_xattr_tailq *xattrs;

	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}
	return -ENOENT;
}

int
spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			  const void **value, size_t *value_len)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
}

struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[0];
};

static int
_spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
{
	struct spdk_xattr	*xattr;
	int			count = 0;

	TAILQ_FOREACH(xattr, xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

int
spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}

void
spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
{
	memcpy(&bs->bstype, &bstype, sizeof(bstype));
}

bool
spdk_blob_is_read_only(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return (blob->data_ro || blob->md_ro);
}

bool
spdk_blob_is_snapshot(struct spdk_blob *blob)
{
	struct spdk_blob_list *snapshot_entry;

	assert(blob != NULL);

	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry == NULL) {
		return false;
	}

	return true;
}

bool
spdk_blob_is_clone(struct spdk_blob *blob)
{
	assert(blob != NULL);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		assert(spdk_blob_is_thin_provisioned(blob));
		return true;
	}

	return false;
}

bool
spdk_blob_is_thin_provisioned(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
}
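/*
 * Illustrative sketch ('snapshotid' is hypothetical): spdk_blob_get_clones()
 * (defined below) reports the required array size via -ENOMEM when called
 * with ids == NULL, which lets the caller size the id array before the
 * second call.
 *
 *	size_t count = 0;
 *	spdk_blob_id *ids = NULL;
 *
 *	if (spdk_blob_get_clones(bs, snapshotid, NULL, &count) == -ENOMEM && count > 0) {
 *		ids = calloc(count, sizeof(*ids));
 *		if (ids && spdk_blob_get_clones(bs, snapshotid, ids, &count) == 0) {
 *			// ...ids[0..count-1] hold the clone ids...
 *		}
 *		free(ids);
 *	}
 */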
static void
_spdk_blob_update_clear_method(struct spdk_blob *blob)
{
	enum blob_clear_method stored_cm;

	assert(blob != NULL);

	/*
	 * If BLOB_CLEAR_WITH_DEFAULT was passed in, use the setting stored
	 * in metadata previously. If something other than the default was
	 * specified, ignore the stored value and use what was passed in.
	 */
	stored_cm = ((blob->md_ro_flags & SPDK_BLOB_CLEAR_METHOD) >> SPDK_BLOB_CLEAR_METHOD_SHIFT);

	if (blob->clear_method == BLOB_CLEAR_WITH_DEFAULT) {
		blob->clear_method = stored_cm;
	} else if (blob->clear_method != stored_cm) {
		SPDK_WARNLOG("Using passed in clear method 0x%x instead of stored value of 0x%x\n",
			     blob->clear_method, stored_cm);
	}
}

spdk_blob_id
spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
			if (clone_entry->id == blob_id) {
				return snapshot_entry->id;
			}
		}
	}

	return SPDK_BLOBID_INVALID;
}

int
spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids,
		     size_t *count)
{
	struct spdk_blob_list *snapshot_entry, *clone_entry;
	size_t n;

	snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid);
	if (snapshot_entry == NULL) {
		*count = 0;
		return 0;
	}

	if (ids == NULL || *count < snapshot_entry->clone_count) {
		*count = snapshot_entry->clone_count;
		return -ENOMEM;
	}
	*count = snapshot_entry->clone_count;

	n = 0;
	TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
		ids[n++] = clone_entry->id;
	}

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)