1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/thread.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 #include "spdk/util.h" 44 #include "spdk/string.h" 45 46 #include "spdk_internal/assert.h" 47 #include "spdk_internal/log.h" 48 49 #include "blobstore.h" 50 51 #define BLOB_CRC32C_INITIAL 0xffffffffUL 52 53 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs); 54 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs); 55 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno); 56 static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num, 57 uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg); 58 59 static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 60 uint16_t value_len, bool internal); 61 static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name, 62 const void **value, size_t *value_len, bool internal); 63 static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal); 64 65 static void 66 _spdk_blob_verify_md_op(struct spdk_blob *blob) 67 { 68 assert(blob != NULL); 69 assert(spdk_get_thread() == blob->bs->md_thread); 70 assert(blob->state != SPDK_BLOB_STATE_LOADING); 71 } 72 73 static struct spdk_blob_list * 74 _spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid) 75 { 76 struct spdk_blob_list *snapshot_entry = NULL; 77 78 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) { 79 if (snapshot_entry->id == blobid) { 80 break; 81 } 82 } 83 84 return snapshot_entry; 85 } 86 87 static void 88 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 89 { 90 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 91 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 92 assert(bs->num_free_clusters > 0); 93 94 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num); 95 96 spdk_bit_array_set(bs->used_clusters, cluster_num); 97 bs->num_free_clusters--; 98 } 99 100 static int 101 _spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster) 102 { 103 uint64_t *cluster_lba = &blob->active.clusters[cluster_num]; 104 105 _spdk_blob_verify_md_op(blob); 106 107 if (*cluster_lba != 0) { 108 return -EEXIST; 109 } 110 111 *cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster); 112 return 0; 113 } 114 115 static int 116 _spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num, 117 uint64_t *lowest_free_cluster, bool update_map) 118 { 119 pthread_mutex_lock(&blob->bs->used_clusters_mutex); 120 *lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters, 121 *lowest_free_cluster); 122 if (*lowest_free_cluster == UINT32_MAX) { 123 /* No more free clusters. Cannot satisfy the request */ 124 pthread_mutex_unlock(&blob->bs->used_clusters_mutex); 125 return -ENOSPC; 126 } 127 128 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id); 129 _spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster); 130 pthread_mutex_unlock(&blob->bs->used_clusters_mutex); 131 132 if (update_map) { 133 _spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster); 134 } 135 136 return 0; 137 } 138 139 static void 140 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 141 { 142 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 143 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 144 assert(bs->num_free_clusters < bs->total_clusters); 145 146 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num); 147 148 pthread_mutex_lock(&bs->used_clusters_mutex); 149 spdk_bit_array_clear(bs->used_clusters, cluster_num); 150 bs->num_free_clusters++; 151 pthread_mutex_unlock(&bs->used_clusters_mutex); 152 } 153 154 static void 155 _spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs) 156 { 157 xattrs->count = 0; 158 xattrs->names = NULL; 159 xattrs->ctx = NULL; 160 xattrs->get_value = NULL; 161 } 162 163 void 164 spdk_blob_opts_init(struct spdk_blob_opts *opts) 165 { 166 opts->num_clusters = 0; 167 opts->thin_provision = false; 168 _spdk_blob_xattrs_init(&opts->xattrs); 169 } 170 171 void 172 spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts) 173 { 174 opts->clear_method = BLOB_CLEAR_WITH_DEFAULT; 175 } 176 177 static struct spdk_blob * 178 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 179 { 180 struct spdk_blob *blob; 181 182 blob = calloc(1, sizeof(*blob)); 183 if (!blob) { 184 return NULL; 185 } 186 187 blob->id = id; 188 blob->bs = bs; 189 190 blob->parent_id = SPDK_BLOBID_INVALID; 191 192 blob->state = SPDK_BLOB_STATE_DIRTY; 193 blob->active.num_pages = 1; 194 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 195 if (!blob->active.pages) { 196 free(blob); 197 return NULL; 198 } 199 200 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 201 202 TAILQ_INIT(&blob->xattrs); 203 TAILQ_INIT(&blob->xattrs_internal); 204 205 return blob; 206 } 207 208 static void 209 _spdk_xattrs_free(struct spdk_xattr_tailq *xattrs) 210 { 211 struct spdk_xattr *xattr, *xattr_tmp; 212 213 TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) { 214 TAILQ_REMOVE(xattrs, xattr, link); 215 free(xattr->name); 216 free(xattr->value); 217 free(xattr); 218 } 219 } 220 221 static void 222 _spdk_blob_free(struct spdk_blob *blob) 223 { 224 assert(blob != NULL); 225 226 free(blob->active.clusters); 227 free(blob->clean.clusters); 228 free(blob->active.pages); 229 free(blob->clean.pages); 230 231 _spdk_xattrs_free(&blob->xattrs); 232 _spdk_xattrs_free(&blob->xattrs_internal); 233 234 if (blob->back_bs_dev) { 235 blob->back_bs_dev->destroy(blob->back_bs_dev); 236 } 237 238 free(blob); 239 } 240 241 struct freeze_io_ctx { 242 struct spdk_bs_cpl cpl; 243 struct spdk_blob *blob; 244 }; 245 246 static void 247 _spdk_blob_io_sync(struct spdk_io_channel_iter *i) 248 { 249 spdk_for_each_channel_continue(i, 0); 250 } 251 252 static void 253 _spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i) 254 { 255 struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i); 256 struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch); 257 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 258 struct spdk_bs_request_set *set; 259 struct spdk_bs_user_op_args *args; 260 spdk_bs_user_op_t *op, *tmp; 261 262 TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) { 263 set = (struct spdk_bs_request_set *)op; 264 args = &set->u.user_op; 265 266 if (args->blob == ctx->blob) { 267 TAILQ_REMOVE(&ch->queued_io, op, link); 268 spdk_bs_user_op_execute(op); 269 } 270 } 271 272 spdk_for_each_channel_continue(i, 0); 273 } 274 275 static void 276 _spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status) 277 { 278 struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i); 279 280 ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0); 281 282 free(ctx); 283 } 284 285 static void 286 _spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 287 { 288 struct freeze_io_ctx *ctx; 289 290 ctx = calloc(1, sizeof(*ctx)); 291 if (!ctx) { 292 cb_fn(cb_arg, -ENOMEM); 293 return; 294 } 295 296 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 297 ctx->cpl.u.blob_basic.cb_fn = cb_fn; 298 ctx->cpl.u.blob_basic.cb_arg = cb_arg; 299 ctx->blob = blob; 300 301 /* Freeze I/O on blob */ 302 blob->frozen_refcnt++; 303 304 if (blob->frozen_refcnt == 1) { 305 spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl); 306 } else { 307 cb_fn(cb_arg, 0); 308 free(ctx); 309 } 310 } 311 312 static void 313 _spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 314 { 315 struct freeze_io_ctx *ctx; 316 317 ctx = calloc(1, sizeof(*ctx)); 318 if (!ctx) { 319 cb_fn(cb_arg, -ENOMEM); 320 return; 321 } 322 323 ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 324 ctx->cpl.u.blob_basic.cb_fn = cb_fn; 325 ctx->cpl.u.blob_basic.cb_arg = cb_arg; 326 ctx->blob = blob; 327 328 assert(blob->frozen_refcnt > 0); 329 330 blob->frozen_refcnt--; 331 332 if (blob->frozen_refcnt == 0) { 333 spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl); 334 } else { 335 cb_fn(cb_arg, 0); 336 free(ctx); 337 } 338 } 339 340 static int 341 _spdk_blob_mark_clean(struct spdk_blob *blob) 342 { 343 uint64_t *clusters = NULL; 344 uint32_t *pages = NULL; 345 346 assert(blob != NULL); 347 348 if (blob->active.num_clusters) { 349 assert(blob->active.clusters); 350 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 351 if (!clusters) { 352 return -ENOMEM; 353 } 354 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters)); 355 } 356 357 if (blob->active.num_pages) { 358 assert(blob->active.pages); 359 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 360 if (!pages) { 361 free(clusters); 362 return -ENOMEM; 363 } 364 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages)); 365 } 366 367 free(blob->clean.clusters); 368 free(blob->clean.pages); 369 370 blob->clean.num_clusters = blob->active.num_clusters; 371 blob->clean.clusters = blob->active.clusters; 372 blob->clean.num_pages = blob->active.num_pages; 373 blob->clean.pages = blob->active.pages; 374 375 blob->active.clusters = clusters; 376 blob->active.pages = pages; 377 378 /* If the metadata was dirtied again while the metadata was being written to disk, 379 * we do not want to revert the DIRTY state back to CLEAN here. 380 */ 381 if (blob->state == SPDK_BLOB_STATE_LOADING) { 382 blob->state = SPDK_BLOB_STATE_CLEAN; 383 } 384 385 return 0; 386 } 387 388 static int 389 _spdk_blob_deserialize_xattr(struct spdk_blob *blob, 390 struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal) 391 { 392 struct spdk_xattr *xattr; 393 394 if (desc_xattr->length != sizeof(desc_xattr->name_length) + 395 sizeof(desc_xattr->value_length) + 396 desc_xattr->name_length + desc_xattr->value_length) { 397 return -EINVAL; 398 } 399 400 xattr = calloc(1, sizeof(*xattr)); 401 if (xattr == NULL) { 402 return -ENOMEM; 403 } 404 405 xattr->name = malloc(desc_xattr->name_length + 1); 406 if (xattr->name == NULL) { 407 free(xattr); 408 return -ENOMEM; 409 } 410 memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 411 xattr->name[desc_xattr->name_length] = '\0'; 412 413 xattr->value = malloc(desc_xattr->value_length); 414 if (xattr->value == NULL) { 415 free(xattr->name); 416 free(xattr); 417 return -ENOMEM; 418 } 419 xattr->value_len = desc_xattr->value_length; 420 memcpy(xattr->value, 421 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 422 desc_xattr->value_length); 423 424 TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link); 425 426 return 0; 427 } 428 429 430 static int 431 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 432 { 433 struct spdk_blob_md_descriptor *desc; 434 size_t cur_desc = 0; 435 void *tmp; 436 437 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 438 while (cur_desc < sizeof(page->descriptors)) { 439 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 440 if (desc->length == 0) { 441 /* If padding and length are 0, this terminates the page */ 442 break; 443 } 444 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 445 struct spdk_blob_md_descriptor_flags *desc_flags; 446 447 desc_flags = (struct spdk_blob_md_descriptor_flags *)desc; 448 449 if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) { 450 return -EINVAL; 451 } 452 453 if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) != 454 SPDK_BLOB_INVALID_FLAGS_MASK) { 455 return -EINVAL; 456 } 457 458 if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) != 459 SPDK_BLOB_DATA_RO_FLAGS_MASK) { 460 blob->data_ro = true; 461 blob->md_ro = true; 462 } 463 464 if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) != 465 SPDK_BLOB_MD_RO_FLAGS_MASK) { 466 blob->md_ro = true; 467 } 468 469 if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) { 470 blob->data_ro = true; 471 blob->md_ro = true; 472 } 473 474 blob->invalid_flags = desc_flags->invalid_flags; 475 blob->data_ro_flags = desc_flags->data_ro_flags; 476 blob->md_ro_flags = desc_flags->md_ro_flags; 477 478 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 479 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 480 unsigned int i, j; 481 unsigned int cluster_count = blob->active.num_clusters; 482 483 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 484 485 if (desc_extent_rle->length == 0 || 486 (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) { 487 return -EINVAL; 488 } 489 490 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 491 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 492 if (desc_extent_rle->extents[i].cluster_idx != 0) { 493 if (!spdk_bit_array_get(blob->bs->used_clusters, 494 desc_extent_rle->extents[i].cluster_idx + j)) { 495 return -EINVAL; 496 } 497 } 498 cluster_count++; 499 } 500 } 501 502 if (cluster_count == 0) { 503 return -EINVAL; 504 } 505 tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters)); 506 if (tmp == NULL) { 507 return -ENOMEM; 508 } 509 blob->active.clusters = tmp; 510 blob->active.cluster_array_size = cluster_count; 511 512 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 513 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 514 if (desc_extent_rle->extents[i].cluster_idx != 0) { 515 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 516 desc_extent_rle->extents[i].cluster_idx + j); 517 } else if (spdk_blob_is_thin_provisioned(blob)) { 518 blob->active.clusters[blob->active.num_clusters++] = 0; 519 } else { 520 return -EINVAL; 521 } 522 } 523 } 524 525 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 526 int rc; 527 528 rc = _spdk_blob_deserialize_xattr(blob, 529 (struct spdk_blob_md_descriptor_xattr *) desc, false); 530 if (rc != 0) { 531 return rc; 532 } 533 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 534 int rc; 535 536 rc = _spdk_blob_deserialize_xattr(blob, 537 (struct spdk_blob_md_descriptor_xattr *) desc, true); 538 if (rc != 0) { 539 return rc; 540 } 541 } else { 542 /* Unrecognized descriptor type. Do not fail - just continue to the 543 * next descriptor. If this descriptor is associated with some feature 544 * defined in a newer version of blobstore, that version of blobstore 545 * should create and set an associated feature flag to specify if this 546 * blob can be loaded or not. 547 */ 548 } 549 550 /* Advance to the next descriptor */ 551 cur_desc += sizeof(*desc) + desc->length; 552 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 553 break; 554 } 555 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 556 } 557 558 return 0; 559 } 560 561 static int 562 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 563 struct spdk_blob *blob) 564 { 565 const struct spdk_blob_md_page *page; 566 uint32_t i; 567 int rc; 568 569 assert(page_count > 0); 570 assert(pages[0].sequence_num == 0); 571 assert(blob != NULL); 572 assert(blob->state == SPDK_BLOB_STATE_LOADING); 573 assert(blob->active.clusters == NULL); 574 575 /* The blobid provided doesn't match what's in the MD, this can 576 * happen for example if a bogus blobid is passed in through open. 577 */ 578 if (blob->id != pages[0].id) { 579 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n", 580 blob->id, pages[0].id); 581 return -ENOENT; 582 } 583 584 for (i = 0; i < page_count; i++) { 585 page = &pages[i]; 586 587 assert(page->id == blob->id); 588 assert(page->sequence_num == i); 589 590 rc = _spdk_blob_parse_page(page, blob); 591 if (rc != 0) { 592 return rc; 593 } 594 } 595 596 return 0; 597 } 598 599 static int 600 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 601 struct spdk_blob_md_page **pages, 602 uint32_t *page_count, 603 struct spdk_blob_md_page **last_page) 604 { 605 struct spdk_blob_md_page *page; 606 607 assert(pages != NULL); 608 assert(page_count != NULL); 609 610 if (*page_count == 0) { 611 assert(*pages == NULL); 612 *page_count = 1; 613 *pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 614 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 615 } else { 616 assert(*pages != NULL); 617 (*page_count)++; 618 *pages = spdk_realloc(*pages, 619 SPDK_BS_PAGE_SIZE * (*page_count), 620 SPDK_BS_PAGE_SIZE); 621 } 622 623 if (*pages == NULL) { 624 *page_count = 0; 625 *last_page = NULL; 626 return -ENOMEM; 627 } 628 629 page = &(*pages)[*page_count - 1]; 630 memset(page, 0, sizeof(*page)); 631 page->id = blob->id; 632 page->sequence_num = *page_count - 1; 633 page->next = SPDK_INVALID_MD_PAGE; 634 *last_page = page; 635 636 return 0; 637 } 638 639 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 640 * Update required_sz on both success and failure. 641 * 642 */ 643 static int 644 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 645 uint8_t *buf, size_t buf_sz, 646 size_t *required_sz, bool internal) 647 { 648 struct spdk_blob_md_descriptor_xattr *desc; 649 650 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 651 strlen(xattr->name) + 652 xattr->value_len; 653 654 if (buf_sz < *required_sz) { 655 return -1; 656 } 657 658 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 659 660 desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR; 661 desc->length = sizeof(desc->name_length) + 662 sizeof(desc->value_length) + 663 strlen(xattr->name) + 664 xattr->value_len; 665 desc->name_length = strlen(xattr->name); 666 desc->value_length = xattr->value_len; 667 668 memcpy(desc->name, xattr->name, desc->name_length); 669 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 670 xattr->value, 671 desc->value_length); 672 673 return 0; 674 } 675 676 static void 677 _spdk_blob_serialize_extent_rle(const struct spdk_blob *blob, 678 uint64_t start_cluster, uint64_t *next_cluster, 679 uint8_t **buf, size_t *buf_sz) 680 { 681 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 682 size_t cur_sz; 683 uint64_t i, extent_idx; 684 uint64_t lba, lba_per_cluster, lba_count; 685 686 /* The buffer must have room for at least one extent */ 687 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]); 688 if (*buf_sz < cur_sz) { 689 *next_cluster = start_cluster; 690 return; 691 } 692 693 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf; 694 desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE; 695 696 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 697 698 lba = blob->active.clusters[start_cluster]; 699 lba_count = lba_per_cluster; 700 extent_idx = 0; 701 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 702 if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) { 703 /* Run-length encode sequential non-zero LBA */ 704 lba_count += lba_per_cluster; 705 continue; 706 } else if (lba == 0 && blob->active.clusters[i] == 0) { 707 /* Run-length encode unallocated clusters */ 708 lba_count += lba_per_cluster; 709 continue; 710 } 711 desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 712 desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster; 713 extent_idx++; 714 715 cur_sz += sizeof(desc_extent_rle->extents[extent_idx]); 716 717 if (*buf_sz < cur_sz) { 718 /* If we ran out of buffer space, return */ 719 *next_cluster = i; 720 goto finish; 721 } 722 723 lba = blob->active.clusters[i]; 724 lba_count = lba_per_cluster; 725 } 726 727 desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 728 desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster; 729 extent_idx++; 730 731 *next_cluster = blob->active.num_clusters; 732 733 finish: 734 desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx; 735 *buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length; 736 *buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length; 737 738 return; 739 } 740 741 static int 742 _spdk_blob_serialize_extents_rle(const struct spdk_blob *blob, 743 struct spdk_blob_md_page **pages, 744 struct spdk_blob_md_page *cur_page, 745 uint32_t *page_count, uint8_t **buf, 746 size_t *remaining_sz) 747 { 748 uint64_t last_cluster; 749 int rc; 750 751 last_cluster = 0; 752 while (last_cluster < blob->active.num_clusters) { 753 _spdk_blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz); 754 755 if (last_cluster == blob->active.num_clusters) { 756 break; 757 } 758 759 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 760 if (rc < 0) { 761 return rc; 762 } 763 764 *buf = (uint8_t *)cur_page->descriptors; 765 *remaining_sz = sizeof(cur_page->descriptors); 766 } 767 768 return 0; 769 } 770 771 static void 772 _spdk_blob_serialize_flags(const struct spdk_blob *blob, 773 uint8_t *buf, size_t *buf_sz) 774 { 775 struct spdk_blob_md_descriptor_flags *desc; 776 777 /* 778 * Flags get serialized first, so we should always have room for the flags 779 * descriptor. 780 */ 781 assert(*buf_sz >= sizeof(*desc)); 782 783 desc = (struct spdk_blob_md_descriptor_flags *)buf; 784 desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS; 785 desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor); 786 desc->invalid_flags = blob->invalid_flags; 787 desc->data_ro_flags = blob->data_ro_flags; 788 desc->md_ro_flags = blob->md_ro_flags; 789 790 *buf_sz -= sizeof(*desc); 791 } 792 793 static int 794 _spdk_blob_serialize_xattrs(const struct spdk_blob *blob, 795 const struct spdk_xattr_tailq *xattrs, bool internal, 796 struct spdk_blob_md_page **pages, 797 struct spdk_blob_md_page *cur_page, 798 uint32_t *page_count, uint8_t **buf, 799 size_t *remaining_sz) 800 { 801 const struct spdk_xattr *xattr; 802 int rc; 803 804 TAILQ_FOREACH(xattr, xattrs, link) { 805 size_t required_sz = 0; 806 807 rc = _spdk_blob_serialize_xattr(xattr, 808 *buf, *remaining_sz, 809 &required_sz, internal); 810 if (rc < 0) { 811 /* Need to add a new page to the chain */ 812 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 813 &cur_page); 814 if (rc < 0) { 815 spdk_free(*pages); 816 *pages = NULL; 817 *page_count = 0; 818 return rc; 819 } 820 821 *buf = (uint8_t *)cur_page->descriptors; 822 *remaining_sz = sizeof(cur_page->descriptors); 823 824 /* Try again */ 825 required_sz = 0; 826 rc = _spdk_blob_serialize_xattr(xattr, 827 *buf, *remaining_sz, 828 &required_sz, internal); 829 830 if (rc < 0) { 831 spdk_free(*pages); 832 *pages = NULL; 833 *page_count = 0; 834 return rc; 835 } 836 } 837 838 *remaining_sz -= required_sz; 839 *buf += required_sz; 840 } 841 842 return 0; 843 } 844 845 static int 846 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 847 uint32_t *page_count) 848 { 849 struct spdk_blob_md_page *cur_page; 850 int rc; 851 uint8_t *buf; 852 size_t remaining_sz; 853 854 assert(pages != NULL); 855 assert(page_count != NULL); 856 assert(blob != NULL); 857 assert(blob->state == SPDK_BLOB_STATE_DIRTY); 858 859 *pages = NULL; 860 *page_count = 0; 861 862 /* A blob always has at least 1 page, even if it has no descriptors */ 863 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 864 if (rc < 0) { 865 return rc; 866 } 867 868 buf = (uint8_t *)cur_page->descriptors; 869 remaining_sz = sizeof(cur_page->descriptors); 870 871 /* Serialize flags */ 872 _spdk_blob_serialize_flags(blob, buf, &remaining_sz); 873 buf += sizeof(struct spdk_blob_md_descriptor_flags); 874 875 /* Serialize xattrs */ 876 rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false, 877 pages, cur_page, page_count, &buf, &remaining_sz); 878 if (rc < 0) { 879 return rc; 880 } 881 882 /* Serialize internal xattrs */ 883 rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true, 884 pages, cur_page, page_count, &buf, &remaining_sz); 885 if (rc < 0) { 886 return rc; 887 } 888 889 /* Serialize extents */ 890 rc = _spdk_blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz); 891 892 return rc; 893 } 894 895 struct spdk_blob_load_ctx { 896 struct spdk_blob *blob; 897 898 struct spdk_blob_md_page *pages; 899 uint32_t num_pages; 900 spdk_bs_sequence_t *seq; 901 902 spdk_bs_sequence_cpl cb_fn; 903 void *cb_arg; 904 }; 905 906 static uint32_t 907 _spdk_blob_md_page_calc_crc(void *page) 908 { 909 uint32_t crc; 910 911 crc = BLOB_CRC32C_INITIAL; 912 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 913 crc ^= BLOB_CRC32C_INITIAL; 914 915 return crc; 916 917 } 918 919 static void 920 _spdk_blob_load_final(void *cb_arg, int bserrno) 921 { 922 struct spdk_blob_load_ctx *ctx = cb_arg; 923 struct spdk_blob *blob = ctx->blob; 924 925 if (bserrno == 0) { 926 _spdk_blob_mark_clean(blob); 927 } 928 929 ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno); 930 931 /* Free the memory */ 932 spdk_free(ctx->pages); 933 free(ctx); 934 } 935 936 static void 937 _spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno) 938 { 939 struct spdk_blob_load_ctx *ctx = cb_arg; 940 struct spdk_blob *blob = ctx->blob; 941 942 if (bserrno == 0) { 943 blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot); 944 if (blob->back_bs_dev == NULL) { 945 bserrno = -ENOMEM; 946 } 947 } 948 if (bserrno != 0) { 949 SPDK_ERRLOG("Snapshot fail\n"); 950 } 951 952 _spdk_blob_load_final(ctx, bserrno); 953 } 954 955 static void 956 _spdk_blob_load_backing_dev(void *cb_arg) 957 { 958 struct spdk_blob_load_ctx *ctx = cb_arg; 959 struct spdk_blob *blob = ctx->blob; 960 const void *value; 961 size_t len; 962 int rc; 963 964 if (spdk_blob_is_thin_provisioned(blob)) { 965 rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true); 966 if (rc == 0) { 967 if (len != sizeof(spdk_blob_id)) { 968 _spdk_blob_load_final(ctx, -EINVAL); 969 return; 970 } 971 /* open snapshot blob and continue in the callback function */ 972 blob->parent_id = *(spdk_blob_id *)value; 973 spdk_bs_open_blob(blob->bs, blob->parent_id, 974 _spdk_blob_load_snapshot_cpl, ctx); 975 return; 976 } else { 977 /* add zeroes_dev for thin provisioned blob */ 978 blob->back_bs_dev = spdk_bs_create_zeroes_dev(); 979 } 980 } else { 981 /* standard blob */ 982 blob->back_bs_dev = NULL; 983 } 984 _spdk_blob_load_final(ctx, 0); 985 } 986 987 static void 988 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 989 { 990 struct spdk_blob_load_ctx *ctx = cb_arg; 991 struct spdk_blob *blob = ctx->blob; 992 struct spdk_blob_md_page *page; 993 int rc; 994 uint32_t crc; 995 996 if (bserrno) { 997 SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno); 998 _spdk_blob_load_final(ctx, bserrno); 999 return; 1000 } 1001 1002 page = &ctx->pages[ctx->num_pages - 1]; 1003 crc = _spdk_blob_md_page_calc_crc(page); 1004 if (crc != page->crc) { 1005 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 1006 _spdk_blob_load_final(ctx, -EINVAL); 1007 return; 1008 } 1009 1010 if (page->next != SPDK_INVALID_MD_PAGE) { 1011 uint32_t next_page = page->next; 1012 uint64_t next_lba = _spdk_bs_md_page_to_lba(blob->bs, next_page); 1013 1014 /* Read the next page */ 1015 ctx->num_pages++; 1016 ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 1017 sizeof(*page)); 1018 if (ctx->pages == NULL) { 1019 _spdk_blob_load_final(ctx, -ENOMEM); 1020 return; 1021 } 1022 1023 spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1], 1024 next_lba, 1025 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 1026 _spdk_blob_load_cpl, ctx); 1027 return; 1028 } 1029 1030 /* Parse the pages */ 1031 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 1032 if (rc) { 1033 _spdk_blob_load_final(ctx, rc); 1034 return; 1035 } 1036 1037 _spdk_blob_load_backing_dev(ctx); 1038 } 1039 1040 /* Load a blob from disk given a blobid */ 1041 static void 1042 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 1043 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1044 { 1045 struct spdk_blob_load_ctx *ctx; 1046 struct spdk_blob_store *bs; 1047 uint32_t page_num; 1048 uint64_t lba; 1049 1050 _spdk_blob_verify_md_op(blob); 1051 1052 bs = blob->bs; 1053 1054 ctx = calloc(1, sizeof(*ctx)); 1055 if (!ctx) { 1056 cb_fn(seq, cb_arg, -ENOMEM); 1057 return; 1058 } 1059 1060 ctx->blob = blob; 1061 ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE); 1062 if (!ctx->pages) { 1063 free(ctx); 1064 cb_fn(seq, cb_arg, -ENOMEM); 1065 return; 1066 } 1067 ctx->num_pages = 1; 1068 ctx->cb_fn = cb_fn; 1069 ctx->cb_arg = cb_arg; 1070 ctx->seq = seq; 1071 1072 page_num = _spdk_bs_blobid_to_page(blob->id); 1073 lba = _spdk_bs_md_page_to_lba(blob->bs, page_num); 1074 1075 blob->state = SPDK_BLOB_STATE_LOADING; 1076 1077 spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba, 1078 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 1079 _spdk_blob_load_cpl, ctx); 1080 } 1081 1082 struct spdk_blob_persist_ctx { 1083 struct spdk_blob *blob; 1084 1085 struct spdk_bs_super_block *super; 1086 1087 struct spdk_blob_md_page *pages; 1088 1089 spdk_bs_sequence_t *seq; 1090 spdk_bs_sequence_cpl cb_fn; 1091 void *cb_arg; 1092 }; 1093 1094 static void 1095 spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba, 1096 uint32_t lba_count) 1097 { 1098 switch (ctx->blob->clear_method) { 1099 case BLOB_CLEAR_WITH_DEFAULT: 1100 case BLOB_CLEAR_WITH_UNMAP: 1101 spdk_bs_batch_unmap_dev(batch, lba, lba_count); 1102 break; 1103 case BLOB_CLEAR_WITH_WRITE_ZEROES: 1104 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1105 break; 1106 case BLOB_CLEAR_WITH_NONE: 1107 default: 1108 break; 1109 } 1110 } 1111 1112 static void 1113 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1114 { 1115 struct spdk_blob_persist_ctx *ctx = cb_arg; 1116 struct spdk_blob *blob = ctx->blob; 1117 1118 if (bserrno == 0) { 1119 _spdk_blob_mark_clean(blob); 1120 } 1121 1122 /* Call user callback */ 1123 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 1124 1125 /* Free the memory */ 1126 spdk_free(ctx->pages); 1127 free(ctx); 1128 } 1129 1130 static void 1131 _spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1132 { 1133 struct spdk_blob_persist_ctx *ctx = cb_arg; 1134 struct spdk_blob *blob = ctx->blob; 1135 struct spdk_blob_store *bs = blob->bs; 1136 size_t i; 1137 1138 /* Release all clusters that were truncated */ 1139 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 1140 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 1141 1142 /* Nothing to release if it was not allocated */ 1143 if (blob->active.clusters[i] != 0) { 1144 _spdk_bs_release_cluster(bs, cluster_num); 1145 } 1146 } 1147 1148 if (blob->active.num_clusters == 0) { 1149 free(blob->active.clusters); 1150 blob->active.clusters = NULL; 1151 blob->active.cluster_array_size = 0; 1152 } else if (blob->active.num_clusters != blob->active.cluster_array_size) { 1153 #ifndef __clang_analyzer__ 1154 void *tmp; 1155 1156 /* scan-build really can't figure reallocs, workaround it */ 1157 tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters); 1158 assert(tmp != NULL); 1159 blob->active.clusters = tmp; 1160 #endif 1161 blob->active.cluster_array_size = blob->active.num_clusters; 1162 } 1163 1164 _spdk_blob_persist_complete(seq, ctx, bserrno); 1165 } 1166 1167 static void 1168 _spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1169 { 1170 struct spdk_blob_persist_ctx *ctx = cb_arg; 1171 struct spdk_blob *blob = ctx->blob; 1172 struct spdk_blob_store *bs = blob->bs; 1173 spdk_bs_batch_t *batch; 1174 size_t i; 1175 uint64_t lba; 1176 uint32_t lba_count; 1177 1178 /* Clusters don't move around in blobs. The list shrinks or grows 1179 * at the end, but no changes ever occur in the middle of the list. 1180 */ 1181 1182 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx); 1183 1184 /* Clear all clusters that were truncated */ 1185 lba = 0; 1186 lba_count = 0; 1187 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 1188 uint64_t next_lba = blob->active.clusters[i]; 1189 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 1190 1191 if (next_lba > 0 && (lba + lba_count) == next_lba) { 1192 /* This cluster is contiguous with the previous one. */ 1193 lba_count += next_lba_count; 1194 continue; 1195 } 1196 1197 /* This cluster is not contiguous with the previous one. */ 1198 1199 /* If a run of LBAs previously existing, clear them now */ 1200 if (lba_count > 0) { 1201 spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count); 1202 } 1203 1204 /* Start building the next batch */ 1205 lba = next_lba; 1206 if (next_lba > 0) { 1207 lba_count = next_lba_count; 1208 } else { 1209 lba_count = 0; 1210 } 1211 } 1212 1213 /* If we ended with a contiguous set of LBAs, clear them now */ 1214 if (lba_count > 0) { 1215 spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count); 1216 } 1217 1218 spdk_bs_batch_close(batch); 1219 } 1220 1221 static void 1222 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1223 { 1224 struct spdk_blob_persist_ctx *ctx = cb_arg; 1225 struct spdk_blob *blob = ctx->blob; 1226 struct spdk_blob_store *bs = blob->bs; 1227 size_t i; 1228 1229 /* This loop starts at 1 because the first page is special and handled 1230 * below. The pages (except the first) are never written in place, 1231 * so any pages in the clean list must be zeroed. 1232 */ 1233 for (i = 1; i < blob->clean.num_pages; i++) { 1234 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 1235 } 1236 1237 if (blob->active.num_pages == 0) { 1238 uint32_t page_num; 1239 1240 page_num = _spdk_bs_blobid_to_page(blob->id); 1241 spdk_bit_array_clear(bs->used_md_pages, page_num); 1242 } 1243 1244 /* Move on to clearing clusters */ 1245 _spdk_blob_persist_clear_clusters(seq, ctx, 0); 1246 } 1247 1248 static void 1249 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1250 { 1251 struct spdk_blob_persist_ctx *ctx = cb_arg; 1252 struct spdk_blob *blob = ctx->blob; 1253 struct spdk_blob_store *bs = blob->bs; 1254 uint64_t lba; 1255 uint32_t lba_count; 1256 spdk_bs_batch_t *batch; 1257 size_t i; 1258 1259 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx); 1260 1261 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 1262 1263 /* This loop starts at 1 because the first page is special and handled 1264 * below. The pages (except the first) are never written in place, 1265 * so any pages in the clean list must be zeroed. 1266 */ 1267 for (i = 1; i < blob->clean.num_pages; i++) { 1268 lba = _spdk_bs_md_page_to_lba(bs, blob->clean.pages[i]); 1269 1270 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1271 } 1272 1273 /* The first page will only be zeroed if this is a delete. */ 1274 if (blob->active.num_pages == 0) { 1275 uint32_t page_num; 1276 1277 /* The first page in the metadata goes where the blobid indicates */ 1278 page_num = _spdk_bs_blobid_to_page(blob->id); 1279 lba = _spdk_bs_md_page_to_lba(bs, page_num); 1280 1281 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1282 } 1283 1284 spdk_bs_batch_close(batch); 1285 } 1286 1287 static void 1288 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1289 { 1290 struct spdk_blob_persist_ctx *ctx = cb_arg; 1291 struct spdk_blob *blob = ctx->blob; 1292 struct spdk_blob_store *bs = blob->bs; 1293 uint64_t lba; 1294 uint32_t lba_count; 1295 struct spdk_blob_md_page *page; 1296 1297 if (blob->active.num_pages == 0) { 1298 /* Move on to the next step */ 1299 _spdk_blob_persist_zero_pages(seq, ctx, 0); 1300 return; 1301 } 1302 1303 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 1304 1305 page = &ctx->pages[0]; 1306 /* The first page in the metadata goes where the blobid indicates */ 1307 lba = _spdk_bs_md_page_to_lba(bs, _spdk_bs_blobid_to_page(blob->id)); 1308 1309 spdk_bs_sequence_write_dev(seq, page, lba, lba_count, 1310 _spdk_blob_persist_zero_pages, ctx); 1311 } 1312 1313 static void 1314 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1315 { 1316 struct spdk_blob_persist_ctx *ctx = cb_arg; 1317 struct spdk_blob *blob = ctx->blob; 1318 struct spdk_blob_store *bs = blob->bs; 1319 uint64_t lba; 1320 uint32_t lba_count; 1321 struct spdk_blob_md_page *page; 1322 spdk_bs_batch_t *batch; 1323 size_t i; 1324 1325 /* Clusters don't move around in blobs. The list shrinks or grows 1326 * at the end, but no changes ever occur in the middle of the list. 1327 */ 1328 1329 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 1330 1331 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 1332 1333 /* This starts at 1. The root page is not written until 1334 * all of the others are finished 1335 */ 1336 for (i = 1; i < blob->active.num_pages; i++) { 1337 page = &ctx->pages[i]; 1338 assert(page->sequence_num == i); 1339 1340 lba = _spdk_bs_md_page_to_lba(bs, blob->active.pages[i]); 1341 1342 spdk_bs_batch_write_dev(batch, page, lba, lba_count); 1343 } 1344 1345 spdk_bs_batch_close(batch); 1346 } 1347 1348 static int 1349 _spdk_blob_resize(struct spdk_blob *blob, uint64_t sz) 1350 { 1351 uint64_t i; 1352 uint64_t *tmp; 1353 uint64_t lfc; /* lowest free cluster */ 1354 uint64_t num_clusters; 1355 struct spdk_blob_store *bs; 1356 1357 bs = blob->bs; 1358 1359 _spdk_blob_verify_md_op(blob); 1360 1361 if (blob->active.num_clusters == sz) { 1362 return 0; 1363 } 1364 1365 if (blob->active.num_clusters < blob->active.cluster_array_size) { 1366 /* If this blob was resized to be larger, then smaller, then 1367 * larger without syncing, then the cluster array already 1368 * contains spare assigned clusters we can use. 1369 */ 1370 num_clusters = spdk_min(blob->active.cluster_array_size, 1371 sz); 1372 } else { 1373 num_clusters = blob->active.num_clusters; 1374 } 1375 1376 /* Do two passes - one to verify that we can obtain enough clusters 1377 * and another to actually claim them. 1378 */ 1379 1380 if (spdk_blob_is_thin_provisioned(blob) == false) { 1381 lfc = 0; 1382 for (i = num_clusters; i < sz; i++) { 1383 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1384 if (lfc == UINT32_MAX) { 1385 /* No more free clusters. Cannot satisfy the request */ 1386 return -ENOSPC; 1387 } 1388 lfc++; 1389 } 1390 } 1391 1392 if (sz > num_clusters) { 1393 /* Expand the cluster array if necessary. 1394 * We only shrink the array when persisting. 1395 */ 1396 tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz); 1397 if (sz > 0 && tmp == NULL) { 1398 return -ENOMEM; 1399 } 1400 memset(tmp + blob->active.cluster_array_size, 0, 1401 sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size)); 1402 blob->active.clusters = tmp; 1403 blob->active.cluster_array_size = sz; 1404 } 1405 1406 blob->state = SPDK_BLOB_STATE_DIRTY; 1407 1408 if (spdk_blob_is_thin_provisioned(blob) == false) { 1409 lfc = 0; 1410 for (i = num_clusters; i < sz; i++) { 1411 _spdk_bs_allocate_cluster(blob, i, &lfc, true); 1412 lfc++; 1413 } 1414 } 1415 1416 blob->active.num_clusters = sz; 1417 1418 return 0; 1419 } 1420 1421 static void 1422 _spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx) 1423 { 1424 spdk_bs_sequence_t *seq = ctx->seq; 1425 struct spdk_blob *blob = ctx->blob; 1426 struct spdk_blob_store *bs = blob->bs; 1427 uint64_t i; 1428 uint32_t page_num; 1429 void *tmp; 1430 int rc; 1431 1432 if (blob->active.num_pages == 0) { 1433 /* This is the signal that the blob should be deleted. 1434 * Immediately jump to the clean up routine. */ 1435 assert(blob->clean.num_pages > 0); 1436 blob->state = SPDK_BLOB_STATE_CLEAN; 1437 _spdk_blob_persist_zero_pages(seq, ctx, 0); 1438 return; 1439 1440 } 1441 1442 /* Generate the new metadata */ 1443 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 1444 if (rc < 0) { 1445 _spdk_blob_persist_complete(seq, ctx, rc); 1446 return; 1447 } 1448 1449 assert(blob->active.num_pages >= 1); 1450 1451 /* Resize the cache of page indices */ 1452 tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages)); 1453 if (!tmp) { 1454 _spdk_blob_persist_complete(seq, ctx, -ENOMEM); 1455 return; 1456 } 1457 blob->active.pages = tmp; 1458 1459 /* Assign this metadata to pages. This requires two passes - 1460 * one to verify that there are enough pages and a second 1461 * to actually claim them. */ 1462 page_num = 0; 1463 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1464 for (i = 1; i < blob->active.num_pages; i++) { 1465 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1466 if (page_num == UINT32_MAX) { 1467 _spdk_blob_persist_complete(seq, ctx, -ENOMEM); 1468 return; 1469 } 1470 page_num++; 1471 } 1472 1473 page_num = 0; 1474 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1475 for (i = 1; i < blob->active.num_pages; i++) { 1476 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1477 ctx->pages[i - 1].next = page_num; 1478 /* Now that previous metadata page is complete, calculate the crc for it. */ 1479 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1480 blob->active.pages[i] = page_num; 1481 spdk_bit_array_set(bs->used_md_pages, page_num); 1482 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1483 page_num++; 1484 } 1485 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1486 /* Start writing the metadata from last page to first */ 1487 blob->state = SPDK_BLOB_STATE_CLEAN; 1488 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1489 } 1490 1491 static void 1492 _spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1493 { 1494 struct spdk_blob_persist_ctx *ctx = cb_arg; 1495 1496 ctx->blob->bs->clean = 0; 1497 1498 spdk_free(ctx->super); 1499 1500 _spdk_blob_persist_start(ctx); 1501 } 1502 1503 static void 1504 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 1505 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg); 1506 1507 1508 static void 1509 _spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1510 { 1511 struct spdk_blob_persist_ctx *ctx = cb_arg; 1512 1513 ctx->super->clean = 0; 1514 if (ctx->super->size == 0) { 1515 ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen; 1516 } 1517 1518 _spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx); 1519 } 1520 1521 1522 /* Write a blob to disk */ 1523 static void 1524 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 1525 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1526 { 1527 struct spdk_blob_persist_ctx *ctx; 1528 1529 _spdk_blob_verify_md_op(blob); 1530 1531 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 1532 cb_fn(seq, cb_arg, 0); 1533 return; 1534 } 1535 1536 ctx = calloc(1, sizeof(*ctx)); 1537 if (!ctx) { 1538 cb_fn(seq, cb_arg, -ENOMEM); 1539 return; 1540 } 1541 ctx->blob = blob; 1542 ctx->seq = seq; 1543 ctx->cb_fn = cb_fn; 1544 ctx->cb_arg = cb_arg; 1545 1546 if (blob->bs->clean) { 1547 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 1548 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1549 if (!ctx->super) { 1550 cb_fn(seq, cb_arg, -ENOMEM); 1551 free(ctx); 1552 return; 1553 } 1554 1555 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0), 1556 _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)), 1557 _spdk_blob_persist_dirty, ctx); 1558 } else { 1559 _spdk_blob_persist_start(ctx); 1560 } 1561 } 1562 1563 struct spdk_blob_copy_cluster_ctx { 1564 struct spdk_blob *blob; 1565 uint8_t *buf; 1566 uint64_t page; 1567 uint64_t new_cluster; 1568 spdk_bs_sequence_t *seq; 1569 }; 1570 1571 static void 1572 _spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno) 1573 { 1574 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1575 struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq; 1576 TAILQ_HEAD(, spdk_bs_request_set) requests; 1577 spdk_bs_user_op_t *op; 1578 1579 TAILQ_INIT(&requests); 1580 TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link); 1581 1582 while (!TAILQ_EMPTY(&requests)) { 1583 op = TAILQ_FIRST(&requests); 1584 TAILQ_REMOVE(&requests, op, link); 1585 if (bserrno == 0) { 1586 spdk_bs_user_op_execute(op); 1587 } else { 1588 spdk_bs_user_op_abort(op); 1589 } 1590 } 1591 1592 spdk_free(ctx->buf); 1593 free(ctx); 1594 } 1595 1596 static void 1597 _spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno) 1598 { 1599 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1600 1601 if (bserrno) { 1602 if (bserrno == -EEXIST) { 1603 /* The metadata insert failed because another thread 1604 * allocated the cluster first. Free our cluster 1605 * but continue without error. */ 1606 bserrno = 0; 1607 } 1608 _spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster); 1609 } 1610 1611 spdk_bs_sequence_finish(ctx->seq, bserrno); 1612 } 1613 1614 static void 1615 _spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1616 { 1617 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1618 uint32_t cluster_number; 1619 1620 if (bserrno) { 1621 /* The write failed, so jump to the final completion handler */ 1622 spdk_bs_sequence_finish(seq, bserrno); 1623 return; 1624 } 1625 1626 cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page); 1627 1628 _spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster, 1629 _spdk_blob_insert_cluster_cpl, ctx); 1630 } 1631 1632 static void 1633 _spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1634 { 1635 struct spdk_blob_copy_cluster_ctx *ctx = cb_arg; 1636 1637 if (bserrno != 0) { 1638 /* The read failed, so jump to the final completion handler */ 1639 spdk_bs_sequence_finish(seq, bserrno); 1640 return; 1641 } 1642 1643 /* Write whole cluster */ 1644 spdk_bs_sequence_write_dev(seq, ctx->buf, 1645 _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster), 1646 _spdk_bs_cluster_to_lba(ctx->blob->bs, 1), 1647 _spdk_blob_write_copy_cpl, ctx); 1648 } 1649 1650 static void 1651 _spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob, 1652 struct spdk_io_channel *_ch, 1653 uint64_t io_unit, spdk_bs_user_op_t *op) 1654 { 1655 struct spdk_bs_cpl cpl; 1656 struct spdk_bs_channel *ch; 1657 struct spdk_blob_copy_cluster_ctx *ctx; 1658 uint32_t cluster_start_page; 1659 uint32_t cluster_number; 1660 int rc; 1661 1662 ch = spdk_io_channel_get_ctx(_ch); 1663 1664 if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) { 1665 /* There are already operations pending. Queue this user op 1666 * and return because it will be re-executed when the outstanding 1667 * cluster allocation completes. */ 1668 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link); 1669 return; 1670 } 1671 1672 /* Round the io_unit offset down to the first page in the cluster */ 1673 cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit); 1674 1675 /* Calculate which index in the metadata cluster array the corresponding 1676 * cluster is supposed to be at. */ 1677 cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit); 1678 1679 ctx = calloc(1, sizeof(*ctx)); 1680 if (!ctx) { 1681 spdk_bs_user_op_abort(op); 1682 return; 1683 } 1684 1685 assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0); 1686 1687 ctx->blob = blob; 1688 ctx->page = cluster_start_page; 1689 1690 if (blob->parent_id != SPDK_BLOBID_INVALID) { 1691 ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen, 1692 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1693 if (!ctx->buf) { 1694 SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n", 1695 blob->bs->cluster_sz); 1696 free(ctx); 1697 spdk_bs_user_op_abort(op); 1698 return; 1699 } 1700 } 1701 1702 rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false); 1703 if (rc != 0) { 1704 spdk_free(ctx->buf); 1705 free(ctx); 1706 spdk_bs_user_op_abort(op); 1707 return; 1708 } 1709 1710 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1711 cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl; 1712 cpl.u.blob_basic.cb_arg = ctx; 1713 1714 ctx->seq = spdk_bs_sequence_start(_ch, &cpl); 1715 if (!ctx->seq) { 1716 _spdk_bs_release_cluster(blob->bs, ctx->new_cluster); 1717 spdk_free(ctx->buf); 1718 free(ctx); 1719 spdk_bs_user_op_abort(op); 1720 return; 1721 } 1722 1723 /* Queue the user op to block other incoming operations */ 1724 TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link); 1725 1726 if (blob->parent_id != SPDK_BLOBID_INVALID) { 1727 /* Read cluster from backing device */ 1728 spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf, 1729 _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page), 1730 _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz), 1731 _spdk_blob_write_copy, ctx); 1732 } else { 1733 _spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster, 1734 _spdk_blob_insert_cluster_cpl, ctx); 1735 } 1736 } 1737 1738 static void 1739 _spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length, 1740 uint64_t *lba, uint32_t *lba_count) 1741 { 1742 *lba_count = length; 1743 1744 if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) { 1745 assert(blob->back_bs_dev != NULL); 1746 *lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit); 1747 *lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count); 1748 } else { 1749 *lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit); 1750 } 1751 } 1752 1753 struct op_split_ctx { 1754 struct spdk_blob *blob; 1755 struct spdk_io_channel *channel; 1756 uint64_t io_unit_offset; 1757 uint64_t io_units_remaining; 1758 void *curr_payload; 1759 enum spdk_blob_op_type op_type; 1760 spdk_bs_sequence_t *seq; 1761 }; 1762 1763 static void 1764 _spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno) 1765 { 1766 struct op_split_ctx *ctx = cb_arg; 1767 struct spdk_blob *blob = ctx->blob; 1768 struct spdk_io_channel *ch = ctx->channel; 1769 enum spdk_blob_op_type op_type = ctx->op_type; 1770 uint8_t *buf = ctx->curr_payload; 1771 uint64_t offset = ctx->io_unit_offset; 1772 uint64_t length = ctx->io_units_remaining; 1773 uint64_t op_length; 1774 1775 if (bserrno != 0 || ctx->io_units_remaining == 0) { 1776 spdk_bs_sequence_finish(ctx->seq, bserrno); 1777 free(ctx); 1778 return; 1779 } 1780 1781 op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob, 1782 offset)); 1783 1784 /* Update length and payload for next operation */ 1785 ctx->io_units_remaining -= op_length; 1786 ctx->io_unit_offset += op_length; 1787 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { 1788 ctx->curr_payload += op_length * blob->bs->io_unit_size; 1789 } 1790 1791 switch (op_type) { 1792 case SPDK_BLOB_READ: 1793 spdk_blob_io_read(blob, ch, buf, offset, op_length, 1794 _spdk_blob_request_submit_op_split_next, ctx); 1795 break; 1796 case SPDK_BLOB_WRITE: 1797 spdk_blob_io_write(blob, ch, buf, offset, op_length, 1798 _spdk_blob_request_submit_op_split_next, ctx); 1799 break; 1800 case SPDK_BLOB_UNMAP: 1801 spdk_blob_io_unmap(blob, ch, offset, op_length, 1802 _spdk_blob_request_submit_op_split_next, ctx); 1803 break; 1804 case SPDK_BLOB_WRITE_ZEROES: 1805 spdk_blob_io_write_zeroes(blob, ch, offset, op_length, 1806 _spdk_blob_request_submit_op_split_next, ctx); 1807 break; 1808 case SPDK_BLOB_READV: 1809 case SPDK_BLOB_WRITEV: 1810 SPDK_ERRLOG("readv/write not valid\n"); 1811 spdk_bs_sequence_finish(ctx->seq, -EINVAL); 1812 free(ctx); 1813 break; 1814 } 1815 } 1816 1817 static void 1818 _spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob, 1819 void *payload, uint64_t offset, uint64_t length, 1820 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1821 { 1822 struct op_split_ctx *ctx; 1823 spdk_bs_sequence_t *seq; 1824 struct spdk_bs_cpl cpl; 1825 1826 assert(blob != NULL); 1827 1828 ctx = calloc(1, sizeof(struct op_split_ctx)); 1829 if (ctx == NULL) { 1830 cb_fn(cb_arg, -ENOMEM); 1831 return; 1832 } 1833 1834 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1835 cpl.u.blob_basic.cb_fn = cb_fn; 1836 cpl.u.blob_basic.cb_arg = cb_arg; 1837 1838 seq = spdk_bs_sequence_start(ch, &cpl); 1839 if (!seq) { 1840 free(ctx); 1841 cb_fn(cb_arg, -ENOMEM); 1842 return; 1843 } 1844 1845 ctx->blob = blob; 1846 ctx->channel = ch; 1847 ctx->curr_payload = payload; 1848 ctx->io_unit_offset = offset; 1849 ctx->io_units_remaining = length; 1850 ctx->op_type = op_type; 1851 ctx->seq = seq; 1852 1853 _spdk_blob_request_submit_op_split_next(ctx, 0); 1854 } 1855 1856 static void 1857 _spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob, 1858 void *payload, uint64_t offset, uint64_t length, 1859 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1860 { 1861 struct spdk_bs_cpl cpl; 1862 uint64_t lba; 1863 uint32_t lba_count; 1864 1865 assert(blob != NULL); 1866 1867 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1868 cpl.u.blob_basic.cb_fn = cb_fn; 1869 cpl.u.blob_basic.cb_arg = cb_arg; 1870 1871 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 1872 1873 if (blob->frozen_refcnt) { 1874 /* This blob I/O is frozen */ 1875 spdk_bs_user_op_t *op; 1876 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch); 1877 1878 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 1879 if (!op) { 1880 cb_fn(cb_arg, -ENOMEM); 1881 return; 1882 } 1883 1884 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 1885 1886 return; 1887 } 1888 1889 switch (op_type) { 1890 case SPDK_BLOB_READ: { 1891 spdk_bs_batch_t *batch; 1892 1893 batch = spdk_bs_batch_open(_ch, &cpl); 1894 if (!batch) { 1895 cb_fn(cb_arg, -ENOMEM); 1896 return; 1897 } 1898 1899 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1900 /* Read from the blob */ 1901 spdk_bs_batch_read_dev(batch, payload, lba, lba_count); 1902 } else { 1903 /* Read from the backing block device */ 1904 spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count); 1905 } 1906 1907 spdk_bs_batch_close(batch); 1908 break; 1909 } 1910 case SPDK_BLOB_WRITE: 1911 case SPDK_BLOB_WRITE_ZEROES: { 1912 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1913 /* Write to the blob */ 1914 spdk_bs_batch_t *batch; 1915 1916 if (lba_count == 0) { 1917 cb_fn(cb_arg, 0); 1918 return; 1919 } 1920 1921 batch = spdk_bs_batch_open(_ch, &cpl); 1922 if (!batch) { 1923 cb_fn(cb_arg, -ENOMEM); 1924 return; 1925 } 1926 1927 if (op_type == SPDK_BLOB_WRITE) { 1928 spdk_bs_batch_write_dev(batch, payload, lba, lba_count); 1929 } else { 1930 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1931 } 1932 1933 spdk_bs_batch_close(batch); 1934 } else { 1935 /* Queue this operation and allocate the cluster */ 1936 spdk_bs_user_op_t *op; 1937 1938 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 1939 if (!op) { 1940 cb_fn(cb_arg, -ENOMEM); 1941 return; 1942 } 1943 1944 _spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op); 1945 } 1946 break; 1947 } 1948 case SPDK_BLOB_UNMAP: { 1949 spdk_bs_batch_t *batch; 1950 1951 batch = spdk_bs_batch_open(_ch, &cpl); 1952 if (!batch) { 1953 cb_fn(cb_arg, -ENOMEM); 1954 return; 1955 } 1956 1957 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1958 spdk_bs_batch_unmap_dev(batch, lba, lba_count); 1959 } 1960 1961 spdk_bs_batch_close(batch); 1962 break; 1963 } 1964 case SPDK_BLOB_READV: 1965 case SPDK_BLOB_WRITEV: 1966 SPDK_ERRLOG("readv/write not valid\n"); 1967 cb_fn(cb_arg, -EINVAL); 1968 break; 1969 } 1970 } 1971 1972 static void 1973 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1974 void *payload, uint64_t offset, uint64_t length, 1975 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1976 { 1977 assert(blob != NULL); 1978 1979 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 1980 cb_fn(cb_arg, -EPERM); 1981 return; 1982 } 1983 1984 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 1985 cb_fn(cb_arg, -EINVAL); 1986 return; 1987 } 1988 if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) { 1989 _spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length, 1990 cb_fn, cb_arg, op_type); 1991 } else { 1992 _spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length, 1993 cb_fn, cb_arg, op_type); 1994 } 1995 } 1996 1997 struct rw_iov_ctx { 1998 struct spdk_blob *blob; 1999 struct spdk_io_channel *channel; 2000 spdk_blob_op_complete cb_fn; 2001 void *cb_arg; 2002 bool read; 2003 int iovcnt; 2004 struct iovec *orig_iov; 2005 uint64_t io_unit_offset; 2006 uint64_t io_units_remaining; 2007 uint64_t io_units_done; 2008 struct iovec iov[0]; 2009 }; 2010 2011 static void 2012 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2013 { 2014 assert(cb_arg == NULL); 2015 spdk_bs_sequence_finish(seq, bserrno); 2016 } 2017 2018 static void 2019 _spdk_rw_iov_split_next(void *cb_arg, int bserrno) 2020 { 2021 struct rw_iov_ctx *ctx = cb_arg; 2022 struct spdk_blob *blob = ctx->blob; 2023 struct iovec *iov, *orig_iov; 2024 int iovcnt; 2025 size_t orig_iovoff; 2026 uint64_t io_units_count, io_units_to_boundary, io_unit_offset; 2027 uint64_t byte_count; 2028 2029 if (bserrno != 0 || ctx->io_units_remaining == 0) { 2030 ctx->cb_fn(ctx->cb_arg, bserrno); 2031 free(ctx); 2032 return; 2033 } 2034 2035 io_unit_offset = ctx->io_unit_offset; 2036 io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset); 2037 io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary); 2038 /* 2039 * Get index and offset into the original iov array for our current position in the I/O sequence. 2040 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 2041 * point to the current position in the I/O sequence. 2042 */ 2043 byte_count = ctx->io_units_done * blob->bs->io_unit_size; 2044 orig_iov = &ctx->orig_iov[0]; 2045 orig_iovoff = 0; 2046 while (byte_count > 0) { 2047 if (byte_count >= orig_iov->iov_len) { 2048 byte_count -= orig_iov->iov_len; 2049 orig_iov++; 2050 } else { 2051 orig_iovoff = byte_count; 2052 byte_count = 0; 2053 } 2054 } 2055 2056 /* 2057 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 2058 * bytes of this next I/O remain to be accounted for in the new iov array. 2059 */ 2060 byte_count = io_units_count * blob->bs->io_unit_size; 2061 iov = &ctx->iov[0]; 2062 iovcnt = 0; 2063 while (byte_count > 0) { 2064 assert(iovcnt < ctx->iovcnt); 2065 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 2066 iov->iov_base = orig_iov->iov_base + orig_iovoff; 2067 byte_count -= iov->iov_len; 2068 orig_iovoff = 0; 2069 orig_iov++; 2070 iov++; 2071 iovcnt++; 2072 } 2073 2074 ctx->io_unit_offset += io_units_count; 2075 ctx->io_units_remaining -= io_units_count; 2076 ctx->io_units_done += io_units_count; 2077 iov = &ctx->iov[0]; 2078 2079 if (ctx->read) { 2080 spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2081 io_units_count, _spdk_rw_iov_split_next, ctx); 2082 } else { 2083 spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2084 io_units_count, _spdk_rw_iov_split_next, ctx); 2085 } 2086 } 2087 2088 static void 2089 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 2090 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2091 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 2092 { 2093 struct spdk_bs_cpl cpl; 2094 2095 assert(blob != NULL); 2096 2097 if (!read && blob->data_ro) { 2098 cb_fn(cb_arg, -EPERM); 2099 return; 2100 } 2101 2102 if (length == 0) { 2103 cb_fn(cb_arg, 0); 2104 return; 2105 } 2106 2107 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2108 cb_fn(cb_arg, -EINVAL); 2109 return; 2110 } 2111 2112 /* 2113 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 2114 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 2115 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 2116 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 2117 * to allocate a separate iov array and split the I/O such that none of the resulting 2118 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 2119 * but since this case happens very infrequently, any performance impact will be negligible. 2120 * 2121 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 2122 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 2123 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 2124 * when the batch was completed, to allow for freeing the memory for the iov arrays. 2125 */ 2126 if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) { 2127 uint32_t lba_count; 2128 uint64_t lba; 2129 2130 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2131 cpl.u.blob_basic.cb_fn = cb_fn; 2132 cpl.u.blob_basic.cb_arg = cb_arg; 2133 2134 if (blob->frozen_refcnt) { 2135 /* This blob I/O is frozen */ 2136 enum spdk_blob_op_type op_type; 2137 spdk_bs_user_op_t *op; 2138 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel); 2139 2140 op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV; 2141 op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length); 2142 if (!op) { 2143 cb_fn(cb_arg, -ENOMEM); 2144 return; 2145 } 2146 2147 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 2148 2149 return; 2150 } 2151 2152 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 2153 2154 if (read) { 2155 spdk_bs_sequence_t *seq; 2156 2157 seq = spdk_bs_sequence_start(_channel, &cpl); 2158 if (!seq) { 2159 cb_fn(cb_arg, -ENOMEM); 2160 return; 2161 } 2162 2163 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2164 spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2165 } else { 2166 spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count, 2167 _spdk_rw_iov_done, NULL); 2168 } 2169 } else { 2170 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2171 spdk_bs_sequence_t *seq; 2172 2173 seq = spdk_bs_sequence_start(_channel, &cpl); 2174 if (!seq) { 2175 cb_fn(cb_arg, -ENOMEM); 2176 return; 2177 } 2178 2179 spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2180 } else { 2181 /* Queue this operation and allocate the cluster */ 2182 spdk_bs_user_op_t *op; 2183 2184 op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, 2185 length); 2186 if (!op) { 2187 cb_fn(cb_arg, -ENOMEM); 2188 return; 2189 } 2190 2191 _spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op); 2192 } 2193 } 2194 } else { 2195 struct rw_iov_ctx *ctx; 2196 2197 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 2198 if (ctx == NULL) { 2199 cb_fn(cb_arg, -ENOMEM); 2200 return; 2201 } 2202 2203 ctx->blob = blob; 2204 ctx->channel = _channel; 2205 ctx->cb_fn = cb_fn; 2206 ctx->cb_arg = cb_arg; 2207 ctx->read = read; 2208 ctx->orig_iov = iov; 2209 ctx->iovcnt = iovcnt; 2210 ctx->io_unit_offset = offset; 2211 ctx->io_units_remaining = length; 2212 ctx->io_units_done = 0; 2213 2214 _spdk_rw_iov_split_next(ctx, 0); 2215 } 2216 } 2217 2218 static struct spdk_blob * 2219 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 2220 { 2221 struct spdk_blob *blob; 2222 2223 TAILQ_FOREACH(blob, &bs->blobs, link) { 2224 if (blob->id == blobid) { 2225 return blob; 2226 } 2227 } 2228 2229 return NULL; 2230 } 2231 2232 static void 2233 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob, 2234 struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry) 2235 { 2236 assert(blob != NULL); 2237 *snapshot_entry = NULL; 2238 *clone_entry = NULL; 2239 2240 if (blob->parent_id == SPDK_BLOBID_INVALID) { 2241 return; 2242 } 2243 2244 TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) { 2245 if ((*snapshot_entry)->id == blob->parent_id) { 2246 break; 2247 } 2248 } 2249 2250 if (*snapshot_entry != NULL) { 2251 TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) { 2252 if ((*clone_entry)->id == blob->id) { 2253 break; 2254 } 2255 } 2256 2257 assert(clone_entry != NULL); 2258 } 2259 } 2260 2261 static int 2262 _spdk_bs_channel_create(void *io_device, void *ctx_buf) 2263 { 2264 struct spdk_blob_store *bs = io_device; 2265 struct spdk_bs_channel *channel = ctx_buf; 2266 struct spdk_bs_dev *dev; 2267 uint32_t max_ops = bs->max_channel_ops; 2268 uint32_t i; 2269 2270 dev = bs->dev; 2271 2272 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 2273 if (!channel->req_mem) { 2274 return -1; 2275 } 2276 2277 TAILQ_INIT(&channel->reqs); 2278 2279 for (i = 0; i < max_ops; i++) { 2280 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 2281 } 2282 2283 channel->bs = bs; 2284 channel->dev = dev; 2285 channel->dev_channel = dev->create_channel(dev); 2286 2287 if (!channel->dev_channel) { 2288 SPDK_ERRLOG("Failed to create device channel.\n"); 2289 free(channel->req_mem); 2290 return -1; 2291 } 2292 2293 TAILQ_INIT(&channel->need_cluster_alloc); 2294 TAILQ_INIT(&channel->queued_io); 2295 2296 return 0; 2297 } 2298 2299 static void 2300 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 2301 { 2302 struct spdk_bs_channel *channel = ctx_buf; 2303 spdk_bs_user_op_t *op; 2304 2305 while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) { 2306 op = TAILQ_FIRST(&channel->need_cluster_alloc); 2307 TAILQ_REMOVE(&channel->need_cluster_alloc, op, link); 2308 spdk_bs_user_op_abort(op); 2309 } 2310 2311 while (!TAILQ_EMPTY(&channel->queued_io)) { 2312 op = TAILQ_FIRST(&channel->queued_io); 2313 TAILQ_REMOVE(&channel->queued_io, op, link); 2314 spdk_bs_user_op_abort(op); 2315 } 2316 2317 free(channel->req_mem); 2318 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 2319 } 2320 2321 static void 2322 _spdk_bs_dev_destroy(void *io_device) 2323 { 2324 struct spdk_blob_store *bs = io_device; 2325 struct spdk_blob *blob, *blob_tmp; 2326 2327 bs->dev->destroy(bs->dev); 2328 2329 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 2330 TAILQ_REMOVE(&bs->blobs, blob, link); 2331 _spdk_blob_free(blob); 2332 } 2333 2334 pthread_mutex_destroy(&bs->used_clusters_mutex); 2335 2336 spdk_bit_array_free(&bs->used_blobids); 2337 spdk_bit_array_free(&bs->used_md_pages); 2338 spdk_bit_array_free(&bs->used_clusters); 2339 /* 2340 * If this function is called for any reason except a successful unload, 2341 * the unload_cpl type will be NONE and this will be a nop. 2342 */ 2343 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 2344 2345 free(bs); 2346 } 2347 2348 static int 2349 _spdk_bs_blob_list_add(struct spdk_blob *blob) 2350 { 2351 spdk_blob_id snapshot_id; 2352 struct spdk_blob_list *snapshot_entry = NULL; 2353 struct spdk_blob_list *clone_entry = NULL; 2354 2355 assert(blob != NULL); 2356 2357 snapshot_id = blob->parent_id; 2358 if (snapshot_id == SPDK_BLOBID_INVALID) { 2359 return 0; 2360 } 2361 2362 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id); 2363 if (snapshot_entry == NULL) { 2364 /* Snapshot not found */ 2365 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list)); 2366 if (snapshot_entry == NULL) { 2367 return -ENOMEM; 2368 } 2369 snapshot_entry->id = snapshot_id; 2370 TAILQ_INIT(&snapshot_entry->clones); 2371 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link); 2372 } else { 2373 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 2374 if (clone_entry->id == blob->id) { 2375 break; 2376 } 2377 } 2378 } 2379 2380 if (clone_entry == NULL) { 2381 /* Clone not found */ 2382 clone_entry = calloc(1, sizeof(struct spdk_blob_list)); 2383 if (clone_entry == NULL) { 2384 return -ENOMEM; 2385 } 2386 clone_entry->id = blob->id; 2387 TAILQ_INIT(&clone_entry->clones); 2388 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link); 2389 snapshot_entry->clone_count++; 2390 } 2391 2392 return 0; 2393 } 2394 2395 static void 2396 _spdk_bs_blob_list_remove(struct spdk_blob *blob) 2397 { 2398 struct spdk_blob_list *snapshot_entry = NULL; 2399 struct spdk_blob_list *clone_entry = NULL; 2400 2401 _spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry); 2402 2403 if (snapshot_entry == NULL) { 2404 return; 2405 } 2406 2407 blob->parent_id = SPDK_BLOBID_INVALID; 2408 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2409 free(clone_entry); 2410 2411 snapshot_entry->clone_count--; 2412 } 2413 2414 static int 2415 _spdk_bs_blob_list_free(struct spdk_blob_store *bs) 2416 { 2417 struct spdk_blob_list *snapshot_entry; 2418 struct spdk_blob_list *snapshot_entry_tmp; 2419 struct spdk_blob_list *clone_entry; 2420 struct spdk_blob_list *clone_entry_tmp; 2421 2422 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) { 2423 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) { 2424 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2425 free(clone_entry); 2426 } 2427 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link); 2428 free(snapshot_entry); 2429 } 2430 2431 return 0; 2432 } 2433 2434 static void 2435 _spdk_bs_free(struct spdk_blob_store *bs) 2436 { 2437 _spdk_bs_blob_list_free(bs); 2438 2439 spdk_bs_unregister_md_thread(bs); 2440 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy); 2441 } 2442 2443 void 2444 spdk_bs_opts_init(struct spdk_bs_opts *opts) 2445 { 2446 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 2447 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 2448 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 2449 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS; 2450 opts->clear_method = BS_CLEAR_WITH_UNMAP; 2451 memset(&opts->bstype, 0, sizeof(opts->bstype)); 2452 opts->iter_cb_fn = NULL; 2453 opts->iter_cb_arg = NULL; 2454 } 2455 2456 static int 2457 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 2458 { 2459 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 2460 opts->max_channel_ops == 0) { 2461 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 2462 return -1; 2463 } 2464 2465 return 0; 2466 } 2467 2468 static int 2469 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs) 2470 { 2471 struct spdk_blob_store *bs; 2472 uint64_t dev_size; 2473 int rc; 2474 2475 dev_size = dev->blocklen * dev->blockcnt; 2476 if (dev_size < opts->cluster_sz) { 2477 /* Device size cannot be smaller than cluster size of blobstore */ 2478 SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n", 2479 dev_size, opts->cluster_sz); 2480 return -ENOSPC; 2481 } 2482 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 2483 /* Cluster size cannot be smaller than page size */ 2484 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n", 2485 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 2486 return -EINVAL; 2487 } 2488 bs = calloc(1, sizeof(struct spdk_blob_store)); 2489 if (!bs) { 2490 return -ENOMEM; 2491 } 2492 2493 TAILQ_INIT(&bs->blobs); 2494 TAILQ_INIT(&bs->snapshots); 2495 bs->dev = dev; 2496 bs->md_thread = spdk_get_thread(); 2497 assert(bs->md_thread != NULL); 2498 2499 /* 2500 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 2501 * even multiple of the cluster size. 2502 */ 2503 bs->cluster_sz = opts->cluster_sz; 2504 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 2505 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 2506 bs->num_free_clusters = bs->total_clusters; 2507 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 2508 bs->io_unit_size = dev->blocklen; 2509 if (bs->used_clusters == NULL) { 2510 free(bs); 2511 return -ENOMEM; 2512 } 2513 2514 bs->max_channel_ops = opts->max_channel_ops; 2515 bs->super_blob = SPDK_BLOBID_INVALID; 2516 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 2517 2518 /* The metadata is assumed to be at least 1 page */ 2519 bs->used_md_pages = spdk_bit_array_create(1); 2520 bs->used_blobids = spdk_bit_array_create(0); 2521 2522 pthread_mutex_init(&bs->used_clusters_mutex, NULL); 2523 2524 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy, 2525 sizeof(struct spdk_bs_channel), "blobstore"); 2526 rc = spdk_bs_register_md_thread(bs); 2527 if (rc == -1) { 2528 spdk_io_device_unregister(bs, NULL); 2529 pthread_mutex_destroy(&bs->used_clusters_mutex); 2530 spdk_bit_array_free(&bs->used_blobids); 2531 spdk_bit_array_free(&bs->used_md_pages); 2532 spdk_bit_array_free(&bs->used_clusters); 2533 free(bs); 2534 /* FIXME: this is a lie but don't know how to get a proper error code here */ 2535 return -ENOMEM; 2536 } 2537 2538 *_bs = bs; 2539 return 0; 2540 } 2541 2542 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */ 2543 2544 struct spdk_bs_load_ctx { 2545 struct spdk_blob_store *bs; 2546 struct spdk_bs_super_block *super; 2547 2548 struct spdk_bs_md_mask *mask; 2549 bool in_page_chain; 2550 uint32_t page_index; 2551 uint32_t cur_page; 2552 struct spdk_blob_md_page *page; 2553 2554 spdk_bs_sequence_t *seq; 2555 spdk_blob_op_with_handle_complete iter_cb_fn; 2556 void *iter_cb_arg; 2557 struct spdk_blob *blob; 2558 spdk_blob_id blobid; 2559 }; 2560 2561 static void 2562 _spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno) 2563 { 2564 assert(bserrno != 0); 2565 2566 spdk_free(ctx->super); 2567 spdk_bs_sequence_finish(seq, bserrno); 2568 _spdk_bs_free(ctx->bs); 2569 free(ctx); 2570 } 2571 2572 static void 2573 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 2574 { 2575 uint32_t i = 0; 2576 2577 while (true) { 2578 i = spdk_bit_array_find_first_set(array, i); 2579 if (i >= mask->length) { 2580 break; 2581 } 2582 mask->mask[i / 8] |= 1U << (i % 8); 2583 i++; 2584 } 2585 } 2586 2587 static int 2588 _spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask) 2589 { 2590 struct spdk_bit_array *array; 2591 uint32_t i; 2592 2593 if (spdk_bit_array_resize(array_ptr, mask->length) < 0) { 2594 return -ENOMEM; 2595 } 2596 2597 array = *array_ptr; 2598 for (i = 0; i < mask->length; i++) { 2599 if (mask->mask[i / 8] & (1U << (i % 8))) { 2600 spdk_bit_array_set(array, i); 2601 } 2602 } 2603 2604 return 0; 2605 } 2606 2607 static void 2608 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 2609 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 2610 { 2611 /* Update the values in the super block */ 2612 super->super_blob = bs->super_blob; 2613 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 2614 super->crc = _spdk_blob_md_page_calc_crc(super); 2615 spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0), 2616 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 2617 cb_fn, cb_arg); 2618 } 2619 2620 static void 2621 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2622 { 2623 struct spdk_bs_load_ctx *ctx = arg; 2624 uint64_t mask_size, lba, lba_count; 2625 2626 /* Write out the used clusters mask */ 2627 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 2628 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2629 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2630 if (!ctx->mask) { 2631 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 2632 return; 2633 } 2634 2635 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 2636 ctx->mask->length = ctx->bs->total_clusters; 2637 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 2638 2639 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 2640 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 2641 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 2642 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2643 } 2644 2645 static void 2646 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2647 { 2648 struct spdk_bs_load_ctx *ctx = arg; 2649 uint64_t mask_size, lba, lba_count; 2650 2651 if (seq->bserrno) { 2652 _spdk_bs_load_ctx_fail(seq, ctx, seq->bserrno); 2653 return; 2654 } 2655 2656 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 2657 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2658 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2659 if (!ctx->mask) { 2660 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 2661 return; 2662 } 2663 2664 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 2665 ctx->mask->length = ctx->super->md_len; 2666 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 2667 2668 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 2669 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 2670 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 2671 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2672 } 2673 2674 static void 2675 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2676 { 2677 struct spdk_bs_load_ctx *ctx = arg; 2678 uint64_t mask_size, lba, lba_count; 2679 2680 if (ctx->super->used_blobid_mask_len == 0) { 2681 /* 2682 * This is a pre-v3 on-disk format where the blobid mask does not get 2683 * written to disk. 2684 */ 2685 cb_fn(seq, arg, 0); 2686 return; 2687 } 2688 2689 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2690 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2691 SPDK_MALLOC_DMA); 2692 if (!ctx->mask) { 2693 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 2694 return; 2695 } 2696 2697 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS; 2698 ctx->mask->length = ctx->super->md_len; 2699 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids)); 2700 2701 _spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask); 2702 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2703 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2704 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2705 } 2706 2707 static void 2708 _spdk_blob_set_thin_provision(struct spdk_blob *blob) 2709 { 2710 _spdk_blob_verify_md_op(blob); 2711 blob->invalid_flags |= SPDK_BLOB_THIN_PROV; 2712 blob->state = SPDK_BLOB_STATE_DIRTY; 2713 } 2714 2715 static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno); 2716 2717 static void 2718 _spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno) 2719 { 2720 struct spdk_bs_load_ctx *ctx = cb_arg; 2721 spdk_blob_id id; 2722 int64_t page_num; 2723 2724 /* Iterate to next blob (we can't use spdk_bs_iter_next function as our 2725 * last blob has been removed */ 2726 page_num = _spdk_bs_blobid_to_page(ctx->blobid); 2727 page_num++; 2728 page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num); 2729 if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) { 2730 _spdk_bs_load_iter(ctx, NULL, -ENOENT); 2731 return; 2732 } 2733 2734 id = _spdk_bs_page_to_blobid(page_num); 2735 2736 spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx); 2737 } 2738 2739 static void 2740 _spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno) 2741 { 2742 struct spdk_bs_load_ctx *ctx = cb_arg; 2743 2744 if (bserrno != 0) { 2745 SPDK_ERRLOG("Failed to close corrupted blob\n"); 2746 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2747 return; 2748 } 2749 2750 spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx); 2751 } 2752 2753 static void 2754 _spdk_bs_delete_corrupted_blob(void *cb_arg, int bserrno) 2755 { 2756 struct spdk_bs_load_ctx *ctx = cb_arg; 2757 uint64_t i; 2758 2759 if (bserrno != 0) { 2760 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2761 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2762 return; 2763 } 2764 2765 /* Snapshot and clone have the same copy of cluster map at this point. 2766 * Let's clear cluster map for snpashot now so that it won't be cleared 2767 * for clone later when we remove snapshot. Also set thin provision to 2768 * pass data corruption check */ 2769 for (i = 0; i < ctx->blob->active.num_clusters; i++) { 2770 ctx->blob->active.clusters[i] = 0; 2771 } 2772 2773 ctx->blob->md_ro = false; 2774 2775 _spdk_blob_set_thin_provision(ctx->blob); 2776 2777 ctx->blobid = ctx->blob->id; 2778 2779 spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx); 2780 } 2781 2782 static void 2783 _spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno) 2784 { 2785 struct spdk_bs_load_ctx *ctx = cb_arg; 2786 2787 if (bserrno != 0) { 2788 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2789 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2790 return; 2791 } 2792 2793 ctx->blob->md_ro = false; 2794 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true); 2795 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true); 2796 spdk_blob_set_read_only(ctx->blob); 2797 2798 if (ctx->iter_cb_fn) { 2799 ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0); 2800 } 2801 _spdk_bs_blob_list_add(ctx->blob); 2802 2803 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2804 } 2805 2806 static void 2807 _spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno) 2808 { 2809 struct spdk_bs_load_ctx *ctx = cb_arg; 2810 2811 if (bserrno != 0) { 2812 SPDK_ERRLOG("Failed to open clone of a corrupted blob\n"); 2813 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2814 return; 2815 } 2816 2817 if (blob->parent_id == ctx->blob->id) { 2818 /* Power failure occured before updating clone (snapshot delete case) 2819 * or after updating clone (creating snapshot case) - keep snapshot */ 2820 spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx); 2821 } else { 2822 /* Power failure occured after updating clone (snapshot delete case) 2823 * or before updating clone (creating snapshot case) - remove snapshot */ 2824 spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx); 2825 } 2826 } 2827 2828 static void 2829 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno) 2830 { 2831 struct spdk_bs_load_ctx *ctx = arg; 2832 const void *value; 2833 size_t len; 2834 int rc = 0; 2835 2836 if (bserrno == 0) { 2837 /* Examine blob if it is corrupted after power failure. Fix 2838 * the ones that can be fixed and remove any other corrupted 2839 * ones. If it is not corrupted just process it */ 2840 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true); 2841 if (rc != 0) { 2842 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true); 2843 if (rc != 0) { 2844 /* Not corrupted - process it and continue with iterating through blobs */ 2845 if (ctx->iter_cb_fn) { 2846 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0); 2847 } 2848 _spdk_bs_blob_list_add(blob); 2849 spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx); 2850 return; 2851 } 2852 2853 } 2854 2855 assert(len == sizeof(spdk_blob_id)); 2856 2857 ctx->blob = blob; 2858 2859 /* Open clone to check if we are able to fix this blob or should we remove it */ 2860 spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx); 2861 return; 2862 } else if (bserrno == -ENOENT) { 2863 bserrno = 0; 2864 } else { 2865 /* 2866 * This case needs to be looked at further. Same problem 2867 * exists with applications that rely on explicit blob 2868 * iteration. We should just skip the blob that failed 2869 * to load and continue on to the next one. 2870 */ 2871 SPDK_ERRLOG("Error in iterating blobs\n"); 2872 } 2873 2874 ctx->iter_cb_fn = NULL; 2875 2876 spdk_free(ctx->super); 2877 spdk_free(ctx->mask); 2878 spdk_bs_sequence_finish(ctx->seq, bserrno); 2879 free(ctx); 2880 } 2881 2882 static void 2883 _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno) 2884 { 2885 ctx->seq = seq; 2886 spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx); 2887 } 2888 2889 static void 2890 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2891 { 2892 struct spdk_bs_load_ctx *ctx = cb_arg; 2893 int rc; 2894 2895 /* The type must be correct */ 2896 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS); 2897 2898 /* The length of the mask (in bits) must not be greater than 2899 * the length of the buffer (converted to bits) */ 2900 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8)); 2901 2902 /* The length of the mask must be exactly equal to the size 2903 * (in pages) of the metadata region */ 2904 assert(ctx->mask->length == ctx->super->md_len); 2905 2906 rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask); 2907 if (rc < 0) { 2908 spdk_free(ctx->mask); 2909 _spdk_bs_load_ctx_fail(seq, ctx, rc); 2910 return; 2911 } 2912 2913 _spdk_bs_load_complete(seq, ctx, bserrno); 2914 } 2915 2916 static void 2917 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2918 { 2919 struct spdk_bs_load_ctx *ctx = cb_arg; 2920 uint64_t lba, lba_count, mask_size; 2921 int rc; 2922 2923 /* The type must be correct */ 2924 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 2925 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2926 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 2927 struct spdk_blob_md_page) * 8)); 2928 /* The length of the mask must be exactly equal to the total number of clusters */ 2929 assert(ctx->mask->length == ctx->bs->total_clusters); 2930 2931 rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask); 2932 if (rc < 0) { 2933 spdk_free(ctx->mask); 2934 _spdk_bs_load_ctx_fail(seq, ctx, rc); 2935 return; 2936 } 2937 2938 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters); 2939 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters); 2940 2941 spdk_free(ctx->mask); 2942 2943 /* Read the used blobids mask */ 2944 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2945 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2946 SPDK_MALLOC_DMA); 2947 if (!ctx->mask) { 2948 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 2949 return; 2950 } 2951 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2952 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2953 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 2954 _spdk_bs_load_used_blobids_cpl, ctx); 2955 } 2956 2957 static void 2958 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2959 { 2960 struct spdk_bs_load_ctx *ctx = cb_arg; 2961 uint64_t lba, lba_count, mask_size; 2962 int rc; 2963 2964 /* The type must be correct */ 2965 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 2966 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2967 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 2968 8)); 2969 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 2970 assert(ctx->mask->length == ctx->super->md_len); 2971 2972 rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask); 2973 if (rc < 0) { 2974 spdk_free(ctx->mask); 2975 _spdk_bs_load_ctx_fail(seq, ctx, rc); 2976 return; 2977 } 2978 2979 spdk_free(ctx->mask); 2980 2981 /* Read the used clusters mask */ 2982 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 2983 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2984 SPDK_MALLOC_DMA); 2985 if (!ctx->mask) { 2986 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 2987 return; 2988 } 2989 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 2990 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 2991 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 2992 _spdk_bs_load_used_clusters_cpl, ctx); 2993 } 2994 2995 static void 2996 _spdk_bs_load_read_used_pages(spdk_bs_sequence_t *seq, void *cb_arg) 2997 { 2998 struct spdk_bs_load_ctx *ctx = cb_arg; 2999 uint64_t lba, lba_count, mask_size; 3000 3001 /* Read the used pages mask */ 3002 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 3003 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3004 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3005 if (!ctx->mask) { 3006 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3007 return; 3008 } 3009 3010 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 3011 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 3012 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3013 _spdk_bs_load_used_pages_cpl, ctx); 3014 } 3015 3016 static int 3017 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 3018 { 3019 struct spdk_blob_md_descriptor *desc; 3020 size_t cur_desc = 0; 3021 3022 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3023 while (cur_desc < sizeof(page->descriptors)) { 3024 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3025 if (desc->length == 0) { 3026 /* If padding and length are 0, this terminates the page */ 3027 break; 3028 } 3029 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3030 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3031 unsigned int i, j; 3032 unsigned int cluster_count = 0; 3033 uint32_t cluster_idx; 3034 3035 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3036 3037 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3038 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 3039 cluster_idx = desc_extent_rle->extents[i].cluster_idx; 3040 /* 3041 * cluster_idx = 0 means an unallocated cluster - don't mark that 3042 * in the used cluster map. 3043 */ 3044 if (cluster_idx != 0) { 3045 spdk_bit_array_set(bs->used_clusters, cluster_idx + j); 3046 if (bs->num_free_clusters == 0) { 3047 return -ENOSPC; 3048 } 3049 bs->num_free_clusters--; 3050 } 3051 cluster_count++; 3052 } 3053 } 3054 if (cluster_count == 0) { 3055 return -EINVAL; 3056 } 3057 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3058 /* Skip this item */ 3059 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3060 /* Skip this item */ 3061 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3062 /* Skip this item */ 3063 } else { 3064 /* Error */ 3065 return -EINVAL; 3066 } 3067 /* Advance to the next descriptor */ 3068 cur_desc += sizeof(*desc) + desc->length; 3069 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3070 break; 3071 } 3072 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3073 } 3074 return 0; 3075 } 3076 3077 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 3078 { 3079 uint32_t crc; 3080 3081 crc = _spdk_blob_md_page_calc_crc(ctx->page); 3082 if (crc != ctx->page->crc) { 3083 return false; 3084 } 3085 3086 if (ctx->page->sequence_num == 0 && 3087 _spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 3088 return false; 3089 } 3090 return true; 3091 } 3092 3093 static void 3094 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 3095 3096 static void 3097 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3098 { 3099 struct spdk_bs_load_ctx *ctx = cb_arg; 3100 3101 _spdk_bs_load_complete(seq, ctx, bserrno); 3102 } 3103 3104 static void 3105 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3106 { 3107 struct spdk_bs_load_ctx *ctx = cb_arg; 3108 3109 spdk_free(ctx->mask); 3110 ctx->mask = NULL; 3111 3112 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl); 3113 } 3114 3115 static void 3116 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3117 { 3118 struct spdk_bs_load_ctx *ctx = cb_arg; 3119 3120 spdk_free(ctx->mask); 3121 ctx->mask = NULL; 3122 3123 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl); 3124 } 3125 3126 static void 3127 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3128 { 3129 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl); 3130 } 3131 3132 static void 3133 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3134 { 3135 struct spdk_bs_load_ctx *ctx = cb_arg; 3136 uint64_t num_md_clusters; 3137 uint64_t i; 3138 uint32_t page_num; 3139 3140 if (bserrno != 0) { 3141 _spdk_bs_load_ctx_fail(seq, ctx, bserrno); 3142 return; 3143 } 3144 3145 page_num = ctx->cur_page; 3146 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 3147 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 3148 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 3149 if (ctx->page->sequence_num == 0) { 3150 spdk_bit_array_set(ctx->bs->used_blobids, page_num); 3151 } 3152 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 3153 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3154 return; 3155 } 3156 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 3157 ctx->in_page_chain = true; 3158 ctx->cur_page = ctx->page->next; 3159 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 3160 return; 3161 } 3162 } 3163 } 3164 3165 ctx->in_page_chain = false; 3166 3167 do { 3168 ctx->page_index++; 3169 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 3170 3171 if (ctx->page_index < ctx->super->md_len) { 3172 ctx->cur_page = ctx->page_index; 3173 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 3174 } else { 3175 /* Claim all of the clusters used by the metadata */ 3176 num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster); 3177 for (i = 0; i < num_md_clusters; i++) { 3178 _spdk_bs_claim_cluster(ctx->bs, i); 3179 } 3180 spdk_free(ctx->page); 3181 _spdk_bs_load_write_used_md(seq, ctx, bserrno); 3182 } 3183 } 3184 3185 static void 3186 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 3187 { 3188 struct spdk_bs_load_ctx *ctx = cb_arg; 3189 uint64_t lba; 3190 3191 assert(ctx->cur_page < ctx->super->md_len); 3192 lba = _spdk_bs_md_page_to_lba(ctx->bs, ctx->cur_page); 3193 spdk_bs_sequence_read_dev(seq, ctx->page, lba, 3194 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3195 _spdk_bs_load_replay_md_cpl, ctx); 3196 } 3197 3198 static void 3199 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg) 3200 { 3201 struct spdk_bs_load_ctx *ctx = cb_arg; 3202 3203 ctx->page_index = 0; 3204 ctx->cur_page = 0; 3205 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3206 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3207 if (!ctx->page) { 3208 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3209 return; 3210 } 3211 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 3212 } 3213 3214 static void 3215 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg) 3216 { 3217 struct spdk_bs_load_ctx *ctx = cb_arg; 3218 int rc; 3219 3220 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 3221 if (rc < 0) { 3222 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3223 return; 3224 } 3225 3226 rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len); 3227 if (rc < 0) { 3228 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3229 return; 3230 } 3231 3232 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3233 if (rc < 0) { 3234 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3235 return; 3236 } 3237 3238 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 3239 _spdk_bs_load_replay_md(seq, cb_arg); 3240 } 3241 3242 static void 3243 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3244 { 3245 struct spdk_bs_load_ctx *ctx = cb_arg; 3246 uint32_t crc; 3247 int rc; 3248 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 3249 3250 if (ctx->super->version > SPDK_BS_VERSION || 3251 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 3252 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3253 return; 3254 } 3255 3256 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3257 sizeof(ctx->super->signature)) != 0) { 3258 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3259 return; 3260 } 3261 3262 crc = _spdk_blob_md_page_calc_crc(ctx->super); 3263 if (crc != ctx->super->crc) { 3264 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3265 return; 3266 } 3267 3268 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3269 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 3270 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3271 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 3272 } else { 3273 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 3274 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3275 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3276 _spdk_bs_load_ctx_fail(seq, ctx, -ENXIO); 3277 return; 3278 } 3279 3280 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) { 3281 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n", 3282 ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size); 3283 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3284 return; 3285 } 3286 3287 if (ctx->super->size == 0) { 3288 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen; 3289 } 3290 3291 if (ctx->super->io_unit_size == 0) { 3292 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE; 3293 } 3294 3295 /* Parse the super block */ 3296 ctx->bs->clean = 1; 3297 ctx->bs->cluster_sz = ctx->super->cluster_size; 3298 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size; 3299 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 3300 ctx->bs->io_unit_size = ctx->super->io_unit_size; 3301 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3302 if (rc < 0) { 3303 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3304 return; 3305 } 3306 ctx->bs->md_start = ctx->super->md_start; 3307 ctx->bs->md_len = ctx->super->md_len; 3308 ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up( 3309 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 3310 ctx->bs->super_blob = ctx->super->super_blob; 3311 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 3312 3313 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) { 3314 _spdk_bs_recover(seq, ctx); 3315 } else { 3316 _spdk_bs_load_read_used_pages(seq, ctx); 3317 } 3318 } 3319 3320 void 3321 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3322 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3323 { 3324 struct spdk_blob_store *bs; 3325 struct spdk_bs_cpl cpl; 3326 spdk_bs_sequence_t *seq; 3327 struct spdk_bs_load_ctx *ctx; 3328 struct spdk_bs_opts opts = {}; 3329 int err; 3330 3331 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 3332 3333 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3334 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen); 3335 dev->destroy(dev); 3336 cb_fn(cb_arg, NULL, -EINVAL); 3337 return; 3338 } 3339 3340 if (o) { 3341 opts = *o; 3342 } else { 3343 spdk_bs_opts_init(&opts); 3344 } 3345 3346 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 3347 dev->destroy(dev); 3348 cb_fn(cb_arg, NULL, -EINVAL); 3349 return; 3350 } 3351 3352 err = _spdk_bs_alloc(dev, &opts, &bs); 3353 if (err) { 3354 dev->destroy(dev); 3355 cb_fn(cb_arg, NULL, err); 3356 return; 3357 } 3358 3359 ctx = calloc(1, sizeof(*ctx)); 3360 if (!ctx) { 3361 _spdk_bs_free(bs); 3362 cb_fn(cb_arg, NULL, -ENOMEM); 3363 return; 3364 } 3365 3366 ctx->bs = bs; 3367 ctx->iter_cb_fn = opts.iter_cb_fn; 3368 ctx->iter_cb_arg = opts.iter_cb_arg; 3369 3370 /* Allocate memory for the super block */ 3371 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3372 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3373 if (!ctx->super) { 3374 free(ctx); 3375 _spdk_bs_free(bs); 3376 cb_fn(cb_arg, NULL, -ENOMEM); 3377 return; 3378 } 3379 3380 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3381 cpl.u.bs_handle.cb_fn = cb_fn; 3382 cpl.u.bs_handle.cb_arg = cb_arg; 3383 cpl.u.bs_handle.bs = bs; 3384 3385 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3386 if (!seq) { 3387 spdk_free(ctx->super); 3388 free(ctx); 3389 _spdk_bs_free(bs); 3390 cb_fn(cb_arg, NULL, -ENOMEM); 3391 return; 3392 } 3393 3394 /* Read the super block */ 3395 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3396 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3397 _spdk_bs_load_super_cpl, ctx); 3398 } 3399 3400 /* END spdk_bs_load */ 3401 3402 /* START spdk_bs_dump */ 3403 3404 struct spdk_bs_dump_ctx { 3405 struct spdk_blob_store *bs; 3406 struct spdk_bs_super_block *super; 3407 uint32_t cur_page; 3408 struct spdk_blob_md_page *page; 3409 spdk_bs_sequence_t *seq; 3410 FILE *fp; 3411 spdk_bs_dump_print_xattr print_xattr_fn; 3412 char xattr_name[4096]; 3413 }; 3414 3415 static void 3416 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno) 3417 { 3418 spdk_free(ctx->super); 3419 3420 /* 3421 * We need to defer calling spdk_bs_call_cpl() until after 3422 * dev destruction, so tuck these away for later use. 3423 */ 3424 ctx->bs->unload_err = bserrno; 3425 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3426 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3427 3428 spdk_bs_sequence_finish(seq, 0); 3429 _spdk_bs_free(ctx->bs); 3430 free(ctx); 3431 } 3432 3433 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 3434 3435 static void 3436 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx) 3437 { 3438 uint32_t page_idx = ctx->cur_page; 3439 struct spdk_blob_md_page *page = ctx->page; 3440 struct spdk_blob_md_descriptor *desc; 3441 size_t cur_desc = 0; 3442 uint32_t crc; 3443 3444 fprintf(ctx->fp, "=========\n"); 3445 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx); 3446 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id); 3447 3448 crc = _spdk_blob_md_page_calc_crc(page); 3449 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? "OK" : "Mismatch"); 3450 3451 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3452 while (cur_desc < sizeof(page->descriptors)) { 3453 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3454 if (desc->length == 0) { 3455 /* If padding and length are 0, this terminates the page */ 3456 break; 3457 } 3458 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3459 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3460 unsigned int i; 3461 3462 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3463 3464 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3465 if (desc_extent_rle->extents[i].cluster_idx != 0) { 3466 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32, 3467 desc_extent_rle->extents[i].cluster_idx); 3468 } else { 3469 fprintf(ctx->fp, "Unallocated Extent - "); 3470 } 3471 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length); 3472 fprintf(ctx->fp, "\n"); 3473 } 3474 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3475 struct spdk_blob_md_descriptor_xattr *desc_xattr; 3476 uint32_t i; 3477 3478 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 3479 3480 if (desc_xattr->length != 3481 sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) + 3482 desc_xattr->name_length + desc_xattr->value_length) { 3483 } 3484 3485 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length); 3486 ctx->xattr_name[desc_xattr->name_length] = '\0'; 3487 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name); 3488 fprintf(ctx->fp, " value = \""); 3489 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name, 3490 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 3491 desc_xattr->value_length); 3492 fprintf(ctx->fp, "\"\n"); 3493 for (i = 0; i < desc_xattr->value_length; i++) { 3494 if (i % 16 == 0) { 3495 fprintf(ctx->fp, " "); 3496 } 3497 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i)); 3498 if ((i + 1) % 16 == 0) { 3499 fprintf(ctx->fp, "\n"); 3500 } 3501 } 3502 if (i % 16 != 0) { 3503 fprintf(ctx->fp, "\n"); 3504 } 3505 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3506 /* TODO */ 3507 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3508 /* TODO */ 3509 } else { 3510 /* Error */ 3511 } 3512 /* Advance to the next descriptor */ 3513 cur_desc += sizeof(*desc) + desc->length; 3514 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3515 break; 3516 } 3517 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3518 } 3519 } 3520 3521 static void 3522 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3523 { 3524 struct spdk_bs_dump_ctx *ctx = cb_arg; 3525 3526 if (bserrno != 0) { 3527 _spdk_bs_dump_finish(seq, ctx, bserrno); 3528 return; 3529 } 3530 3531 if (ctx->page->id != 0) { 3532 _spdk_bs_dump_print_md_page(ctx); 3533 } 3534 3535 ctx->cur_page++; 3536 3537 if (ctx->cur_page < ctx->super->md_len) { 3538 _spdk_bs_dump_read_md_page(seq, cb_arg); 3539 } else { 3540 spdk_free(ctx->page); 3541 _spdk_bs_dump_finish(seq, ctx, 0); 3542 } 3543 } 3544 3545 static void 3546 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 3547 { 3548 struct spdk_bs_dump_ctx *ctx = cb_arg; 3549 uint64_t lba; 3550 3551 assert(ctx->cur_page < ctx->super->md_len); 3552 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 3553 spdk_bs_sequence_read_dev(seq, ctx->page, lba, 3554 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3555 _spdk_bs_dump_read_md_page_cpl, ctx); 3556 } 3557 3558 static void 3559 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3560 { 3561 struct spdk_bs_dump_ctx *ctx = cb_arg; 3562 3563 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature); 3564 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3565 sizeof(ctx->super->signature)) != 0) { 3566 fprintf(ctx->fp, "(Mismatch)\n"); 3567 _spdk_bs_dump_finish(seq, ctx, bserrno); 3568 return; 3569 } else { 3570 fprintf(ctx->fp, "(OK)\n"); 3571 } 3572 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version); 3573 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc, 3574 (ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch"); 3575 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype); 3576 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size); 3577 fprintf(ctx->fp, "Super Blob ID: "); 3578 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) { 3579 fprintf(ctx->fp, "(None)\n"); 3580 } else { 3581 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob); 3582 } 3583 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean); 3584 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start); 3585 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len); 3586 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start); 3587 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len); 3588 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start); 3589 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len); 3590 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start); 3591 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len); 3592 3593 ctx->cur_page = 0; 3594 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3595 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3596 if (!ctx->page) { 3597 _spdk_bs_dump_finish(seq, ctx, -ENOMEM); 3598 return; 3599 } 3600 _spdk_bs_dump_read_md_page(seq, cb_arg); 3601 } 3602 3603 void 3604 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn, 3605 spdk_bs_op_complete cb_fn, void *cb_arg) 3606 { 3607 struct spdk_blob_store *bs; 3608 struct spdk_bs_cpl cpl; 3609 spdk_bs_sequence_t *seq; 3610 struct spdk_bs_dump_ctx *ctx; 3611 struct spdk_bs_opts opts = {}; 3612 int err; 3613 3614 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev); 3615 3616 spdk_bs_opts_init(&opts); 3617 3618 err = _spdk_bs_alloc(dev, &opts, &bs); 3619 if (err) { 3620 dev->destroy(dev); 3621 cb_fn(cb_arg, err); 3622 return; 3623 } 3624 3625 ctx = calloc(1, sizeof(*ctx)); 3626 if (!ctx) { 3627 _spdk_bs_free(bs); 3628 cb_fn(cb_arg, -ENOMEM); 3629 return; 3630 } 3631 3632 ctx->bs = bs; 3633 ctx->fp = fp; 3634 ctx->print_xattr_fn = print_xattr_fn; 3635 3636 /* Allocate memory for the super block */ 3637 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3638 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3639 if (!ctx->super) { 3640 free(ctx); 3641 _spdk_bs_free(bs); 3642 cb_fn(cb_arg, -ENOMEM); 3643 return; 3644 } 3645 3646 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3647 cpl.u.bs_basic.cb_fn = cb_fn; 3648 cpl.u.bs_basic.cb_arg = cb_arg; 3649 3650 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3651 if (!seq) { 3652 spdk_free(ctx->super); 3653 free(ctx); 3654 _spdk_bs_free(bs); 3655 cb_fn(cb_arg, -ENOMEM); 3656 return; 3657 } 3658 3659 /* Read the super block */ 3660 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3661 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3662 _spdk_bs_dump_super_cpl, ctx); 3663 } 3664 3665 /* END spdk_bs_dump */ 3666 3667 /* START spdk_bs_init */ 3668 3669 struct spdk_bs_init_ctx { 3670 struct spdk_blob_store *bs; 3671 struct spdk_bs_super_block *super; 3672 }; 3673 3674 static void 3675 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3676 { 3677 struct spdk_bs_init_ctx *ctx = cb_arg; 3678 3679 spdk_free(ctx->super); 3680 free(ctx); 3681 3682 spdk_bs_sequence_finish(seq, bserrno); 3683 } 3684 3685 static void 3686 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3687 { 3688 struct spdk_bs_init_ctx *ctx = cb_arg; 3689 3690 /* Write super block */ 3691 spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 3692 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 3693 _spdk_bs_init_persist_super_cpl, ctx); 3694 } 3695 3696 void 3697 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3698 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3699 { 3700 struct spdk_bs_init_ctx *ctx; 3701 struct spdk_blob_store *bs; 3702 struct spdk_bs_cpl cpl; 3703 spdk_bs_sequence_t *seq; 3704 spdk_bs_batch_t *batch; 3705 uint64_t num_md_lba; 3706 uint64_t num_md_pages; 3707 uint64_t num_md_clusters; 3708 uint32_t i; 3709 struct spdk_bs_opts opts = {}; 3710 int rc; 3711 3712 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 3713 3714 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3715 SPDK_ERRLOG("unsupported dev block length of %d\n", 3716 dev->blocklen); 3717 dev->destroy(dev); 3718 cb_fn(cb_arg, NULL, -EINVAL); 3719 return; 3720 } 3721 3722 if (o) { 3723 opts = *o; 3724 } else { 3725 spdk_bs_opts_init(&opts); 3726 } 3727 3728 if (_spdk_bs_opts_verify(&opts) != 0) { 3729 dev->destroy(dev); 3730 cb_fn(cb_arg, NULL, -EINVAL); 3731 return; 3732 } 3733 3734 rc = _spdk_bs_alloc(dev, &opts, &bs); 3735 if (rc) { 3736 dev->destroy(dev); 3737 cb_fn(cb_arg, NULL, rc); 3738 return; 3739 } 3740 3741 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 3742 /* By default, allocate 1 page per cluster. 3743 * Technically, this over-allocates metadata 3744 * because more metadata will reduce the number 3745 * of usable clusters. This can be addressed with 3746 * more complex math in the future. 3747 */ 3748 bs->md_len = bs->total_clusters; 3749 } else { 3750 bs->md_len = opts.num_md_pages; 3751 } 3752 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 3753 if (rc < 0) { 3754 _spdk_bs_free(bs); 3755 cb_fn(cb_arg, NULL, -ENOMEM); 3756 return; 3757 } 3758 3759 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len); 3760 if (rc < 0) { 3761 _spdk_bs_free(bs); 3762 cb_fn(cb_arg, NULL, -ENOMEM); 3763 return; 3764 } 3765 3766 ctx = calloc(1, sizeof(*ctx)); 3767 if (!ctx) { 3768 _spdk_bs_free(bs); 3769 cb_fn(cb_arg, NULL, -ENOMEM); 3770 return; 3771 } 3772 3773 ctx->bs = bs; 3774 3775 /* Allocate memory for the super block */ 3776 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3777 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3778 if (!ctx->super) { 3779 free(ctx); 3780 _spdk_bs_free(bs); 3781 cb_fn(cb_arg, NULL, -ENOMEM); 3782 return; 3783 } 3784 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3785 sizeof(ctx->super->signature)); 3786 ctx->super->version = SPDK_BS_VERSION; 3787 ctx->super->length = sizeof(*ctx->super); 3788 ctx->super->super_blob = bs->super_blob; 3789 ctx->super->clean = 0; 3790 ctx->super->cluster_size = bs->cluster_sz; 3791 ctx->super->io_unit_size = bs->io_unit_size; 3792 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 3793 3794 /* Calculate how many pages the metadata consumes at the front 3795 * of the disk. 3796 */ 3797 3798 /* The super block uses 1 page */ 3799 num_md_pages = 1; 3800 3801 /* The used_md_pages mask requires 1 bit per metadata page, rounded 3802 * up to the nearest page, plus a header. 3803 */ 3804 ctx->super->used_page_mask_start = num_md_pages; 3805 ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3806 spdk_divide_round_up(bs->md_len, 8), 3807 SPDK_BS_PAGE_SIZE); 3808 num_md_pages += ctx->super->used_page_mask_len; 3809 3810 /* The used_clusters mask requires 1 bit per cluster, rounded 3811 * up to the nearest page, plus a header. 3812 */ 3813 ctx->super->used_cluster_mask_start = num_md_pages; 3814 ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3815 spdk_divide_round_up(bs->total_clusters, 8), 3816 SPDK_BS_PAGE_SIZE); 3817 num_md_pages += ctx->super->used_cluster_mask_len; 3818 3819 /* The used_blobids mask requires 1 bit per metadata page, rounded 3820 * up to the nearest page, plus a header. 3821 */ 3822 ctx->super->used_blobid_mask_start = num_md_pages; 3823 ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3824 spdk_divide_round_up(bs->md_len, 8), 3825 SPDK_BS_PAGE_SIZE); 3826 num_md_pages += ctx->super->used_blobid_mask_len; 3827 3828 /* The metadata region size was chosen above */ 3829 ctx->super->md_start = bs->md_start = num_md_pages; 3830 ctx->super->md_len = bs->md_len; 3831 num_md_pages += bs->md_len; 3832 3833 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 3834 3835 ctx->super->size = dev->blockcnt * dev->blocklen; 3836 3837 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 3838 3839 num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster); 3840 if (num_md_clusters > bs->total_clusters) { 3841 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 3842 "please decrease number of pages reserved for metadata " 3843 "or increase cluster size.\n"); 3844 spdk_free(ctx->super); 3845 free(ctx); 3846 _spdk_bs_free(bs); 3847 cb_fn(cb_arg, NULL, -ENOMEM); 3848 return; 3849 } 3850 /* Claim all of the clusters used by the metadata */ 3851 for (i = 0; i < num_md_clusters; i++) { 3852 _spdk_bs_claim_cluster(bs, i); 3853 } 3854 3855 bs->total_data_clusters = bs->num_free_clusters; 3856 3857 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3858 cpl.u.bs_handle.cb_fn = cb_fn; 3859 cpl.u.bs_handle.cb_arg = cb_arg; 3860 cpl.u.bs_handle.bs = bs; 3861 3862 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3863 if (!seq) { 3864 spdk_free(ctx->super); 3865 free(ctx); 3866 _spdk_bs_free(bs); 3867 cb_fn(cb_arg, NULL, -ENOMEM); 3868 return; 3869 } 3870 3871 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 3872 3873 /* Clear metadata space */ 3874 spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba); 3875 3876 switch (opts.clear_method) { 3877 case BS_CLEAR_WITH_UNMAP: 3878 /* Trim data clusters */ 3879 spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3880 break; 3881 case BS_CLEAR_WITH_WRITE_ZEROES: 3882 /* Write_zeroes to data clusters */ 3883 spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3884 break; 3885 case BS_CLEAR_WITH_NONE: 3886 default: 3887 break; 3888 } 3889 3890 spdk_bs_batch_close(batch); 3891 } 3892 3893 /* END spdk_bs_init */ 3894 3895 /* START spdk_bs_destroy */ 3896 3897 static void 3898 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3899 { 3900 struct spdk_bs_init_ctx *ctx = cb_arg; 3901 struct spdk_blob_store *bs = ctx->bs; 3902 3903 /* 3904 * We need to defer calling spdk_bs_call_cpl() until after 3905 * dev destruction, so tuck these away for later use. 3906 */ 3907 bs->unload_err = bserrno; 3908 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3909 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3910 3911 spdk_bs_sequence_finish(seq, bserrno); 3912 3913 _spdk_bs_free(bs); 3914 free(ctx); 3915 } 3916 3917 void 3918 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 3919 void *cb_arg) 3920 { 3921 struct spdk_bs_cpl cpl; 3922 spdk_bs_sequence_t *seq; 3923 struct spdk_bs_init_ctx *ctx; 3924 3925 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 3926 3927 if (!TAILQ_EMPTY(&bs->blobs)) { 3928 SPDK_ERRLOG("Blobstore still has open blobs\n"); 3929 cb_fn(cb_arg, -EBUSY); 3930 return; 3931 } 3932 3933 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3934 cpl.u.bs_basic.cb_fn = cb_fn; 3935 cpl.u.bs_basic.cb_arg = cb_arg; 3936 3937 ctx = calloc(1, sizeof(*ctx)); 3938 if (!ctx) { 3939 cb_fn(cb_arg, -ENOMEM); 3940 return; 3941 } 3942 3943 ctx->bs = bs; 3944 3945 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3946 if (!seq) { 3947 free(ctx); 3948 cb_fn(cb_arg, -ENOMEM); 3949 return; 3950 } 3951 3952 /* Write zeroes to the super block */ 3953 spdk_bs_sequence_write_zeroes_dev(seq, 3954 _spdk_bs_page_to_lba(bs, 0), 3955 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 3956 _spdk_bs_destroy_trim_cpl, ctx); 3957 } 3958 3959 /* END spdk_bs_destroy */ 3960 3961 /* START spdk_bs_unload */ 3962 3963 static void 3964 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3965 { 3966 struct spdk_bs_load_ctx *ctx = cb_arg; 3967 3968 spdk_free(ctx->super); 3969 3970 /* 3971 * We need to defer calling spdk_bs_call_cpl() until after 3972 * dev destruction, so tuck these away for later use. 3973 */ 3974 ctx->bs->unload_err = bserrno; 3975 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3976 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3977 3978 spdk_bs_sequence_finish(seq, bserrno); 3979 3980 _spdk_bs_free(ctx->bs); 3981 free(ctx); 3982 } 3983 3984 static void 3985 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3986 { 3987 struct spdk_bs_load_ctx *ctx = cb_arg; 3988 3989 spdk_free(ctx->mask); 3990 ctx->super->clean = 1; 3991 3992 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 3993 } 3994 3995 static void 3996 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3997 { 3998 struct spdk_bs_load_ctx *ctx = cb_arg; 3999 4000 spdk_free(ctx->mask); 4001 ctx->mask = NULL; 4002 4003 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl); 4004 } 4005 4006 static void 4007 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4008 { 4009 struct spdk_bs_load_ctx *ctx = cb_arg; 4010 4011 spdk_free(ctx->mask); 4012 ctx->mask = NULL; 4013 4014 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl); 4015 } 4016 4017 static void 4018 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4019 { 4020 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 4021 } 4022 4023 void 4024 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 4025 { 4026 struct spdk_bs_cpl cpl; 4027 spdk_bs_sequence_t *seq; 4028 struct spdk_bs_load_ctx *ctx; 4029 4030 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 4031 4032 if (!TAILQ_EMPTY(&bs->blobs)) { 4033 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4034 cb_fn(cb_arg, -EBUSY); 4035 return; 4036 } 4037 4038 ctx = calloc(1, sizeof(*ctx)); 4039 if (!ctx) { 4040 cb_fn(cb_arg, -ENOMEM); 4041 return; 4042 } 4043 4044 ctx->bs = bs; 4045 4046 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4047 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4048 if (!ctx->super) { 4049 free(ctx); 4050 cb_fn(cb_arg, -ENOMEM); 4051 return; 4052 } 4053 4054 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4055 cpl.u.bs_basic.cb_fn = cb_fn; 4056 cpl.u.bs_basic.cb_arg = cb_arg; 4057 4058 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4059 if (!seq) { 4060 spdk_free(ctx->super); 4061 free(ctx); 4062 cb_fn(cb_arg, -ENOMEM); 4063 return; 4064 } 4065 4066 /* Read super block */ 4067 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4068 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4069 _spdk_bs_unload_read_super_cpl, ctx); 4070 } 4071 4072 /* END spdk_bs_unload */ 4073 4074 /* START spdk_bs_set_super */ 4075 4076 struct spdk_bs_set_super_ctx { 4077 struct spdk_blob_store *bs; 4078 struct spdk_bs_super_block *super; 4079 }; 4080 4081 static void 4082 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4083 { 4084 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4085 4086 if (bserrno != 0) { 4087 SPDK_ERRLOG("Unable to write to super block of blobstore\n"); 4088 } 4089 4090 spdk_free(ctx->super); 4091 4092 spdk_bs_sequence_finish(seq, bserrno); 4093 4094 free(ctx); 4095 } 4096 4097 static void 4098 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4099 { 4100 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4101 4102 if (bserrno != 0) { 4103 SPDK_ERRLOG("Unable to read super block of blobstore\n"); 4104 spdk_free(ctx->super); 4105 spdk_bs_sequence_finish(seq, bserrno); 4106 free(ctx); 4107 return; 4108 } 4109 4110 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx); 4111 } 4112 4113 void 4114 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 4115 spdk_bs_op_complete cb_fn, void *cb_arg) 4116 { 4117 struct spdk_bs_cpl cpl; 4118 spdk_bs_sequence_t *seq; 4119 struct spdk_bs_set_super_ctx *ctx; 4120 4121 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n"); 4122 4123 ctx = calloc(1, sizeof(*ctx)); 4124 if (!ctx) { 4125 cb_fn(cb_arg, -ENOMEM); 4126 return; 4127 } 4128 4129 ctx->bs = bs; 4130 4131 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4132 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4133 if (!ctx->super) { 4134 free(ctx); 4135 cb_fn(cb_arg, -ENOMEM); 4136 return; 4137 } 4138 4139 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4140 cpl.u.bs_basic.cb_fn = cb_fn; 4141 cpl.u.bs_basic.cb_arg = cb_arg; 4142 4143 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4144 if (!seq) { 4145 spdk_free(ctx->super); 4146 free(ctx); 4147 cb_fn(cb_arg, -ENOMEM); 4148 return; 4149 } 4150 4151 bs->super_blob = blobid; 4152 4153 /* Read super block */ 4154 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4155 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4156 _spdk_bs_set_super_read_cpl, ctx); 4157 } 4158 4159 /* END spdk_bs_set_super */ 4160 4161 void 4162 spdk_bs_get_super(struct spdk_blob_store *bs, 4163 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4164 { 4165 if (bs->super_blob == SPDK_BLOBID_INVALID) { 4166 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 4167 } else { 4168 cb_fn(cb_arg, bs->super_blob, 0); 4169 } 4170 } 4171 4172 uint64_t 4173 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 4174 { 4175 return bs->cluster_sz; 4176 } 4177 4178 uint64_t 4179 spdk_bs_get_page_size(struct spdk_blob_store *bs) 4180 { 4181 return SPDK_BS_PAGE_SIZE; 4182 } 4183 4184 uint64_t 4185 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs) 4186 { 4187 return bs->io_unit_size; 4188 } 4189 4190 uint64_t 4191 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 4192 { 4193 return bs->num_free_clusters; 4194 } 4195 4196 uint64_t 4197 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 4198 { 4199 return bs->total_data_clusters; 4200 } 4201 4202 static int 4203 spdk_bs_register_md_thread(struct spdk_blob_store *bs) 4204 { 4205 bs->md_channel = spdk_get_io_channel(bs); 4206 if (!bs->md_channel) { 4207 SPDK_ERRLOG("Failed to get IO channel.\n"); 4208 return -1; 4209 } 4210 4211 return 0; 4212 } 4213 4214 static int 4215 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 4216 { 4217 spdk_put_io_channel(bs->md_channel); 4218 4219 return 0; 4220 } 4221 4222 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 4223 { 4224 assert(blob != NULL); 4225 4226 return blob->id; 4227 } 4228 4229 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 4230 { 4231 assert(blob != NULL); 4232 4233 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 4234 } 4235 4236 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob) 4237 { 4238 assert(blob != NULL); 4239 4240 return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs); 4241 } 4242 4243 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 4244 { 4245 assert(blob != NULL); 4246 4247 return blob->active.num_clusters; 4248 } 4249 4250 /* START spdk_bs_create_blob */ 4251 4252 static void 4253 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4254 { 4255 struct spdk_blob *blob = cb_arg; 4256 4257 _spdk_blob_free(blob); 4258 4259 spdk_bs_sequence_finish(seq, bserrno); 4260 } 4261 4262 static int 4263 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs, 4264 bool internal) 4265 { 4266 uint64_t i; 4267 size_t value_len = 0; 4268 int rc; 4269 const void *value = NULL; 4270 if (xattrs->count > 0 && xattrs->get_value == NULL) { 4271 return -EINVAL; 4272 } 4273 for (i = 0; i < xattrs->count; i++) { 4274 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len); 4275 if (value == NULL || value_len == 0) { 4276 return -EINVAL; 4277 } 4278 rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal); 4279 if (rc < 0) { 4280 return rc; 4281 } 4282 } 4283 return 0; 4284 } 4285 4286 static void 4287 _spdk_bs_create_blob(struct spdk_blob_store *bs, 4288 const struct spdk_blob_opts *opts, 4289 const struct spdk_blob_xattr_opts *internal_xattrs, 4290 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4291 { 4292 struct spdk_blob *blob; 4293 uint32_t page_idx; 4294 struct spdk_bs_cpl cpl; 4295 struct spdk_blob_opts opts_default; 4296 struct spdk_blob_xattr_opts internal_xattrs_default; 4297 spdk_bs_sequence_t *seq; 4298 spdk_blob_id id; 4299 int rc; 4300 4301 assert(spdk_get_thread() == bs->md_thread); 4302 4303 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 4304 if (page_idx == UINT32_MAX) { 4305 cb_fn(cb_arg, 0, -ENOMEM); 4306 return; 4307 } 4308 spdk_bit_array_set(bs->used_blobids, page_idx); 4309 spdk_bit_array_set(bs->used_md_pages, page_idx); 4310 4311 id = _spdk_bs_page_to_blobid(page_idx); 4312 4313 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 4314 4315 blob = _spdk_blob_alloc(bs, id); 4316 if (!blob) { 4317 cb_fn(cb_arg, 0, -ENOMEM); 4318 return; 4319 } 4320 4321 if (!opts) { 4322 spdk_blob_opts_init(&opts_default); 4323 opts = &opts_default; 4324 } 4325 if (!internal_xattrs) { 4326 _spdk_blob_xattrs_init(&internal_xattrs_default); 4327 internal_xattrs = &internal_xattrs_default; 4328 } 4329 4330 rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false); 4331 if (rc < 0) { 4332 _spdk_blob_free(blob); 4333 cb_fn(cb_arg, 0, rc); 4334 return; 4335 } 4336 4337 rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true); 4338 if (rc < 0) { 4339 _spdk_blob_free(blob); 4340 cb_fn(cb_arg, 0, rc); 4341 return; 4342 } 4343 4344 if (opts->thin_provision) { 4345 _spdk_blob_set_thin_provision(blob); 4346 } 4347 4348 rc = _spdk_blob_resize(blob, opts->num_clusters); 4349 if (rc < 0) { 4350 _spdk_blob_free(blob); 4351 cb_fn(cb_arg, 0, rc); 4352 return; 4353 } 4354 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4355 cpl.u.blobid.cb_fn = cb_fn; 4356 cpl.u.blobid.cb_arg = cb_arg; 4357 cpl.u.blobid.blobid = blob->id; 4358 4359 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4360 if (!seq) { 4361 _spdk_blob_free(blob); 4362 cb_fn(cb_arg, 0, -ENOMEM); 4363 return; 4364 } 4365 4366 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob); 4367 } 4368 4369 void spdk_bs_create_blob(struct spdk_blob_store *bs, 4370 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4371 { 4372 _spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg); 4373 } 4374 4375 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts, 4376 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4377 { 4378 _spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg); 4379 } 4380 4381 /* END spdk_bs_create_blob */ 4382 4383 /* START blob_cleanup */ 4384 4385 struct spdk_clone_snapshot_ctx { 4386 struct spdk_bs_cpl cpl; 4387 int bserrno; 4388 bool frozen; 4389 4390 struct spdk_io_channel *channel; 4391 4392 /* Current cluster for inflate operation */ 4393 uint64_t cluster; 4394 4395 /* For inflation force allocation of all unallocated clusters and remove 4396 * thin-provisioning. Otherwise only decouple parent and keep clone thin. */ 4397 bool allocate_all; 4398 4399 struct { 4400 spdk_blob_id id; 4401 struct spdk_blob *blob; 4402 } original; 4403 struct { 4404 spdk_blob_id id; 4405 struct spdk_blob *blob; 4406 } new; 4407 4408 /* xattrs specified for snapshot/clones only. They have no impact on 4409 * the original blobs xattrs. */ 4410 const struct spdk_blob_xattr_opts *xattrs; 4411 }; 4412 4413 static void 4414 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno) 4415 { 4416 struct spdk_clone_snapshot_ctx *ctx = cb_arg; 4417 struct spdk_bs_cpl *cpl = &ctx->cpl; 4418 4419 if (bserrno != 0) { 4420 if (ctx->bserrno != 0) { 4421 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4422 } else { 4423 ctx->bserrno = bserrno; 4424 } 4425 } 4426 4427 switch (cpl->type) { 4428 case SPDK_BS_CPL_TYPE_BLOBID: 4429 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno); 4430 break; 4431 case SPDK_BS_CPL_TYPE_BLOB_BASIC: 4432 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno); 4433 break; 4434 default: 4435 SPDK_UNREACHABLE(); 4436 break; 4437 } 4438 4439 free(ctx); 4440 } 4441 4442 static void 4443 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 4444 { 4445 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4446 struct spdk_blob *origblob = ctx->original.blob; 4447 4448 if (bserrno != 0) { 4449 if (ctx->bserrno != 0) { 4450 SPDK_ERRLOG("Unfreeze error %d\n", bserrno); 4451 } else { 4452 ctx->bserrno = bserrno; 4453 } 4454 } 4455 4456 ctx->original.id = origblob->id; 4457 origblob->locked_operation_in_progress = false; 4458 4459 spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4460 } 4461 4462 static void 4463 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno) 4464 { 4465 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4466 struct spdk_blob *origblob = ctx->original.blob; 4467 4468 if (bserrno != 0) { 4469 if (ctx->bserrno != 0) { 4470 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4471 } else { 4472 ctx->bserrno = bserrno; 4473 } 4474 } 4475 4476 if (ctx->frozen) { 4477 /* Unfreeze any outstanding I/O */ 4478 _spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx); 4479 } else { 4480 _spdk_bs_snapshot_unfreeze_cpl(ctx, 0); 4481 } 4482 4483 } 4484 4485 static void 4486 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno) 4487 { 4488 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4489 struct spdk_blob *newblob = ctx->new.blob; 4490 4491 if (bserrno != 0) { 4492 if (ctx->bserrno != 0) { 4493 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4494 } else { 4495 ctx->bserrno = bserrno; 4496 } 4497 } 4498 4499 ctx->new.id = newblob->id; 4500 spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4501 } 4502 4503 /* END blob_cleanup */ 4504 4505 /* START spdk_bs_create_snapshot */ 4506 4507 static void 4508 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2) 4509 { 4510 uint64_t *cluster_temp; 4511 4512 cluster_temp = blob1->active.clusters; 4513 blob1->active.clusters = blob2->active.clusters; 4514 blob2->active.clusters = cluster_temp; 4515 } 4516 4517 static void 4518 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno) 4519 { 4520 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4521 struct spdk_blob *origblob = ctx->original.blob; 4522 struct spdk_blob *newblob = ctx->new.blob; 4523 4524 if (bserrno != 0) { 4525 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4526 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4527 return; 4528 } 4529 4530 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */ 4531 bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true); 4532 if (bserrno != 0) { 4533 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4534 return; 4535 } 4536 4537 _spdk_bs_blob_list_add(ctx->original.blob); 4538 4539 spdk_blob_set_read_only(newblob); 4540 4541 /* sync snapshot metadata */ 4542 spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, cb_arg); 4543 } 4544 4545 static void 4546 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno) 4547 { 4548 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4549 struct spdk_blob *origblob = ctx->original.blob; 4550 struct spdk_blob *newblob = ctx->new.blob; 4551 4552 if (bserrno != 0) { 4553 /* return cluster map back to original */ 4554 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4555 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4556 return; 4557 } 4558 4559 /* Set internal xattr for snapshot id */ 4560 bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true); 4561 if (bserrno != 0) { 4562 /* return cluster map back to original */ 4563 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4564 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4565 return; 4566 } 4567 4568 _spdk_bs_blob_list_remove(origblob); 4569 origblob->parent_id = newblob->id; 4570 4571 /* Create new back_bs_dev for snapshot */ 4572 origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob); 4573 if (origblob->back_bs_dev == NULL) { 4574 /* return cluster map back to original */ 4575 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4576 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL); 4577 return; 4578 } 4579 4580 /* set clone blob as thin provisioned */ 4581 _spdk_blob_set_thin_provision(origblob); 4582 4583 _spdk_bs_blob_list_add(newblob); 4584 4585 /* sync clone metadata */ 4586 spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx); 4587 } 4588 4589 static void 4590 _spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc) 4591 { 4592 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4593 struct spdk_blob *origblob = ctx->original.blob; 4594 struct spdk_blob *newblob = ctx->new.blob; 4595 int bserrno; 4596 4597 if (rc != 0) { 4598 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc); 4599 return; 4600 } 4601 4602 ctx->frozen = true; 4603 4604 /* set new back_bs_dev for snapshot */ 4605 newblob->back_bs_dev = origblob->back_bs_dev; 4606 /* Set invalid flags from origblob */ 4607 newblob->invalid_flags = origblob->invalid_flags; 4608 4609 /* inherit parent from original blob if set */ 4610 newblob->parent_id = origblob->parent_id; 4611 if (origblob->parent_id != SPDK_BLOBID_INVALID) { 4612 /* Set internal xattr for snapshot id */ 4613 bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT, 4614 &origblob->parent_id, sizeof(spdk_blob_id), true); 4615 if (bserrno != 0) { 4616 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4617 return; 4618 } 4619 } 4620 4621 /* swap cluster maps */ 4622 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4623 4624 /* sync snapshot metadata */ 4625 spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx); 4626 } 4627 4628 static void 4629 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4630 { 4631 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4632 struct spdk_blob *origblob = ctx->original.blob; 4633 struct spdk_blob *newblob = _blob; 4634 4635 if (bserrno != 0) { 4636 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4637 return; 4638 } 4639 4640 ctx->new.blob = newblob; 4641 assert(spdk_blob_is_thin_provisioned(newblob)); 4642 assert(spdk_mem_all_zero(newblob->active.clusters, 4643 newblob->active.num_clusters * sizeof(*newblob->active.clusters))); 4644 4645 _spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx); 4646 } 4647 4648 static void 4649 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4650 { 4651 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4652 struct spdk_blob *origblob = ctx->original.blob; 4653 4654 if (bserrno != 0) { 4655 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4656 return; 4657 } 4658 4659 ctx->new.id = blobid; 4660 ctx->cpl.u.blobid.blobid = blobid; 4661 4662 spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx); 4663 } 4664 4665 4666 static void 4667 _spdk_bs_xattr_snapshot(void *arg, const char *name, 4668 const void **value, size_t *value_len) 4669 { 4670 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0); 4671 4672 struct spdk_blob *blob = (struct spdk_blob *)arg; 4673 *value = &blob->id; 4674 *value_len = sizeof(blob->id); 4675 } 4676 4677 static void 4678 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4679 { 4680 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4681 struct spdk_blob_opts opts; 4682 struct spdk_blob_xattr_opts internal_xattrs; 4683 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS }; 4684 4685 if (bserrno != 0) { 4686 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4687 return; 4688 } 4689 4690 ctx->original.blob = _blob; 4691 4692 if (_blob->data_ro || _blob->md_ro) { 4693 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n", 4694 _blob->id); 4695 ctx->bserrno = -EINVAL; 4696 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4697 return; 4698 } 4699 4700 if (_blob->locked_operation_in_progress) { 4701 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n"); 4702 ctx->bserrno = -EBUSY; 4703 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4704 return; 4705 } 4706 4707 _blob->locked_operation_in_progress = true; 4708 4709 spdk_blob_opts_init(&opts); 4710 _spdk_blob_xattrs_init(&internal_xattrs); 4711 4712 /* Change the size of new blob to the same as in original blob, 4713 * but do not allocate clusters */ 4714 opts.thin_provision = true; 4715 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4716 4717 /* If there are any xattrs specified for snapshot, set them now */ 4718 if (ctx->xattrs) { 4719 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4720 } 4721 /* Set internal xattr SNAPSHOT_IN_PROGRESS */ 4722 internal_xattrs.count = 1; 4723 internal_xattrs.ctx = _blob; 4724 internal_xattrs.names = xattrs_names; 4725 internal_xattrs.get_value = _spdk_bs_xattr_snapshot; 4726 4727 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4728 _spdk_bs_snapshot_newblob_create_cpl, ctx); 4729 } 4730 4731 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid, 4732 const struct spdk_blob_xattr_opts *snapshot_xattrs, 4733 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4734 { 4735 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4736 4737 if (!ctx) { 4738 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4739 return; 4740 } 4741 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4742 ctx->cpl.u.blobid.cb_fn = cb_fn; 4743 ctx->cpl.u.blobid.cb_arg = cb_arg; 4744 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4745 ctx->bserrno = 0; 4746 ctx->frozen = false; 4747 ctx->original.id = blobid; 4748 ctx->xattrs = snapshot_xattrs; 4749 4750 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx); 4751 } 4752 /* END spdk_bs_create_snapshot */ 4753 4754 /* START spdk_bs_create_clone */ 4755 4756 static void 4757 _spdk_bs_xattr_clone(void *arg, const char *name, 4758 const void **value, size_t *value_len) 4759 { 4760 assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0); 4761 4762 struct spdk_blob *blob = (struct spdk_blob *)arg; 4763 *value = &blob->id; 4764 *value_len = sizeof(blob->id); 4765 } 4766 4767 static void 4768 _spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4769 { 4770 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4771 struct spdk_blob *clone = _blob; 4772 4773 ctx->new.blob = clone; 4774 _spdk_bs_blob_list_add(clone); 4775 4776 spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4777 } 4778 4779 static void 4780 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4781 { 4782 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4783 4784 ctx->cpl.u.blobid.blobid = blobid; 4785 spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx); 4786 } 4787 4788 static void 4789 _spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4790 { 4791 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4792 struct spdk_blob_opts opts; 4793 struct spdk_blob_xattr_opts internal_xattrs; 4794 char *xattr_names[] = { BLOB_SNAPSHOT }; 4795 4796 if (bserrno != 0) { 4797 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4798 return; 4799 } 4800 4801 ctx->original.blob = _blob; 4802 4803 if (!_blob->data_ro || !_blob->md_ro) { 4804 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone not from read-only blob\n"); 4805 ctx->bserrno = -EINVAL; 4806 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4807 return; 4808 } 4809 4810 if (_blob->locked_operation_in_progress) { 4811 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n"); 4812 ctx->bserrno = -EBUSY; 4813 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4814 return; 4815 } 4816 4817 _blob->locked_operation_in_progress = true; 4818 4819 spdk_blob_opts_init(&opts); 4820 _spdk_blob_xattrs_init(&internal_xattrs); 4821 4822 opts.thin_provision = true; 4823 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4824 if (ctx->xattrs) { 4825 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4826 } 4827 4828 /* Set internal xattr BLOB_SNAPSHOT */ 4829 internal_xattrs.count = 1; 4830 internal_xattrs.ctx = _blob; 4831 internal_xattrs.names = xattr_names; 4832 internal_xattrs.get_value = _spdk_bs_xattr_clone; 4833 4834 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4835 _spdk_bs_clone_newblob_create_cpl, ctx); 4836 } 4837 4838 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid, 4839 const struct spdk_blob_xattr_opts *clone_xattrs, 4840 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4841 { 4842 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4843 4844 if (!ctx) { 4845 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4846 return; 4847 } 4848 4849 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4850 ctx->cpl.u.blobid.cb_fn = cb_fn; 4851 ctx->cpl.u.blobid.cb_arg = cb_arg; 4852 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4853 ctx->bserrno = 0; 4854 ctx->xattrs = clone_xattrs; 4855 ctx->original.id = blobid; 4856 4857 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx); 4858 } 4859 4860 /* END spdk_bs_create_clone */ 4861 4862 /* START spdk_bs_inflate_blob */ 4863 4864 static void 4865 _spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno) 4866 { 4867 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4868 struct spdk_blob *_blob = ctx->original.blob; 4869 4870 if (bserrno != 0) { 4871 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4872 return; 4873 } 4874 4875 assert(_parent != NULL); 4876 4877 _spdk_bs_blob_list_remove(_blob); 4878 _blob->parent_id = _parent->id; 4879 _spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id, 4880 sizeof(spdk_blob_id), true); 4881 4882 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4883 _blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent); 4884 _spdk_bs_blob_list_add(_blob); 4885 4886 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4887 } 4888 4889 static void 4890 _spdk_bs_inflate_blob_done(void *cb_arg, int bserrno) 4891 { 4892 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4893 struct spdk_blob *_blob = ctx->original.blob; 4894 struct spdk_blob *_parent; 4895 4896 if (bserrno != 0) { 4897 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4898 return; 4899 } 4900 4901 if (ctx->allocate_all) { 4902 /* remove thin provisioning */ 4903 _spdk_bs_blob_list_remove(_blob); 4904 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4905 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV; 4906 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4907 _blob->back_bs_dev = NULL; 4908 _blob->parent_id = SPDK_BLOBID_INVALID; 4909 } else { 4910 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob; 4911 if (_parent->parent_id != SPDK_BLOBID_INVALID) { 4912 /* We must change the parent of the inflated blob */ 4913 spdk_bs_open_blob(_blob->bs, _parent->parent_id, 4914 _spdk_bs_inflate_blob_set_parent_cpl, ctx); 4915 return; 4916 } 4917 4918 _spdk_bs_blob_list_remove(_blob); 4919 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4920 _blob->parent_id = SPDK_BLOBID_INVALID; 4921 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4922 _blob->back_bs_dev = spdk_bs_create_zeroes_dev(); 4923 } 4924 4925 _blob->state = SPDK_BLOB_STATE_DIRTY; 4926 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4927 } 4928 4929 /* Check if cluster needs allocation */ 4930 static inline bool 4931 _spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all) 4932 { 4933 struct spdk_blob_bs_dev *b; 4934 4935 assert(blob != NULL); 4936 4937 if (blob->active.clusters[cluster] != 0) { 4938 /* Cluster is already allocated */ 4939 return false; 4940 } 4941 4942 if (blob->parent_id == SPDK_BLOBID_INVALID) { 4943 /* Blob have no parent blob */ 4944 return allocate_all; 4945 } 4946 4947 b = (struct spdk_blob_bs_dev *)blob->back_bs_dev; 4948 return (allocate_all || b->blob->active.clusters[cluster] != 0); 4949 } 4950 4951 static void 4952 _spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno) 4953 { 4954 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4955 struct spdk_blob *_blob = ctx->original.blob; 4956 uint64_t offset; 4957 4958 if (bserrno != 0) { 4959 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4960 return; 4961 } 4962 4963 for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) { 4964 if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) { 4965 break; 4966 } 4967 } 4968 4969 if (ctx->cluster < _blob->active.num_clusters) { 4970 offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster); 4971 4972 /* We may safely increment a cluster before write */ 4973 ctx->cluster++; 4974 4975 /* Use zero length write to touch a cluster */ 4976 spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0, 4977 _spdk_bs_inflate_blob_touch_next, ctx); 4978 } else { 4979 _spdk_bs_inflate_blob_done(cb_arg, bserrno); 4980 } 4981 } 4982 4983 static void 4984 _spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4985 { 4986 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4987 uint64_t lfc; /* lowest free cluster */ 4988 uint64_t i; 4989 4990 if (bserrno != 0) { 4991 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4992 return; 4993 } 4994 4995 ctx->original.blob = _blob; 4996 4997 if (_blob->locked_operation_in_progress) { 4998 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n"); 4999 ctx->bserrno = -EBUSY; 5000 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 5001 return; 5002 } 5003 5004 _blob->locked_operation_in_progress = true; 5005 5006 if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) { 5007 /* This blob have no parent, so we cannot decouple it. */ 5008 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n"); 5009 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL); 5010 return; 5011 } 5012 5013 if (spdk_blob_is_thin_provisioned(_blob) == false) { 5014 /* This is not thin provisioned blob. No need to inflate. */ 5015 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0); 5016 return; 5017 } 5018 5019 /* Do two passes - one to verify that we can obtain enough clusters 5020 * and another to actually claim them. 5021 */ 5022 lfc = 0; 5023 for (i = 0; i < _blob->active.num_clusters; i++) { 5024 if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) { 5025 lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc); 5026 if (lfc == UINT32_MAX) { 5027 /* No more free clusters. Cannot satisfy the request */ 5028 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC); 5029 return; 5030 } 5031 lfc++; 5032 } 5033 } 5034 5035 ctx->cluster = 0; 5036 _spdk_bs_inflate_blob_touch_next(ctx, 0); 5037 } 5038 5039 static void 5040 _spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5041 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg) 5042 { 5043 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5044 5045 if (!ctx) { 5046 cb_fn(cb_arg, -ENOMEM); 5047 return; 5048 } 5049 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5050 ctx->cpl.u.bs_basic.cb_fn = cb_fn; 5051 ctx->cpl.u.bs_basic.cb_arg = cb_arg; 5052 ctx->bserrno = 0; 5053 ctx->original.id = blobid; 5054 ctx->channel = channel; 5055 ctx->allocate_all = allocate_all; 5056 5057 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx); 5058 } 5059 5060 void 5061 spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5062 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5063 { 5064 _spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg); 5065 } 5066 5067 void 5068 spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5069 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5070 { 5071 _spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg); 5072 } 5073 /* END spdk_bs_inflate_blob */ 5074 5075 /* START spdk_blob_resize */ 5076 struct spdk_bs_resize_ctx { 5077 spdk_blob_op_complete cb_fn; 5078 void *cb_arg; 5079 struct spdk_blob *blob; 5080 uint64_t sz; 5081 int rc; 5082 }; 5083 5084 static void 5085 _spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc) 5086 { 5087 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5088 5089 if (rc != 0) { 5090 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc); 5091 } 5092 5093 if (ctx->rc != 0) { 5094 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc); 5095 rc = ctx->rc; 5096 } 5097 5098 ctx->blob->locked_operation_in_progress = false; 5099 5100 ctx->cb_fn(ctx->cb_arg, rc); 5101 free(ctx); 5102 } 5103 5104 static void 5105 _spdk_bs_resize_freeze_cpl(void *cb_arg, int rc) 5106 { 5107 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5108 5109 if (rc != 0) { 5110 ctx->blob->locked_operation_in_progress = false; 5111 ctx->cb_fn(ctx->cb_arg, rc); 5112 free(ctx); 5113 return; 5114 } 5115 5116 ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz); 5117 5118 _spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx); 5119 } 5120 5121 void 5122 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg) 5123 { 5124 struct spdk_bs_resize_ctx *ctx; 5125 5126 _spdk_blob_verify_md_op(blob); 5127 5128 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 5129 5130 if (blob->md_ro) { 5131 cb_fn(cb_arg, -EPERM); 5132 return; 5133 } 5134 5135 if (sz == blob->active.num_clusters) { 5136 cb_fn(cb_arg, 0); 5137 return; 5138 } 5139 5140 if (blob->locked_operation_in_progress) { 5141 cb_fn(cb_arg, -EBUSY); 5142 return; 5143 } 5144 5145 ctx = calloc(1, sizeof(*ctx)); 5146 if (!ctx) { 5147 cb_fn(cb_arg, -ENOMEM); 5148 return; 5149 } 5150 5151 blob->locked_operation_in_progress = true; 5152 ctx->cb_fn = cb_fn; 5153 ctx->cb_arg = cb_arg; 5154 ctx->blob = blob; 5155 ctx->sz = sz; 5156 _spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx); 5157 } 5158 5159 /* END spdk_blob_resize */ 5160 5161 5162 /* START spdk_bs_delete_blob */ 5163 5164 static void 5165 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno) 5166 { 5167 spdk_bs_sequence_t *seq = cb_arg; 5168 5169 spdk_bs_sequence_finish(seq, bserrno); 5170 } 5171 5172 static void 5173 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5174 { 5175 struct spdk_blob *blob = cb_arg; 5176 5177 if (bserrno != 0) { 5178 /* 5179 * We already removed this blob from the blobstore tailq, so 5180 * we need to free it here since this is the last reference 5181 * to it. 5182 */ 5183 _spdk_blob_free(blob); 5184 _spdk_bs_delete_close_cpl(seq, bserrno); 5185 return; 5186 } 5187 5188 /* 5189 * This will immediately decrement the ref_count and call 5190 * the completion routine since the metadata state is clean. 5191 * By calling spdk_blob_close, we reduce the number of call 5192 * points into code that touches the blob->open_ref count 5193 * and the blobstore's blob list. 5194 */ 5195 spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq); 5196 } 5197 5198 struct delete_snapshot_ctx { 5199 struct spdk_blob_list *parent_snapshot_entry; 5200 struct spdk_blob *snapshot; 5201 bool snapshot_md_ro; 5202 struct spdk_blob *clone; 5203 bool clone_md_ro; 5204 spdk_blob_op_with_handle_complete cb_fn; 5205 void *cb_arg; 5206 int bserrno; 5207 }; 5208 5209 static void 5210 _spdk_delete_blob_cleanup_finish(void *cb_arg, int bserrno) 5211 { 5212 struct delete_snapshot_ctx *ctx = cb_arg; 5213 5214 if (bserrno != 0) { 5215 SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno); 5216 } 5217 5218 assert(ctx != NULL); 5219 5220 if (bserrno != 0 && ctx->bserrno == 0) { 5221 ctx->bserrno = bserrno; 5222 } 5223 5224 ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno); 5225 free(ctx); 5226 } 5227 5228 static void 5229 _spdk_delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno) 5230 { 5231 struct delete_snapshot_ctx *ctx = cb_arg; 5232 5233 if (bserrno != 0) { 5234 ctx->bserrno = bserrno; 5235 SPDK_ERRLOG("Clone cleanup error %d\n", bserrno); 5236 } 5237 5238 /* open_ref == 1 menas that only deletion context has opened this snapshot 5239 * open_ref == 2 menas that clone has opened this snapshot as well, 5240 * so we have to add it back to the blobs list */ 5241 if (ctx->snapshot->open_ref == 2) { 5242 TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link); 5243 } 5244 5245 ctx->snapshot->locked_operation_in_progress = false; 5246 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5247 5248 spdk_blob_close(ctx->snapshot, _spdk_delete_blob_cleanup_finish, ctx); 5249 } 5250 5251 static void 5252 _spdk_delete_snapshot_cleanup_clone(void *cb_arg, int bserrno) 5253 { 5254 struct delete_snapshot_ctx *ctx = cb_arg; 5255 5256 ctx->clone->locked_operation_in_progress = false; 5257 ctx->clone->md_ro = ctx->clone_md_ro; 5258 5259 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5260 } 5261 5262 static void 5263 _spdk_delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 5264 { 5265 struct delete_snapshot_ctx *ctx = cb_arg; 5266 5267 if (bserrno) { 5268 ctx->bserrno = bserrno; 5269 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5270 return; 5271 } 5272 5273 ctx->clone->locked_operation_in_progress = false; 5274 spdk_blob_close(ctx->clone, _spdk_delete_blob_cleanup_finish, ctx); 5275 } 5276 5277 static void 5278 _spdk_delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno) 5279 { 5280 struct delete_snapshot_ctx *ctx = cb_arg; 5281 struct spdk_blob_list *parent_snapshot_entry = NULL; 5282 struct spdk_blob_list *snapshot_entry = NULL; 5283 struct spdk_blob_list *clone_entry = NULL; 5284 struct spdk_blob_list *snapshot_clone_entry = NULL; 5285 5286 if (bserrno) { 5287 SPDK_ERRLOG("Failed to sync MD on blob\n"); 5288 ctx->bserrno = bserrno; 5289 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5290 return; 5291 } 5292 5293 /* Get snapshot entry for the snapshot we want to remove */ 5294 snapshot_entry = _spdk_bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id); 5295 5296 assert(snapshot_entry != NULL); 5297 5298 /* Remove clone entry in this snapshot (at this point there can be only one clone) */ 5299 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5300 assert(clone_entry != NULL); 5301 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 5302 snapshot_entry->clone_count--; 5303 assert(TAILQ_EMPTY(&snapshot_entry->clones)); 5304 5305 if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) { 5306 /* This snapshot is at the same time a clone of another snapshot - we need to 5307 * update parent snapshot (remove current clone, add new one inherited from 5308 * the snapshot that is being removed) */ 5309 5310 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5311 * snapshot that we are removing */ 5312 _spdk_blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry, 5313 &snapshot_clone_entry); 5314 5315 /* Switch clone entry in parent snapshot */ 5316 TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link); 5317 TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link); 5318 free(snapshot_clone_entry); 5319 } else { 5320 /* No parent snapshot - just remove clone entry */ 5321 free(clone_entry); 5322 } 5323 5324 /* Restore md_ro flags */ 5325 ctx->clone->md_ro = ctx->clone_md_ro; 5326 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5327 5328 _spdk_blob_unfreeze_io(ctx->clone, _spdk_delete_snapshot_unfreeze_cpl, ctx); 5329 } 5330 5331 static void 5332 _spdk_delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno) 5333 { 5334 struct delete_snapshot_ctx *ctx = cb_arg; 5335 uint64_t i; 5336 5337 ctx->snapshot->md_ro = false; 5338 5339 if (bserrno) { 5340 SPDK_ERRLOG("Failed to sync MD on clone\n"); 5341 ctx->bserrno = bserrno; 5342 5343 /* Restore snapshot to previous state */ 5344 bserrno = _spdk_blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true); 5345 if (bserrno != 0) { 5346 _spdk_delete_snapshot_cleanup_clone(ctx, bserrno); 5347 return; 5348 } 5349 5350 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_cleanup_clone, ctx); 5351 return; 5352 } 5353 5354 /* Clear cluster map entries for snapshot */ 5355 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5356 if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) { 5357 ctx->snapshot->active.clusters[i] = 0; 5358 } 5359 } 5360 5361 ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY; 5362 5363 if (ctx->parent_snapshot_entry != NULL) { 5364 ctx->snapshot->back_bs_dev = NULL; 5365 } 5366 5367 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_cpl, ctx); 5368 } 5369 5370 static void 5371 _spdk_delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno) 5372 { 5373 struct delete_snapshot_ctx *ctx = cb_arg; 5374 uint64_t i; 5375 5376 /* Temporarily override md_ro flag for clone for MD modification */ 5377 ctx->clone_md_ro = ctx->clone->md_ro; 5378 ctx->clone->md_ro = false; 5379 5380 if (bserrno) { 5381 SPDK_ERRLOG("Failed to sync MD with xattr on blob\n"); 5382 ctx->bserrno = bserrno; 5383 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5384 return; 5385 } 5386 5387 /* Copy snapshot map to clone map (only unallocated clusters in clone) */ 5388 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5389 if (ctx->clone->active.clusters[i] == 0) { 5390 ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i]; 5391 } 5392 } 5393 5394 /* Delete old backing bs_dev from clone (related to snapshot that will be removed) */ 5395 ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev); 5396 5397 /* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */ 5398 if (ctx->parent_snapshot_entry != NULL) { 5399 /* ...to parent snapshot */ 5400 ctx->clone->parent_id = ctx->parent_snapshot_entry->id; 5401 ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev; 5402 _spdk_blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id, 5403 sizeof(spdk_blob_id), 5404 true); 5405 } else { 5406 /* ...to blobid invalid and zeroes dev */ 5407 ctx->clone->parent_id = SPDK_BLOBID_INVALID; 5408 ctx->clone->back_bs_dev = spdk_bs_create_zeroes_dev(); 5409 _spdk_blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true); 5410 } 5411 5412 spdk_blob_sync_md(ctx->clone, _spdk_delete_snapshot_sync_clone_cpl, ctx); 5413 } 5414 5415 static void 5416 _spdk_delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno) 5417 { 5418 struct delete_snapshot_ctx *ctx = cb_arg; 5419 5420 if (bserrno) { 5421 SPDK_ERRLOG("Failed to freeze I/O on clone\n"); 5422 ctx->bserrno = bserrno; 5423 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5424 return; 5425 } 5426 5427 /* Temporarily override md_ro flag for snapshot for MD modification */ 5428 ctx->snapshot_md_ro = ctx->snapshot->md_ro; 5429 ctx->snapshot->md_ro = false; 5430 5431 /* Mark blob as pending for removal for power failure safety, use clone id for recovery */ 5432 ctx->bserrno = _spdk_blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id, 5433 sizeof(spdk_blob_id), true); 5434 if (ctx->bserrno != 0) { 5435 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5436 return; 5437 } 5438 5439 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_xattr_cpl, ctx); 5440 } 5441 5442 static void 5443 _spdk_delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno) 5444 { 5445 struct delete_snapshot_ctx *ctx = cb_arg; 5446 5447 if (bserrno) { 5448 SPDK_ERRLOG("Failed to open clone\n"); 5449 ctx->bserrno = bserrno; 5450 _spdk_delete_snapshot_cleanup_snapshot(ctx, 0); 5451 return; 5452 } 5453 5454 ctx->clone = clone; 5455 5456 if (clone->locked_operation_in_progress) { 5457 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n"); 5458 ctx->bserrno = -EBUSY; 5459 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5460 return; 5461 } 5462 5463 clone->locked_operation_in_progress = true; 5464 5465 _spdk_blob_freeze_io(clone, _spdk_delete_snapshot_freeze_io_cb, ctx); 5466 } 5467 5468 static void 5469 _spdk_update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx) 5470 { 5471 struct spdk_blob_list *snapshot_entry = NULL; 5472 struct spdk_blob_list *clone_entry = NULL; 5473 struct spdk_blob_list *snapshot_clone_entry = NULL; 5474 5475 /* Get snapshot entry for the snapshot we want to remove */ 5476 snapshot_entry = _spdk_bs_get_snapshot_entry(snapshot->bs, snapshot->id); 5477 5478 assert(snapshot_entry != NULL); 5479 5480 /* Get clone of the snapshot (at this point there can be only one clone) */ 5481 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5482 assert(snapshot_entry->clone_count == 1); 5483 assert(clone_entry != NULL); 5484 5485 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5486 * snapshot that we are removing */ 5487 _spdk_blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry, 5488 &snapshot_clone_entry); 5489 5490 spdk_bs_open_blob(snapshot->bs, clone_entry->id, _spdk_delete_snapshot_open_clone_cb, ctx); 5491 } 5492 5493 static void 5494 _spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno) 5495 { 5496 spdk_bs_sequence_t *seq = cb_arg; 5497 struct spdk_blob_list *snapshot_entry = NULL; 5498 uint32_t page_num; 5499 5500 if (bserrno) { 5501 SPDK_ERRLOG("Failed to remove blob\n"); 5502 spdk_bs_sequence_finish(seq, bserrno); 5503 return; 5504 } 5505 5506 /* Remove snapshot from the list */ 5507 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5508 if (snapshot_entry != NULL) { 5509 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link); 5510 free(snapshot_entry); 5511 } 5512 5513 page_num = _spdk_bs_blobid_to_page(blob->id); 5514 spdk_bit_array_clear(blob->bs->used_blobids, page_num); 5515 blob->state = SPDK_BLOB_STATE_DIRTY; 5516 blob->active.num_pages = 0; 5517 _spdk_blob_resize(blob, 0); 5518 5519 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob); 5520 } 5521 5522 static int 5523 _spdk_bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone) 5524 { 5525 struct spdk_blob_list *snapshot_entry = NULL; 5526 struct spdk_blob_list *clone_entry = NULL; 5527 struct spdk_blob *clone = NULL; 5528 bool has_one_clone = false; 5529 5530 /* Check if this is a snapshot with clones */ 5531 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5532 if (snapshot_entry != NULL) { 5533 if (snapshot_entry->clone_count > 1) { 5534 SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n"); 5535 return -EBUSY; 5536 } else if (snapshot_entry->clone_count == 1) { 5537 has_one_clone = true; 5538 } 5539 } 5540 5541 /* Check if someone has this blob open (besides this delete context): 5542 * - open_ref = 1 - only this context opened blob, so it is ok to remove it 5543 * - open_ref <= 2 && has_one_clone = true - clone is holding snapshot 5544 * and that is ok, because we will update it accordingly */ 5545 if (blob->open_ref <= 2 && has_one_clone) { 5546 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5547 assert(clone_entry != NULL); 5548 clone = _spdk_blob_lookup(blob->bs, clone_entry->id); 5549 5550 if (blob->open_ref == 2 && clone == NULL) { 5551 /* Clone is closed and someone else opened this blob */ 5552 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5553 return -EBUSY; 5554 } 5555 5556 *update_clone = true; 5557 return 0; 5558 } 5559 5560 if (blob->open_ref > 1) { 5561 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5562 return -EBUSY; 5563 } 5564 5565 assert(has_one_clone == false); 5566 *update_clone = false; 5567 return 0; 5568 } 5569 5570 static void 5571 _spdk_bs_delete_enomem_close_cpl(void *cb_arg, int bserrno) 5572 { 5573 spdk_bs_sequence_t *seq = cb_arg; 5574 5575 spdk_bs_sequence_finish(seq, -ENOMEM); 5576 } 5577 5578 static void 5579 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 5580 { 5581 spdk_bs_sequence_t *seq = cb_arg; 5582 struct delete_snapshot_ctx *ctx; 5583 bool update_clone = false; 5584 5585 if (bserrno != 0) { 5586 spdk_bs_sequence_finish(seq, bserrno); 5587 return; 5588 } 5589 5590 _spdk_blob_verify_md_op(blob); 5591 5592 ctx = calloc(1, sizeof(*ctx)); 5593 if (ctx == NULL) { 5594 spdk_blob_close(blob, _spdk_bs_delete_enomem_close_cpl, seq); 5595 return; 5596 } 5597 5598 ctx->snapshot = blob; 5599 ctx->cb_fn = _spdk_bs_delete_blob_finish; 5600 ctx->cb_arg = seq; 5601 5602 /* Check if blob can be removed and if it is a snapshot with clone on top of it */ 5603 ctx->bserrno = _spdk_bs_is_blob_deletable(blob, &update_clone); 5604 if (ctx->bserrno) { 5605 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5606 return; 5607 } 5608 5609 if (blob->locked_operation_in_progress) { 5610 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n"); 5611 ctx->bserrno = -EBUSY; 5612 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5613 return; 5614 } 5615 5616 blob->locked_operation_in_progress = true; 5617 5618 /* 5619 * Remove the blob from the blob_store list now, to ensure it does not 5620 * get returned after this point by _spdk_blob_lookup(). 5621 */ 5622 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 5623 5624 if (update_clone) { 5625 /* This blob is a snapshot with active clone - update clone first */ 5626 _spdk_update_clone_on_snapshot_deletion(blob, ctx); 5627 } else { 5628 /* This blob does not have any clones - just remove it */ 5629 _spdk_bs_blob_list_remove(blob); 5630 _spdk_bs_delete_blob_finish(seq, blob, 0); 5631 free(ctx); 5632 } 5633 } 5634 5635 void 5636 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5637 spdk_blob_op_complete cb_fn, void *cb_arg) 5638 { 5639 struct spdk_bs_cpl cpl; 5640 spdk_bs_sequence_t *seq; 5641 5642 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 5643 5644 assert(spdk_get_thread() == bs->md_thread); 5645 5646 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5647 cpl.u.blob_basic.cb_fn = cb_fn; 5648 cpl.u.blob_basic.cb_arg = cb_arg; 5649 5650 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 5651 if (!seq) { 5652 cb_fn(cb_arg, -ENOMEM); 5653 return; 5654 } 5655 5656 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq); 5657 } 5658 5659 /* END spdk_bs_delete_blob */ 5660 5661 /* START spdk_bs_open_blob */ 5662 5663 static void 5664 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5665 { 5666 struct spdk_blob *blob = cb_arg; 5667 5668 if (bserrno != 0) { 5669 _spdk_blob_free(blob); 5670 seq->cpl.u.blob_handle.blob = NULL; 5671 spdk_bs_sequence_finish(seq, bserrno); 5672 return; 5673 } 5674 5675 blob->open_ref++; 5676 5677 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 5678 5679 spdk_bs_sequence_finish(seq, bserrno); 5680 } 5681 5682 static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5683 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5684 { 5685 struct spdk_blob *blob; 5686 struct spdk_bs_cpl cpl; 5687 struct spdk_blob_open_opts opts_default; 5688 spdk_bs_sequence_t *seq; 5689 uint32_t page_num; 5690 5691 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 5692 assert(spdk_get_thread() == bs->md_thread); 5693 5694 page_num = _spdk_bs_blobid_to_page(blobid); 5695 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) { 5696 /* Invalid blobid */ 5697 cb_fn(cb_arg, NULL, -ENOENT); 5698 return; 5699 } 5700 5701 blob = _spdk_blob_lookup(bs, blobid); 5702 if (blob) { 5703 blob->open_ref++; 5704 cb_fn(cb_arg, blob, 0); 5705 return; 5706 } 5707 5708 blob = _spdk_blob_alloc(bs, blobid); 5709 if (!blob) { 5710 cb_fn(cb_arg, NULL, -ENOMEM); 5711 return; 5712 } 5713 5714 if (!opts) { 5715 spdk_blob_open_opts_init(&opts_default); 5716 opts = &opts_default; 5717 } 5718 5719 blob->clear_method = opts->clear_method; 5720 5721 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 5722 cpl.u.blob_handle.cb_fn = cb_fn; 5723 cpl.u.blob_handle.cb_arg = cb_arg; 5724 cpl.u.blob_handle.blob = blob; 5725 5726 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 5727 if (!seq) { 5728 _spdk_blob_free(blob); 5729 cb_fn(cb_arg, NULL, -ENOMEM); 5730 return; 5731 } 5732 5733 _spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob); 5734 } 5735 5736 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5737 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5738 { 5739 _spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg); 5740 } 5741 5742 void spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid, 5743 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5744 { 5745 _spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg); 5746 } 5747 5748 /* END spdk_bs_open_blob */ 5749 5750 /* START spdk_blob_set_read_only */ 5751 int spdk_blob_set_read_only(struct spdk_blob *blob) 5752 { 5753 _spdk_blob_verify_md_op(blob); 5754 5755 blob->data_ro_flags |= SPDK_BLOB_READ_ONLY; 5756 5757 blob->state = SPDK_BLOB_STATE_DIRTY; 5758 return 0; 5759 } 5760 /* END spdk_blob_set_read_only */ 5761 5762 /* START spdk_blob_sync_md */ 5763 5764 static void 5765 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5766 { 5767 struct spdk_blob *blob = cb_arg; 5768 5769 if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) { 5770 blob->data_ro = true; 5771 blob->md_ro = true; 5772 } 5773 5774 spdk_bs_sequence_finish(seq, bserrno); 5775 } 5776 5777 static void 5778 _spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 5779 { 5780 struct spdk_bs_cpl cpl; 5781 spdk_bs_sequence_t *seq; 5782 5783 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5784 cpl.u.blob_basic.cb_fn = cb_fn; 5785 cpl.u.blob_basic.cb_arg = cb_arg; 5786 5787 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 5788 if (!seq) { 5789 cb_fn(cb_arg, -ENOMEM); 5790 return; 5791 } 5792 5793 _spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob); 5794 } 5795 5796 void 5797 spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 5798 { 5799 _spdk_blob_verify_md_op(blob); 5800 5801 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id); 5802 5803 if (blob->md_ro) { 5804 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 5805 cb_fn(cb_arg, 0); 5806 return; 5807 } 5808 5809 _spdk_blob_sync_md(blob, cb_fn, cb_arg); 5810 } 5811 5812 /* END spdk_blob_sync_md */ 5813 5814 struct spdk_blob_insert_cluster_ctx { 5815 struct spdk_thread *thread; 5816 struct spdk_blob *blob; 5817 uint32_t cluster_num; /* cluster index in blob */ 5818 uint32_t cluster; /* cluster on disk */ 5819 int rc; 5820 spdk_blob_op_complete cb_fn; 5821 void *cb_arg; 5822 }; 5823 5824 static void 5825 _spdk_blob_insert_cluster_msg_cpl(void *arg) 5826 { 5827 struct spdk_blob_insert_cluster_ctx *ctx = arg; 5828 5829 ctx->cb_fn(ctx->cb_arg, ctx->rc); 5830 free(ctx); 5831 } 5832 5833 static void 5834 _spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno) 5835 { 5836 struct spdk_blob_insert_cluster_ctx *ctx = arg; 5837 5838 ctx->rc = bserrno; 5839 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx); 5840 } 5841 5842 static void 5843 _spdk_blob_insert_cluster_msg(void *arg) 5844 { 5845 struct spdk_blob_insert_cluster_ctx *ctx = arg; 5846 5847 ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster); 5848 if (ctx->rc != 0) { 5849 spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx); 5850 return; 5851 } 5852 5853 ctx->blob->state = SPDK_BLOB_STATE_DIRTY; 5854 _spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx); 5855 } 5856 5857 static void 5858 _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num, 5859 uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg) 5860 { 5861 struct spdk_blob_insert_cluster_ctx *ctx; 5862 5863 ctx = calloc(1, sizeof(*ctx)); 5864 if (ctx == NULL) { 5865 cb_fn(cb_arg, -ENOMEM); 5866 return; 5867 } 5868 5869 ctx->thread = spdk_get_thread(); 5870 ctx->blob = blob; 5871 ctx->cluster_num = cluster_num; 5872 ctx->cluster = cluster; 5873 ctx->cb_fn = cb_fn; 5874 ctx->cb_arg = cb_arg; 5875 5876 spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx); 5877 } 5878 5879 /* START spdk_blob_close */ 5880 5881 static void 5882 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5883 { 5884 struct spdk_blob *blob = cb_arg; 5885 5886 if (bserrno == 0) { 5887 blob->open_ref--; 5888 if (blob->open_ref == 0) { 5889 /* 5890 * Blobs with active.num_pages == 0 are deleted blobs. 5891 * these blobs are removed from the blob_store list 5892 * when the deletion process starts - so don't try to 5893 * remove them again. 5894 */ 5895 if (blob->active.num_pages > 0) { 5896 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 5897 } 5898 _spdk_blob_free(blob); 5899 } 5900 } 5901 5902 spdk_bs_sequence_finish(seq, bserrno); 5903 } 5904 5905 void spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg) 5906 { 5907 struct spdk_bs_cpl cpl; 5908 spdk_bs_sequence_t *seq; 5909 5910 _spdk_blob_verify_md_op(blob); 5911 5912 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id); 5913 5914 if (blob->open_ref == 0) { 5915 cb_fn(cb_arg, -EBADF); 5916 return; 5917 } 5918 5919 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5920 cpl.u.blob_basic.cb_fn = cb_fn; 5921 cpl.u.blob_basic.cb_arg = cb_arg; 5922 5923 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 5924 if (!seq) { 5925 cb_fn(cb_arg, -ENOMEM); 5926 return; 5927 } 5928 5929 /* Sync metadata */ 5930 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob); 5931 } 5932 5933 /* END spdk_blob_close */ 5934 5935 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 5936 { 5937 return spdk_get_io_channel(bs); 5938 } 5939 5940 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 5941 { 5942 spdk_put_io_channel(channel); 5943 } 5944 5945 void spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel, 5946 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 5947 { 5948 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 5949 SPDK_BLOB_UNMAP); 5950 } 5951 5952 void spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel, 5953 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 5954 { 5955 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 5956 SPDK_BLOB_WRITE_ZEROES); 5957 } 5958 5959 void spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel, 5960 void *payload, uint64_t offset, uint64_t length, 5961 spdk_blob_op_complete cb_fn, void *cb_arg) 5962 { 5963 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 5964 SPDK_BLOB_WRITE); 5965 } 5966 5967 void spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel, 5968 void *payload, uint64_t offset, uint64_t length, 5969 spdk_blob_op_complete cb_fn, void *cb_arg) 5970 { 5971 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 5972 SPDK_BLOB_READ); 5973 } 5974 5975 void spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel, 5976 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 5977 spdk_blob_op_complete cb_fn, void *cb_arg) 5978 { 5979 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 5980 } 5981 5982 void spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel, 5983 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 5984 spdk_blob_op_complete cb_fn, void *cb_arg) 5985 { 5986 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 5987 } 5988 5989 struct spdk_bs_iter_ctx { 5990 int64_t page_num; 5991 struct spdk_blob_store *bs; 5992 5993 spdk_blob_op_with_handle_complete cb_fn; 5994 void *cb_arg; 5995 }; 5996 5997 static void 5998 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5999 { 6000 struct spdk_bs_iter_ctx *ctx = cb_arg; 6001 struct spdk_blob_store *bs = ctx->bs; 6002 spdk_blob_id id; 6003 6004 if (bserrno == 0) { 6005 ctx->cb_fn(ctx->cb_arg, _blob, bserrno); 6006 free(ctx); 6007 return; 6008 } 6009 6010 ctx->page_num++; 6011 ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num); 6012 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) { 6013 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 6014 free(ctx); 6015 return; 6016 } 6017 6018 id = _spdk_bs_page_to_blobid(ctx->page_num); 6019 6020 spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 6021 } 6022 6023 void 6024 spdk_bs_iter_first(struct spdk_blob_store *bs, 6025 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 6026 { 6027 struct spdk_bs_iter_ctx *ctx; 6028 6029 ctx = calloc(1, sizeof(*ctx)); 6030 if (!ctx) { 6031 cb_fn(cb_arg, NULL, -ENOMEM); 6032 return; 6033 } 6034 6035 ctx->page_num = -1; 6036 ctx->bs = bs; 6037 ctx->cb_fn = cb_fn; 6038 ctx->cb_arg = cb_arg; 6039 6040 _spdk_bs_iter_cpl(ctx, NULL, -1); 6041 } 6042 6043 static void 6044 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 6045 { 6046 struct spdk_bs_iter_ctx *ctx = cb_arg; 6047 6048 _spdk_bs_iter_cpl(ctx, NULL, -1); 6049 } 6050 6051 void 6052 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob, 6053 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 6054 { 6055 struct spdk_bs_iter_ctx *ctx; 6056 6057 assert(blob != NULL); 6058 6059 ctx = calloc(1, sizeof(*ctx)); 6060 if (!ctx) { 6061 cb_fn(cb_arg, NULL, -ENOMEM); 6062 return; 6063 } 6064 6065 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 6066 ctx->bs = bs; 6067 ctx->cb_fn = cb_fn; 6068 ctx->cb_arg = cb_arg; 6069 6070 /* Close the existing blob */ 6071 spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx); 6072 } 6073 6074 static int 6075 _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 6076 uint16_t value_len, bool internal) 6077 { 6078 struct spdk_xattr_tailq *xattrs; 6079 struct spdk_xattr *xattr; 6080 size_t desc_size; 6081 6082 _spdk_blob_verify_md_op(blob); 6083 6084 if (blob->md_ro) { 6085 return -EPERM; 6086 } 6087 6088 desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len; 6089 if (desc_size > SPDK_BS_MAX_DESC_SIZE) { 6090 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %ld does not fix into single page %ld\n", name, 6091 desc_size, SPDK_BS_MAX_DESC_SIZE); 6092 return -ENOMEM; 6093 } 6094 6095 if (internal) { 6096 xattrs = &blob->xattrs_internal; 6097 blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR; 6098 } else { 6099 xattrs = &blob->xattrs; 6100 } 6101 6102 TAILQ_FOREACH(xattr, xattrs, link) { 6103 if (!strcmp(name, xattr->name)) { 6104 free(xattr->value); 6105 xattr->value_len = value_len; 6106 xattr->value = malloc(value_len); 6107 memcpy(xattr->value, value, value_len); 6108 6109 blob->state = SPDK_BLOB_STATE_DIRTY; 6110 6111 return 0; 6112 } 6113 } 6114 6115 xattr = calloc(1, sizeof(*xattr)); 6116 if (!xattr) { 6117 return -ENOMEM; 6118 } 6119 xattr->name = strdup(name); 6120 xattr->value_len = value_len; 6121 xattr->value = malloc(value_len); 6122 memcpy(xattr->value, value, value_len); 6123 TAILQ_INSERT_TAIL(xattrs, xattr, link); 6124 6125 blob->state = SPDK_BLOB_STATE_DIRTY; 6126 6127 return 0; 6128 } 6129 6130 int 6131 spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 6132 uint16_t value_len) 6133 { 6134 return _spdk_blob_set_xattr(blob, name, value, value_len, false); 6135 } 6136 6137 static int 6138 _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal) 6139 { 6140 struct spdk_xattr_tailq *xattrs; 6141 struct spdk_xattr *xattr; 6142 6143 _spdk_blob_verify_md_op(blob); 6144 6145 if (blob->md_ro) { 6146 return -EPERM; 6147 } 6148 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs; 6149 6150 TAILQ_FOREACH(xattr, xattrs, link) { 6151 if (!strcmp(name, xattr->name)) { 6152 TAILQ_REMOVE(xattrs, xattr, link); 6153 free(xattr->value); 6154 free(xattr->name); 6155 free(xattr); 6156 6157 if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) { 6158 blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR; 6159 } 6160 blob->state = SPDK_BLOB_STATE_DIRTY; 6161 6162 return 0; 6163 } 6164 } 6165 6166 return -ENOENT; 6167 } 6168 6169 int 6170 spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name) 6171 { 6172 return _spdk_blob_remove_xattr(blob, name, false); 6173 } 6174 6175 static int 6176 _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name, 6177 const void **value, size_t *value_len, bool internal) 6178 { 6179 struct spdk_xattr *xattr; 6180 struct spdk_xattr_tailq *xattrs; 6181 6182 xattrs = internal ? &blob->xattrs_internal : &blob->xattrs; 6183 6184 TAILQ_FOREACH(xattr, xattrs, link) { 6185 if (!strcmp(name, xattr->name)) { 6186 *value = xattr->value; 6187 *value_len = xattr->value_len; 6188 return 0; 6189 } 6190 } 6191 return -ENOENT; 6192 } 6193 6194 int 6195 spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name, 6196 const void **value, size_t *value_len) 6197 { 6198 _spdk_blob_verify_md_op(blob); 6199 6200 return _spdk_blob_get_xattr_value(blob, name, value, value_len, false); 6201 } 6202 6203 struct spdk_xattr_names { 6204 uint32_t count; 6205 const char *names[0]; 6206 }; 6207 6208 static int 6209 _spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names) 6210 { 6211 struct spdk_xattr *xattr; 6212 int count = 0; 6213 6214 TAILQ_FOREACH(xattr, xattrs, link) { 6215 count++; 6216 } 6217 6218 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 6219 if (*names == NULL) { 6220 return -ENOMEM; 6221 } 6222 6223 TAILQ_FOREACH(xattr, xattrs, link) { 6224 (*names)->names[(*names)->count++] = xattr->name; 6225 } 6226 6227 return 0; 6228 } 6229 6230 int 6231 spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names) 6232 { 6233 _spdk_blob_verify_md_op(blob); 6234 6235 return _spdk_blob_get_xattr_names(&blob->xattrs, names); 6236 } 6237 6238 uint32_t 6239 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 6240 { 6241 assert(names != NULL); 6242 6243 return names->count; 6244 } 6245 6246 const char * 6247 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 6248 { 6249 if (index >= names->count) { 6250 return NULL; 6251 } 6252 6253 return names->names[index]; 6254 } 6255 6256 void 6257 spdk_xattr_names_free(struct spdk_xattr_names *names) 6258 { 6259 free(names); 6260 } 6261 6262 struct spdk_bs_type 6263 spdk_bs_get_bstype(struct spdk_blob_store *bs) 6264 { 6265 return bs->bstype; 6266 } 6267 6268 void 6269 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype) 6270 { 6271 memcpy(&bs->bstype, &bstype, sizeof(bstype)); 6272 } 6273 6274 bool 6275 spdk_blob_is_read_only(struct spdk_blob *blob) 6276 { 6277 assert(blob != NULL); 6278 return (blob->data_ro || blob->md_ro); 6279 } 6280 6281 bool 6282 spdk_blob_is_snapshot(struct spdk_blob *blob) 6283 { 6284 struct spdk_blob_list *snapshot_entry; 6285 6286 assert(blob != NULL); 6287 6288 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 6289 if (snapshot_entry == NULL) { 6290 return false; 6291 } 6292 6293 return true; 6294 } 6295 6296 bool 6297 spdk_blob_is_clone(struct spdk_blob *blob) 6298 { 6299 assert(blob != NULL); 6300 6301 if (blob->parent_id != SPDK_BLOBID_INVALID) { 6302 assert(spdk_blob_is_thin_provisioned(blob)); 6303 return true; 6304 } 6305 6306 return false; 6307 } 6308 6309 bool 6310 spdk_blob_is_thin_provisioned(struct spdk_blob *blob) 6311 { 6312 assert(blob != NULL); 6313 return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV); 6314 } 6315 6316 spdk_blob_id 6317 spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id) 6318 { 6319 struct spdk_blob_list *snapshot_entry = NULL; 6320 struct spdk_blob_list *clone_entry = NULL; 6321 6322 TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) { 6323 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 6324 if (clone_entry->id == blob_id) { 6325 return snapshot_entry->id; 6326 } 6327 } 6328 } 6329 6330 return SPDK_BLOBID_INVALID; 6331 } 6332 6333 int 6334 spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids, 6335 size_t *count) 6336 { 6337 struct spdk_blob_list *snapshot_entry, *clone_entry; 6338 size_t n; 6339 6340 snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid); 6341 if (snapshot_entry == NULL) { 6342 *count = 0; 6343 return 0; 6344 } 6345 6346 if (ids == NULL || *count < snapshot_entry->clone_count) { 6347 *count = snapshot_entry->clone_count; 6348 return -ENOMEM; 6349 } 6350 *count = snapshot_entry->clone_count; 6351 6352 n = 0; 6353 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 6354 ids[n++] = clone_entry->id; 6355 } 6356 6357 return 0; 6358 } 6359 6360 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB) 6361