/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"
#include "spdk/util.h"
#include "spdk/string.h"

#include "spdk_internal/assert.h"
#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL	0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);

static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
				uint16_t value_len, bool internal);
static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
				      const void **value, size_t *value_len, bool internal);
static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);

static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
{
	assert(blob != NULL);
	assert(spdk_get_thread() == blob->bs->md_thread);
	assert(blob->state != SPDK_BLOB_STATE_LOADING);
}

static struct spdk_blob_list *
_spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_list *snapshot_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		if (snapshot_entry->id == blobid) {
			break;
		}
	}

	return snapshot_entry;
}
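/*
 * Cluster accounting: the blobstore tracks data cluster ownership in the
 * bs->used_clusters bit array, mirrored by the bs->num_free_clusters counter.
 * The claim/release helpers below keep the two in sync. Callers that scan for
 * a free cluster and then claim it must hold bs->used_clusters_mutex around
 * the scan-and-claim so concurrent allocators do not pick the same bit; see
 * _spdk_bs_allocate_cluster() below for the locked pattern.
 */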
static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static int
_spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
{
	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];

	_spdk_blob_verify_md_op(blob);

	if (*cluster_lba != 0) {
		return -EEXIST;
	}

	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
	return 0;
}

static int
_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
			  uint64_t *lowest_free_cluster, bool update_map)
{
	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
			       *lowest_free_cluster);
	if (*lowest_free_cluster == UINT32_MAX) {
		/* No more free clusters. Cannot satisfy the request */
		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
		return -ENOSPC;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);

	if (update_map) {
		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
	}

	return 0;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	pthread_mutex_lock(&bs->used_clusters_mutex);
	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
	pthread_mutex_unlock(&bs->used_clusters_mutex);
}

static void
_spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
{
	xattrs->count = 0;
	xattrs->names = NULL;
	xattrs->ctx = NULL;
	xattrs->get_value = NULL;
}

void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->thin_provision = false;
	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
	_spdk_blob_xattrs_init(&opts->xattrs);
}

void
spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts)
{
	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->parent_id = SPDK_BLOBID_INVALID;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);
	TAILQ_INIT(&blob->xattrs_internal);

	return blob;
}
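/*
 * Illustrative usage of the public initializers above (a sketch only; the
 * creation callback and its wiring are the caller's responsibility and not
 * part of this file):
 *
 *	struct spdk_blob_opts opts;
 *
 *	spdk_blob_opts_init(&opts);
 *	opts.thin_provision = true;
 *	opts.num_clusters = 16;
 *	spdk_bs_create_blob_ext(bs, &opts, create_done_cb, cb_arg);
 *
 * Initializing the opts struct first ensures that any fields the caller does
 * not set explicitly carry the documented defaults.
 */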
static void
_spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	_spdk_xattrs_free(&blob->xattrs);
	_spdk_xattrs_free(&blob->xattrs_internal);

	if (blob->back_bs_dev) {
		blob->back_bs_dev->destroy(blob->back_bs_dev);
	}

	free(blob);
}

struct freeze_io_ctx {
	struct spdk_bs_cpl cpl;
	struct spdk_blob *blob;
};

static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_bs_request_set *set;
	struct spdk_bs_user_op_args *args;
	spdk_bs_user_op_t *op, *tmp;

	TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
		set = (struct spdk_bs_request_set *)op;
		args = &set->u.user_op;

		if (args->blob == ctx->blob) {
			TAILQ_REMOVE(&ch->queued_io, op, link);
			spdk_bs_user_op_execute(op);
		}
	}

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
{
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);

	free(ctx);
}

static void
_spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	/* Freeze I/O on blob */
	blob->frozen_refcnt++;

	if (blob->frozen_refcnt == 1) {
		spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}

static void
_spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	assert(blob->frozen_refcnt > 0);

	blob->frozen_refcnt--;

	if (blob->frozen_refcnt == 0) {
		spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}
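/*
 * Freeze semantics: frozen_refcnt counts nested freeze requests. The first
 * freeze sweeps every I/O channel (spdk_for_each_channel) so that in-flight
 * operations drain; while frozen, new I/O for the blob is parked on the
 * per-channel queued_io list by _spdk_blob_request_submit_op_single(). The
 * last unfreeze sweeps the channels again and re-executes the parked
 * operations. Nested freeze/unfreeze pairs just adjust the refcount and
 * complete immediately.
 */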
static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -ENOMEM;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -ENOMEM;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	/* If the metadata was dirtied again while the metadata was being written to disk,
	 * we do not want to revert the DIRTY state back to CLEAN here.
	 */
	if (blob->state == SPDK_BLOB_STATE_LOADING) {
		blob->state = SPDK_BLOB_STATE_CLEAN;
	}

	return 0;
}
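/*
 * Note on the swap above: "clean" holds the cluster/page arrays as they were
 * last persisted, and "active" holds the in-memory working copy. Rather than
 * copying active into clean, _spdk_blob_mark_clean() moves the active arrays
 * into clean and installs freshly-copied duplicates as the new active arrays,
 * which keeps the persisted view stable even if the caller mutates active
 * immediately afterwards.
 */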
static int
_spdk_blob_deserialize_xattr(struct spdk_blob *blob,
			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
{
	struct spdk_xattr *xattr;

	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
	    sizeof(desc_xattr->value_length) +
	    desc_xattr->name_length + desc_xattr->value_length) {
		return -EINVAL;
	}

	xattr = calloc(1, sizeof(*xattr));
	if (xattr == NULL) {
		return -ENOMEM;
	}

	xattr->name = malloc(desc_xattr->name_length + 1);
	if (xattr->name == NULL) {
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
	xattr->name[desc_xattr->name_length] = '\0';

	xattr->value = malloc(desc_xattr->value_length);
	if (xattr->value == NULL) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = desc_xattr->value_length;
	memcpy(xattr->value,
	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
	       desc_xattr->value_length);

	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);

	return 0;
}
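/*
 * On-disk xattr descriptor layout, as parsed above and produced by
 * _spdk_blob_serialize_xattr() below: a descriptor header (type, length)
 * followed by name_length, value_length, the raw name bytes (not
 * NUL-terminated on disk), and finally the raw value bytes. "length" must
 * equal sizeof(name_length) + sizeof(value_length) + name_length +
 * value_length, which is exactly the check at the top of
 * _spdk_blob_deserialize_xattr().
 */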
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags *desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
			struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;

			if (desc_extent_rle->length == 0 ||
			    (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						if (!spdk_bit_array_get(blob->bs->used_clusters,
									desc_extent_rle->extents[i].cluster_idx + j)) {
							return -EINVAL;
						}
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
								desc_extent_rle->extents[i].cluster_idx + j);
					} else if (spdk_blob_is_thin_provisioned(blob)) {
						blob->active.clusters[blob->active.num_clusters++] = 0;
					} else {
						return -EINVAL;
					}
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
			if (rc != 0) {
				return rc;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
			if (rc != 0) {
				return rc;
			}
		} else {
			/* Unrecognized descriptor type. Do not fail - just continue to the
			 * next descriptor. If this descriptor is associated with some feature
			 * defined in a newer version of blobstore, that version of blobstore
			 * should create and set an associated feature flag to specify if this
			 * blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD, this can
	 * happen for example if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
				     NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_realloc(*pages,
				      SPDK_BS_PAGE_SIZE * (*page_count),
				      SPDK_BS_PAGE_SIZE);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}
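/*
 * Metadata pages form a singly linked on-disk chain: each page carries the
 * owning blob id, a sequence_num that increases from 0, and a "next" page
 * index (SPDK_INVALID_MD_PAGE terminates the chain). Serialization appends
 * pages to a contiguous DMA-able buffer here; the on-disk "next" pointers
 * are filled in later, in _spdk_blob_persist_generate_new_md(), once the
 * page indices have actually been claimed.
 */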
/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz, bool internal)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

static void
_spdk_blob_serialize_extent_rle(const struct spdk_blob *blob,
				uint64_t start_cluster, uint64_t *next_cluster,
				uint8_t **buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]);
	if (*buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf;
	desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) {
			/* Run-length encode sequential non-zero LBA */
			lba_count += lba_per_cluster;
			continue;
		} else if (lba == 0 && blob->active.clusters[i] == 0) {
			/* Run-length encode unallocated clusters */
			lba_count += lba_per_cluster;
			continue;
		}
		desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc_extent_rle->extents[extent_idx]);

		if (*buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			*next_cluster = i;
			goto finish;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	*next_cluster = blob->active.num_clusters;

finish:
	desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx;
	*buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;
	*buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;

	return;
}

static int
_spdk_blob_serialize_extents_rle(const struct spdk_blob *blob,
				 struct spdk_blob_md_page **pages,
				 struct spdk_blob_md_page *cur_page,
				 uint32_t *page_count, uint8_t **buf,
				 size_t *remaining_sz)
{
	uint64_t last_cluster;
	int rc;

	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
		if (rc < 0) {
			return rc;
		}

		*buf = (uint8_t *)cur_page->descriptors;
		*remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}
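/*
 * Worked example of the RLE encoding above (illustrative numbers): with one
 * LBA per cluster for simplicity, an active cluster array of
 *
 *	{ 10, 11, 12, 0, 0, 7 }
 *
 * serializes to three extents: { cluster_idx = 10, length = 3 } for the
 * contiguous run, { cluster_idx = 0, length = 2 } for the two unallocated
 * (thin-provisioned) clusters, and { cluster_idx = 7, length = 1 } for the
 * final cluster. Runs are broken whenever the next cluster is not physically
 * contiguous with the current run, or where allocated and unallocated
 * clusters meet.
 */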
static void
_spdk_blob_serialize_flags(const struct spdk_blob *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 * descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
			    const struct spdk_xattr_tailq *xattrs, bool internal,
			    struct spdk_blob_md_page **pages,
			    struct spdk_blob_md_page *cur_page,
			    uint32_t *page_count, uint8_t **buf,
			    size_t *remaining_sz)
{
	const struct spdk_xattr *xattr;
	int rc;

	TAILQ_FOREACH(xattr, xattrs, link) {
		size_t required_sz = 0;

		rc = _spdk_blob_serialize_xattr(xattr,
						*buf, *remaining_sz,
						&required_sz, internal);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			*buf = (uint8_t *)cur_page->descriptors;
			*remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							*buf, *remaining_sz,
							&required_sz, internal);

			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}
		}

		*remaining_sz -= required_sz;
		*buf += required_sz;
	}

	return 0;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_DIRTY);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize internal xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize extents */
	rc = _spdk_blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz);

	return rc;
}

struct spdk_blob_load_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;
	spdk_bs_sequence_t *seq;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};
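/*
 * Page CRC: the last 4 bytes of each SPDK_BS_PAGE_SIZE metadata page hold a
 * crc32c computed over the preceding SPDK_BS_PAGE_SIZE - 4 bytes, seeded and
 * finalized with BLOB_CRC32C_INITIAL (the usual CRC-32 bit inversion at both
 * ends). _spdk_blob_load_cpl() recomputes this on every page read and fails
 * the load with -EINVAL on a mismatch.
 */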
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_final(void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);
		if (blob->back_bs_dev == NULL) {
			bserrno = -ENOMEM;
		}
	}
	if (bserrno != 0) {
		SPDK_ERRLOG("Snapshot open failed\n");
	}

	_spdk_blob_load_final(ctx, bserrno);
}

static void _spdk_blob_update_clear_method(struct spdk_blob *blob);

static void
_spdk_blob_load_backing_dev(void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	const void *value;
	size_t len;
	int rc;

	if (spdk_blob_is_thin_provisioned(blob)) {
		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
		if (rc == 0) {
			if (len != sizeof(spdk_blob_id)) {
				_spdk_blob_load_final(ctx, -EINVAL);
				return;
			}
			/* Open the snapshot blob and continue in the callback function */
			blob->parent_id = *(spdk_blob_id *)value;
			spdk_bs_open_blob(blob->bs, blob->parent_id,
					  _spdk_blob_load_snapshot_cpl, ctx);
			return;
		} else {
			/* Add a zeroes_dev for the thin provisioned blob */
			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
		}
	} else {
		/* Standard blob */
		blob->back_bs_dev = NULL;
	}
	_spdk_blob_load_final(ctx, 0);
}
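/*
 * Backing device selection: a thin-provisioned clone records its parent's
 * blob id in the internal BLOB_SNAPSHOT xattr, so reads of unallocated
 * clusters fall through to the (read-only) snapshot via back_bs_dev. A thin
 * blob with no parent falls through to a synthetic zeroes device instead,
 * and a fully-provisioned blob needs no backing device at all.
 */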
static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;
	uint32_t crc;

	if (bserrno) {
		SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno);
		_spdk_blob_load_final(ctx, bserrno);
		return;
	}

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_load_final(ctx, -EINVAL);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_md_page_to_lba(blob->bs, next_page);

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					  sizeof(*page));
		if (ctx->pages == NULL) {
			_spdk_blob_load_final(ctx, -ENOMEM);
			return;
		}

		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
					  next_lba,
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
					  _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_load_final(ctx, rc);
		return;
	}
	ctx->seq = seq;

	/* Check the clear_method stored in metadata vs what may have been passed
	 * via spdk_bs_open_blob_ext() and update accordingly.
	 */
	_spdk_blob_update_clear_method(blob);

	_spdk_blob_load_backing_dev(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	_spdk_blob_verify_md_op(blob);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->seq = seq;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_md_page_to_lba(blob->bs, page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
				  _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob *blob;

	struct spdk_bs_super_block *super;

	struct spdk_blob_md_page *pages;

	spdk_bs_sequence_t *seq;
	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba,
			uint32_t lba_count)
{
	switch (ctx->blob->clear_method) {
	case BLOB_CLEAR_WITH_DEFAULT:
	case BLOB_CLEAR_WITH_UNMAP:
		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		break;
	case BLOB_CLEAR_WITH_WRITE_ZEROES:
		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
		break;
	case BLOB_CLEAR_WITH_NONE:
	default:
		break;
	}
}

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}
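/*
 * clear_method mapping, as implemented by spdk_bs_batch_clear_dev() above:
 * BLOB_CLEAR_WITH_DEFAULT and BLOB_CLEAR_WITH_UNMAP issue an unmap,
 * BLOB_CLEAR_WITH_WRITE_ZEROES writes explicit zeroes, and
 * BLOB_CLEAR_WITH_NONE leaves the old data in place. Unmap is cheapest, but
 * whether it deterministically zeroes the LBAs depends on the underlying
 * device; write-zeroes trades performance for a guaranteed result.
 */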
static void
_spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		/* Nothing to release if it was not allocated */
		if (blob->active.clusters[i] != 0) {
			_spdk_bs_release_cluster(bs, cluster_num);
		}
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else if (blob->active.num_clusters != blob->active.cluster_array_size) {
#ifndef __clang_analyzer__
		void *tmp;

		/* scan-build really can't figure reallocs, workaround it */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
#endif
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx);

	/* Clear all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if (next_lba > 0 && (lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, clear them now */
		if (lba_count > 0) {
			spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		if (next_lba > 0) {
			lba_count = next_lba_count;
		} else {
			lba_count = 0;
		}
	}

	/* If we ended with a contiguous set of LBAs, clear them now */
	if (lba_count > 0) {
		spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to clearing clusters */
	_spdk_blob_persist_clear_clusters(seq, ctx, 0);
}
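/*
 * Persist pipeline, for orientation (each step is the completion callback of
 * the previous one): write the new page chain -> write the root page ->
 * zero the old, now superseded, metadata pages and release their bits in
 * used_md_pages -> clear and release any data clusters dropped by a resize
 * -> complete back to the caller. Zeroing the stale pages only after the new
 * chain is fully on disk means a crash mid-persist leaves either the old
 * chain or the new chain intact.
 */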
static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_md_page_to_lba(bs, blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_md_page_to_lba(bs, page_num);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_md_page_to_lba(bs, _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
				   _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_md_page_to_lba(bs, blob->active.pages[i]);

		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}
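/*
 * The root page is written only after every non-root page has landed on
 * disk: the root lives at a fixed location derived from the blob id, so it
 * is the single point that makes the new chain visible. Writing it last
 * means a reader (or crash recovery) never follows "next" pointers into
 * pages that have not been written yet.
 */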
static int
_spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	uint64_t num_clusters;
	struct spdk_blob_store *bs;

	bs = blob->bs;

	_spdk_blob_verify_md_op(blob);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		num_clusters = spdk_min(blob->active.cluster_array_size,
					sz);
	} else {
		num_clusters = blob->active.num_clusters;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters. Cannot satisfy the request */
				return -ENOSPC;
			}
			lfc++;
		}
	}

	if (sz > num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz);
		if (sz > 0 && tmp == NULL) {
			return -ENOMEM;
		}
		memset(tmp + blob->active.cluster_array_size, 0,
		       sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size));
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
			lfc++;
		}
	}

	blob->active.num_clusters = sz;

	return 0;
}
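/*
 * Illustrative caller-side view of a resize (a sketch; spdk_blob_resize() is
 * the public async wrapper around the internal helper above, and blob sizes
 * are expressed in clusters):
 *
 *	static void resize_done(void *cb_arg, int bserrno)
 *	{
 *		// bserrno is -ENOSPC if the first verification pass above
 *		// came up short of free clusters
 *	}
 *
 *	spdk_blob_resize(blob, new_num_clusters, resize_done, cb_arg);
 *
 * Note that the resize only dirties in-memory state; nothing hits the disk
 * until the blob is persisted via spdk_blob_sync_md() or spdk_blob_close().
 */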
static void
_spdk_blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t i;
	uint32_t page_num;
	void *tmp;
	int rc;

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		_spdk_blob_persist_complete(seq, ctx, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp) {
		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
		return;
	}
	blob->active.pages = tmp;

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num == UINT32_MAX) {
			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that the previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	blob->state = SPDK_BLOB_STATE_CLEAN;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		blob->state = SPDK_BLOB_STATE_CLEAN;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;

	}

	_spdk_blob_persist_generate_new_md(ctx);
}

static void
_spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->blob->bs->clean = 0;

	spdk_free(ctx->super);

	_spdk_blob_persist_start(ctx);
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg);

static void
_spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->super->clean = 0;
	if (ctx->super->size == 0) {
		ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen;
	}

	_spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx);
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;

	_spdk_blob_verify_md_op(blob);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->seq = seq;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	if (blob->bs->clean) {
		ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->super) {
			cb_fn(seq, cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0),
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)),
					  _spdk_blob_persist_dirty, ctx);
	} else {
		_spdk_blob_persist_start(ctx);
	}
}

struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
	uint8_t *buf;
	uint64_t page;
	uint64_t new_cluster;
	spdk_bs_sequence_t *seq;
};
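/*
 * Copy-on-write flow for a thin-provisioned blob: the first write to an
 * unallocated cluster allocates a fresh cluster and, when the blob has a
 * parent (parent_id != SPDK_BLOBID_INVALID), reads the whole source cluster
 * from the backing device into ctx->buf, writes it to the new cluster, and
 * finally records the mapping in the metadata on the metadata thread. Only
 * then are the queued user operations released. With no parent, there is
 * nothing to copy and the flow skips straight to the metadata update.
 */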
static void
_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
	TAILQ_HEAD(, spdk_bs_request_set) requests;
	spdk_bs_user_op_t *op;

	TAILQ_INIT(&requests);
	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);

	while (!TAILQ_EMPTY(&requests)) {
		op = TAILQ_FIRST(&requests);
		TAILQ_REMOVE(&requests, op, link);
		if (bserrno == 0) {
			spdk_bs_user_op_execute(op);
		} else {
			spdk_bs_user_op_abort(op);
		}
	}

	spdk_free(ctx->buf);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno) {
		if (bserrno == -EEXIST) {
			/* The metadata insert failed because another thread
			 * allocated the cluster first. Free our cluster
			 * but continue without error. */
			bserrno = 0;
		}
		_spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
	}

	spdk_bs_sequence_finish(ctx->seq, bserrno);
}

static void
_spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	uint32_t cluster_number;

	if (bserrno) {
		/* The write failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);

	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
					       _spdk_blob_insert_cluster_cpl, ctx);
}

static void
_spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		/* The read failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Write whole cluster */
	spdk_bs_sequence_write_dev(seq, ctx->buf,
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
				   _spdk_blob_write_copy_cpl, ctx);
}
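/*
 * Note the race handled by _spdk_blob_insert_cluster_cpl() above: two
 * channels can race to populate the same logical cluster. Both copy into
 * private clusters; the metadata insert on the md thread is the arbiter.
 * The loser gets -EEXIST, releases its now-unneeded cluster, and completes
 * successfully, since the winner's data is already in place.
 */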
static void
_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
				   struct spdk_io_channel *_ch,
				   uint64_t io_unit, spdk_bs_user_op_t *op)
{
	struct spdk_bs_cpl cpl;
	struct spdk_bs_channel *ch;
	struct spdk_blob_copy_cluster_ctx *ctx;
	uint32_t cluster_start_page;
	uint32_t cluster_number;
	int rc;

	ch = spdk_io_channel_get_ctx(_ch);

	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
		/* There are already operations pending. Queue this user op
		 * and return because it will be re-executed when the outstanding
		 * cluster allocation completes. */
		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
		return;
	}

	/* Round the io_unit offset down to the first page in the cluster */
	cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit);

	/* Calculate which index in the metadata cluster array the corresponding
	 * cluster is supposed to be at.
	 */
	cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		spdk_bs_user_op_abort(op);
		return;
	}

	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);

	ctx->blob = blob;
	ctx->page = cluster_start_page;

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen,
				       NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->buf) {
			SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
				    blob->bs->cluster_sz);
			free(ctx);
			spdk_bs_user_op_abort(op);
			return;
		}
	}

	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
	if (rc != 0) {
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
	cpl.u.blob_basic.cb_arg = ctx;

	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
	if (!ctx->seq) {
		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	/* Queue the user op to block other incoming operations */
	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		/* Read cluster from backing device */
		spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
					     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
					     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
					     _spdk_blob_write_copy, ctx);
	} else {
		_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
						       _spdk_blob_insert_cluster_cpl, ctx);
	}
}

static void
_spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length,
				       uint64_t *lba, uint32_t *lba_count)
{
	*lba_count = length;

	if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) {
		assert(blob->back_bs_dev != NULL);
		*lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit);
		*lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count);
	} else {
		*lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit);
	}
}

struct op_split_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	uint64_t io_unit_offset;
	uint64_t io_units_remaining;
	void *curr_payload;
	enum spdk_blob_op_type op_type;
	spdk_bs_sequence_t *seq;
};
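/*
 * Why requests get split: a single blobstore device operation may only touch
 * one cluster, because each cluster maps to an independent (possibly not yet
 * allocated) device extent. For example, with a 1 MiB cluster and 4 KiB io
 * units (256 io units per cluster), a write of 64 io units starting at io
 * unit 224 is split into two sub-operations: 32 io units up to the cluster
 * boundary, then the remaining 32 in the next cluster. The split driver
 * below issues these sequentially, advancing the offset and payload after
 * each completion.
 */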
static void
_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
{
	struct op_split_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_io_channel *ch = ctx->channel;
	enum spdk_blob_op_type op_type = ctx->op_type;
	uint8_t *buf = ctx->curr_payload;
	uint64_t offset = ctx->io_unit_offset;
	uint64_t length = ctx->io_units_remaining;
	uint64_t op_length;

	if (bserrno != 0 || ctx->io_units_remaining == 0) {
		spdk_bs_sequence_finish(ctx->seq, bserrno);
		free(ctx);
		return;
	}

	op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob,
			     offset));

	/* Update length and payload for next operation */
	ctx->io_units_remaining -= op_length;
	ctx->io_unit_offset += op_length;
	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
		ctx->curr_payload += op_length * blob->bs->io_unit_size;
	}

	switch (op_type) {
	case SPDK_BLOB_READ:
		spdk_blob_io_read(blob, ch, buf, offset, op_length,
				  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE:
		spdk_blob_io_write(blob, ch, buf, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_blob_io_unmap(blob, ch, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
					  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid\n");
		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
		free(ctx);
		break;
	}
}

static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct op_split_ctx *ctx;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(struct op_split_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(ch, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->channel = ch;
	ctx->curr_payload = payload;
	ctx->io_unit_offset = offset;
	ctx->io_units_remaining = length;
	ctx->op_type = op_type;
	ctx->seq = seq;

	_spdk_blob_request_submit_op_split_next(ctx, 0);
}
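/*
 * Illustrative use of the public I/O entry points that funnel into the
 * submit paths below (a sketch; offsets and lengths are in io units, and the
 * channel comes from spdk_bs_alloc_io_channel()):
 *
 *	static void write_done(void *cb_arg, int bserrno)
 *	{
 *		// 0 on success, negative errno otherwise
 *	}
 *
 *	spdk_blob_io_write(blob, channel, payload, 0, 1, write_done, cb_arg);
 *
 * A request that crosses a cluster boundary takes the split path; one that
 * fits within a cluster is submitted directly as a single operation.
 */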
static void
_spdk_blob_request_submit_op_single(struct spdk_io_channel *_ch, struct spdk_blob *blob,
				    void *payload, uint64_t offset, uint64_t length,
				    spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;

	assert(blob != NULL);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	_spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count);

	if (blob->frozen_refcnt) {
		/* This blob I/O is frozen */
		spdk_bs_user_op_t *op;
		struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_ch);

		op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
		if (!op) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link);

		return;
	}

	switch (op_type) {
	case SPDK_BLOB_READ: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			/* Read from the blob */
			spdk_bs_batch_read_dev(batch, payload, lba, lba_count);
		} else {
			/* Read from the backing block device */
			spdk_bs_batch_read_bs_dev(batch, blob->back_bs_dev, payload, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_WRITE:
	case SPDK_BLOB_WRITE_ZEROES: {
		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			/* Write to the blob */
			spdk_bs_batch_t *batch;

			if (lba_count == 0) {
				cb_fn(cb_arg, 0);
				return;
			}

			batch = spdk_bs_batch_open(_ch, &cpl);
			if (!batch) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			if (op_type == SPDK_BLOB_WRITE) {
				spdk_bs_batch_write_dev(batch, payload, lba, lba_count);
			} else {
				spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
			}

			spdk_bs_batch_close(batch);
		} else {
			/* Queue this operation and allocate the cluster */
			spdk_bs_user_op_t *op;

			op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length);
			if (!op) {
				cb_fn(cb_arg, -ENOMEM);
				return;
			}

			_spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op);
		}
		break;
	}
	case SPDK_BLOB_UNMAP: {
		spdk_bs_batch_t *batch;

		batch = spdk_bs_batch_open(_ch, &cpl);
		if (!batch) {
			cb_fn(cb_arg, -ENOMEM);
			return;
		}

		if (_spdk_bs_io_unit_is_allocated(blob, offset)) {
			spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		}

		spdk_bs_batch_close(batch);
		break;
	}
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid\n");
		cb_fn(cb_arg, -EINVAL);
		break;
	}
}

static void
_spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}
	if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) {
		_spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length,
						    cb_fn, cb_arg, op_type);
	} else {
		_spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length,
						   cb_fn, cb_arg, op_type);
	}
}
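/*
 * Illustrative usage sketch (not part of the build): the public single-buffer
 * I/O entry points all funnel into _spdk_blob_request_submit_op() above.
 * A caller holding an open blob and an I/O channel (both provided by the
 * surrounding application) would issue a write roughly like this:
 *
 *	static void
 *	write_done(void *cb_arg, int bserrno)
 *	{
 *		assert(bserrno == 0);
 *	}
 *
 *	spdk_blob_io_write(blob, channel, buf, offset, length, write_done, NULL);
 *
 * Note that offset and length are expressed in io units, not bytes.
 */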
struct rw_iov_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	spdk_blob_op_complete cb_fn;
	void *cb_arg;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t io_unit_offset;
	uint64_t io_units_remaining;
	uint64_t io_units_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t io_units_count, io_units_to_boundary, io_unit_offset;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->io_units_remaining == 0) {
		ctx->cb_fn(ctx->cb_arg, bserrno);
		free(ctx);
		return;
	}

	io_unit_offset = ctx->io_unit_offset;
	io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset);
	io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary);
	/*
	 * Get the index and offset into the original iov array for our current
	 * position in the I/O sequence.  byte_count tracks how many bytes remain
	 * until orig_iov and orig_iovoff point to the current position.
	 */
	byte_count = ctx->io_units_done * blob->bs->io_unit_size;
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence.  byte_count tracks how many
	 * bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = io_units_count * blob->bs->io_unit_size;
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		assert(iovcnt < ctx->iovcnt);
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->io_unit_offset += io_units_count;
	ctx->io_units_remaining -= io_units_count;
	ctx->io_units_done += io_units_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
				   io_units_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset,
				    io_units_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	if (!read && blob->data_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 * to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
	 * there will be no noticeable difference compared to using a batch.  For I/O that do span a cluster
	 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 * to allocate a separate iov array and split the I/O such that none of the resulting
	 * smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in parallel)
	 * but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 * in a batch.  That would also require creating an intermediate spdk_bs_cpl that would get called
	 * when the batch was completed, to allow for freeing the memory for the iov arrays.
2143 */ 2144 if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) { 2145 uint32_t lba_count; 2146 uint64_t lba; 2147 2148 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2149 cpl.u.blob_basic.cb_fn = cb_fn; 2150 cpl.u.blob_basic.cb_arg = cb_arg; 2151 2152 if (blob->frozen_refcnt) { 2153 /* This blob I/O is frozen */ 2154 enum spdk_blob_op_type op_type; 2155 spdk_bs_user_op_t *op; 2156 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel); 2157 2158 op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV; 2159 op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length); 2160 if (!op) { 2161 cb_fn(cb_arg, -ENOMEM); 2162 return; 2163 } 2164 2165 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 2166 2167 return; 2168 } 2169 2170 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 2171 2172 if (read) { 2173 spdk_bs_sequence_t *seq; 2174 2175 seq = spdk_bs_sequence_start(_channel, &cpl); 2176 if (!seq) { 2177 cb_fn(cb_arg, -ENOMEM); 2178 return; 2179 } 2180 2181 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2182 spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2183 } else { 2184 spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count, 2185 _spdk_rw_iov_done, NULL); 2186 } 2187 } else { 2188 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2189 spdk_bs_sequence_t *seq; 2190 2191 seq = spdk_bs_sequence_start(_channel, &cpl); 2192 if (!seq) { 2193 cb_fn(cb_arg, -ENOMEM); 2194 return; 2195 } 2196 2197 spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2198 } else { 2199 /* Queue this operation and allocate the cluster */ 2200 spdk_bs_user_op_t *op; 2201 2202 op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, 2203 length); 2204 if (!op) { 2205 cb_fn(cb_arg, -ENOMEM); 2206 return; 2207 } 2208 2209 _spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op); 2210 } 2211 } 2212 } else { 2213 struct rw_iov_ctx *ctx; 2214 2215 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 2216 if (ctx == NULL) { 2217 cb_fn(cb_arg, -ENOMEM); 2218 return; 2219 } 2220 2221 ctx->blob = blob; 2222 ctx->channel = _channel; 2223 ctx->cb_fn = cb_fn; 2224 ctx->cb_arg = cb_arg; 2225 ctx->read = read; 2226 ctx->orig_iov = iov; 2227 ctx->iovcnt = iovcnt; 2228 ctx->io_unit_offset = offset; 2229 ctx->io_units_remaining = length; 2230 ctx->io_units_done = 0; 2231 2232 _spdk_rw_iov_split_next(ctx, 0); 2233 } 2234 } 2235 2236 static struct spdk_blob * 2237 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 2238 { 2239 struct spdk_blob *blob; 2240 2241 TAILQ_FOREACH(blob, &bs->blobs, link) { 2242 if (blob->id == blobid) { 2243 return blob; 2244 } 2245 } 2246 2247 return NULL; 2248 } 2249 2250 static void 2251 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob, 2252 struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry) 2253 { 2254 assert(blob != NULL); 2255 *snapshot_entry = NULL; 2256 *clone_entry = NULL; 2257 2258 if (blob->parent_id == SPDK_BLOBID_INVALID) { 2259 return; 2260 } 2261 2262 TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) { 2263 if ((*snapshot_entry)->id == blob->parent_id) { 2264 break; 2265 } 2266 } 2267 2268 if (*snapshot_entry != NULL) { 2269 TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) { 2270 if ((*clone_entry)->id == blob->id) { 2271 break; 2272 } 2273 } 2274 2275 
		assert(*clone_entry != NULL);
	}
}

static int
_spdk_bs_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store		*bs = io_device;
	struct spdk_bs_channel		*channel = ctx_buf;
	struct spdk_bs_dev		*dev;
	uint32_t			max_ops = bs->max_channel_ops;
	uint32_t			i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	if (!channel->dev_channel) {
		SPDK_ERRLOG("Failed to create device channel.\n");
		free(channel->req_mem);
		return -1;
	}

	TAILQ_INIT(&channel->need_cluster_alloc);
	TAILQ_INIT(&channel->queued_io);

	return 0;
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;
	spdk_bs_user_op_t *op;

	while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) {
		op = TAILQ_FIRST(&channel->need_cluster_alloc);
		TAILQ_REMOVE(&channel->need_cluster_alloc, op, link);
		spdk_bs_user_op_abort(op);
	}

	while (!TAILQ_EMPTY(&channel->queued_io)) {
		op = TAILQ_FIRST(&channel->queued_io);
		TAILQ_REMOVE(&channel->queued_io, op, link);
		spdk_bs_user_op_abort(op);
	}

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_dev_destroy(void *io_device)
{
	struct spdk_blob_store *bs = io_device;
	struct spdk_blob	*blob, *blob_tmp;

	bs->dev->destroy(bs->dev);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	pthread_mutex_destroy(&bs->used_clusters_mutex);

	spdk_bit_array_free(&bs->used_blobids);
	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);
	/*
	 * If this function is called for any reason except a successful unload,
	 * the unload_cpl type will be NONE and this will be a nop.
2360 */ 2361 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 2362 2363 free(bs); 2364 } 2365 2366 static int 2367 _spdk_bs_blob_list_add(struct spdk_blob *blob) 2368 { 2369 spdk_blob_id snapshot_id; 2370 struct spdk_blob_list *snapshot_entry = NULL; 2371 struct spdk_blob_list *clone_entry = NULL; 2372 2373 assert(blob != NULL); 2374 2375 snapshot_id = blob->parent_id; 2376 if (snapshot_id == SPDK_BLOBID_INVALID) { 2377 return 0; 2378 } 2379 2380 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id); 2381 if (snapshot_entry == NULL) { 2382 /* Snapshot not found */ 2383 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list)); 2384 if (snapshot_entry == NULL) { 2385 return -ENOMEM; 2386 } 2387 snapshot_entry->id = snapshot_id; 2388 TAILQ_INIT(&snapshot_entry->clones); 2389 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link); 2390 } else { 2391 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 2392 if (clone_entry->id == blob->id) { 2393 break; 2394 } 2395 } 2396 } 2397 2398 if (clone_entry == NULL) { 2399 /* Clone not found */ 2400 clone_entry = calloc(1, sizeof(struct spdk_blob_list)); 2401 if (clone_entry == NULL) { 2402 return -ENOMEM; 2403 } 2404 clone_entry->id = blob->id; 2405 TAILQ_INIT(&clone_entry->clones); 2406 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link); 2407 snapshot_entry->clone_count++; 2408 } 2409 2410 return 0; 2411 } 2412 2413 static void 2414 _spdk_bs_blob_list_remove(struct spdk_blob *blob) 2415 { 2416 struct spdk_blob_list *snapshot_entry = NULL; 2417 struct spdk_blob_list *clone_entry = NULL; 2418 2419 _spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry); 2420 2421 if (snapshot_entry == NULL) { 2422 return; 2423 } 2424 2425 blob->parent_id = SPDK_BLOBID_INVALID; 2426 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2427 free(clone_entry); 2428 2429 snapshot_entry->clone_count--; 2430 } 2431 2432 static int 2433 _spdk_bs_blob_list_free(struct spdk_blob_store *bs) 2434 { 2435 struct spdk_blob_list *snapshot_entry; 2436 struct spdk_blob_list *snapshot_entry_tmp; 2437 struct spdk_blob_list *clone_entry; 2438 struct spdk_blob_list *clone_entry_tmp; 2439 2440 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) { 2441 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) { 2442 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2443 free(clone_entry); 2444 } 2445 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link); 2446 free(snapshot_entry); 2447 } 2448 2449 return 0; 2450 } 2451 2452 static void 2453 _spdk_bs_free(struct spdk_blob_store *bs) 2454 { 2455 _spdk_bs_blob_list_free(bs); 2456 2457 spdk_bs_unregister_md_thread(bs); 2458 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy); 2459 } 2460 2461 void 2462 spdk_bs_opts_init(struct spdk_bs_opts *opts) 2463 { 2464 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 2465 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 2466 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 2467 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS; 2468 opts->clear_method = BS_CLEAR_WITH_UNMAP; 2469 memset(&opts->bstype, 0, sizeof(opts->bstype)); 2470 opts->iter_cb_fn = NULL; 2471 opts->iter_cb_arg = NULL; 2472 } 2473 2474 static int 2475 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 2476 { 2477 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 2478 opts->max_channel_ops == 0) { 2479 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 2480 return -1; 2481 } 2482 2483 
	return 0;
}

static int
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs)
{
	struct spdk_blob_store	*bs;
	uint64_t dev_size;
	int rc;

	dev_size = dev->blocklen * dev->blockcnt;
	if (dev_size < opts->cluster_sz) {
		/* Device size cannot be smaller than cluster size of blobstore */
		SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n",
			     dev_size, opts->cluster_sz);
		return -ENOSPC;
	}
	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
		/* Cluster size cannot be smaller than page size */
		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n",
			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
		return -EINVAL;
	}
	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return -ENOMEM;
	}

	TAILQ_INIT(&bs->blobs);
	TAILQ_INIT(&bs->snapshots);
	bs->dev = dev;
	bs->md_thread = spdk_get_thread();
	assert(bs->md_thread != NULL);

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 * even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	bs->io_unit_size = dev->blocklen;
	if (bs->used_clusters == NULL) {
		free(bs);
		return -ENOMEM;
	}

	bs->max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;
	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);
	bs->used_blobids = spdk_bit_array_create(0);

	pthread_mutex_init(&bs->used_clusters_mutex, NULL);

	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel), "blobstore");
	rc = spdk_bs_register_md_thread(bs);
	if (rc == -1) {
		spdk_io_device_unregister(bs, NULL);
		pthread_mutex_destroy(&bs->used_clusters_mutex);
		spdk_bit_array_free(&bs->used_blobids);
		spdk_bit_array_free(&bs->used_md_pages);
		spdk_bit_array_free(&bs->used_clusters);
		free(bs);
		/* FIXME: this is a lie but don't know how to get a proper error code here */
		return -ENOMEM;
	}

	*_bs = bs;
	return 0;
}

/* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload. */
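/*
 * Illustrative geometry example (not part of the build): for a device with
 * blocklen = 512 and blockcnt = 8388608 (a 4 GiB device), and an assumed
 * 4 MiB cluster_sz, _spdk_bs_alloc() above computes:
 *
 *	blocks per cluster = 4 MiB / 512                         = 8192
 *	total_clusters     = 8388608 / 8192                      = 1024
 *	pages_per_cluster  = 4 MiB / SPDK_BS_PAGE_SIZE (4 KiB)   = 1024
 *	io_unit_size       = 512
 *
 * A trailing partial cluster, if any, is simply not counted.
 */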
struct spdk_bs_load_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;

	struct spdk_bs_md_mask		*mask;
	bool				in_page_chain;
	uint32_t			page_index;
	uint32_t			cur_page;
	struct spdk_blob_md_page	*page;

	spdk_bs_sequence_t			*seq;
	spdk_blob_op_with_handle_complete	iter_cb_fn;
	void					*iter_cb_arg;
	struct spdk_blob			*blob;
	spdk_blob_id				blobid;
};

static void
_spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
{
	assert(bserrno != 0);

	spdk_free(ctx->super);
	spdk_bs_sequence_finish(seq, bserrno);
	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
{
	uint32_t i = 0;

	while (true) {
		i = spdk_bit_array_find_first_set(array, i);
		if (i >= mask->length) {
			break;
		}
		mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}
}

static int
_spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask)
{
	struct spdk_bit_array *array;
	uint32_t i;

	if (spdk_bit_array_resize(array_ptr, mask->length) < 0) {
		return -ENOMEM;
	}

	array = *array_ptr;
	for (i = 0; i < mask->length; i++) {
		if (mask->mask[i / 8] & (1U << (i % 8))) {
			spdk_bit_array_set(array, i);
		}
	}

	return 0;
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	/* Update the values in the super block */
	super->super_blob = bs->super_blob;
	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
	super->crc = _spdk_blob_md_page_calc_crc(super);
	spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0),
				   _spdk_bs_byte_to_lba(bs, sizeof(*super)),
				   cb_fn, cb_arg);
}

static void
_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	if (seq->bserrno) {
		_spdk_bs_load_ctx_fail(seq, ctx, seq->bserrno);
		return;
	}

	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL,
				 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t	mask_size, lba, lba_count;

	if (ctx->super->used_blobid_mask_len == 0) {
		/*
		 * This is a pre-v3 on-disk format where the blobid mask does not get
		 * written to disk.
		 */
		cb_fn(seq, arg, 0);
		return;
	}

	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
				 SPDK_MALLOC_DMA);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));

	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
	spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_blob_set_thin_provision(struct spdk_blob *blob)
{
	_spdk_blob_verify_md_op(blob);
	blob->invalid_flags |= SPDK_BLOB_THIN_PROV;
	blob->state = SPDK_BLOB_STATE_DIRTY;
}

static void
_spdk_blob_set_clear_method(struct spdk_blob *blob, enum blob_clear_method clear_method)
{
	_spdk_blob_verify_md_op(blob);
	blob->clear_method = clear_method;
	blob->md_ro_flags |= (clear_method << SPDK_BLOB_CLEAR_METHOD_SHIFT);
	blob->state = SPDK_BLOB_STATE_DIRTY;
}

static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno);

static void
_spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	spdk_blob_id id;
	int64_t page_num;

	/* Iterate to the next blob (we can't use spdk_bs_iter_next() here,
	 * since the last blob has just been removed). */
	page_num = _spdk_bs_blobid_to_page(ctx->blobid);
	page_num++;
	page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num);
	if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) {
		_spdk_bs_load_iter(ctx, NULL, -ENOENT);
		return;
	}

	id = _spdk_bs_page_to_blobid(page_num);

	spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx);
}

static void
_spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to close corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx);
}
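/*
 * Illustrative decision table (not normative) for the corrupted-snapshot
 * recovery below, which runs while iterating blobs at load time.  A blob
 * carrying a SNAPSHOT_PENDING_REMOVAL or SNAPSHOT_IN_PROGRESS xattr is a
 * snapshot whose update was interrupted; its clone decides the outcome:
 *
 *	clone->parent_id == snapshot->id?	action
 *	---------------------------------	--------------------------------
 *	yes					repair and keep the snapshot
 *	no					delete the snapshot
 *
 * In both cases the clone is closed first; the snapshot is then either
 * repaired (_spdk_bs_update_corrupted_blob) or removed
 * (_spdk_bs_delete_corrupted_blob).
 */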
static void
_spdk_bs_delete_corrupted_blob(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t i;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	/* Snapshot and clone have the same copy of cluster map at this point.
	 * Let's clear cluster map for snapshot now so that it won't be cleared
	 * for clone later when we remove snapshot. Also set thin provision to
	 * pass data corruption check */
	for (i = 0; i < ctx->blob->active.num_clusters; i++) {
		ctx->blob->active.clusters[i] = 0;
	}

	ctx->blob->md_ro = false;

	_spdk_blob_set_thin_provision(ctx->blob);

	ctx->blobid = ctx->blob->id;

	spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx);
}

static void
_spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to close clone of a corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	ctx->blob->md_ro = false;
	_spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true);
	_spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true);
	spdk_blob_set_read_only(ctx->blob);

	if (ctx->iter_cb_fn) {
		ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0);
	}
	_spdk_bs_blob_list_add(ctx->blob);

	spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
}

static void
_spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to open clone of a corrupted blob\n");
		spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx);
		return;
	}

	if (blob->parent_id == ctx->blob->id) {
		/* Power failure occurred before updating clone (snapshot delete case)
		 * or after updating clone (creating snapshot case) - keep snapshot */
		spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx);
	} else {
		/* Power failure occurred after updating clone (snapshot delete case)
		 * or before updating clone (creating snapshot case) - remove snapshot */
		spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx);
	}
}

static void
_spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = arg;
	const void *value;
	size_t len;
	int rc = 0;

	if (bserrno == 0) {
		/* Examine the blob to see if it was left corrupted by a power failure.
		 * Fix the ones that can be fixed and remove any other corrupted
		 * ones.
If it is not corrupted just process it */ 2867 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true); 2868 if (rc != 0) { 2869 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true); 2870 if (rc != 0) { 2871 /* Not corrupted - process it and continue with iterating through blobs */ 2872 if (ctx->iter_cb_fn) { 2873 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0); 2874 } 2875 _spdk_bs_blob_list_add(blob); 2876 spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx); 2877 return; 2878 } 2879 2880 } 2881 2882 assert(len == sizeof(spdk_blob_id)); 2883 2884 ctx->blob = blob; 2885 2886 /* Open clone to check if we are able to fix this blob or should we remove it */ 2887 spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx); 2888 return; 2889 } else if (bserrno == -ENOENT) { 2890 bserrno = 0; 2891 } else { 2892 /* 2893 * This case needs to be looked at further. Same problem 2894 * exists with applications that rely on explicit blob 2895 * iteration. We should just skip the blob that failed 2896 * to load and continue on to the next one. 2897 */ 2898 SPDK_ERRLOG("Error in iterating blobs\n"); 2899 } 2900 2901 ctx->iter_cb_fn = NULL; 2902 2903 spdk_free(ctx->super); 2904 spdk_free(ctx->mask); 2905 spdk_bs_sequence_finish(ctx->seq, bserrno); 2906 free(ctx); 2907 } 2908 2909 static void 2910 _spdk_bs_load_complete(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno) 2911 { 2912 ctx->seq = seq; 2913 spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx); 2914 } 2915 2916 static void 2917 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2918 { 2919 struct spdk_bs_load_ctx *ctx = cb_arg; 2920 int rc; 2921 2922 /* The type must be correct */ 2923 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS); 2924 2925 /* The length of the mask (in bits) must not be greater than 2926 * the length of the buffer (converted to bits) */ 2927 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8)); 2928 2929 /* The length of the mask must be exactly equal to the size 2930 * (in pages) of the metadata region */ 2931 assert(ctx->mask->length == ctx->super->md_len); 2932 2933 rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask); 2934 if (rc < 0) { 2935 spdk_free(ctx->mask); 2936 _spdk_bs_load_ctx_fail(seq, ctx, rc); 2937 return; 2938 } 2939 2940 _spdk_bs_load_complete(seq, ctx, bserrno); 2941 } 2942 2943 static void 2944 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2945 { 2946 struct spdk_bs_load_ctx *ctx = cb_arg; 2947 uint64_t lba, lba_count, mask_size; 2948 int rc; 2949 2950 /* The type must be correct */ 2951 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 2952 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2953 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 2954 struct spdk_blob_md_page) * 8)); 2955 /* The length of the mask must be exactly equal to the total number of clusters */ 2956 assert(ctx->mask->length == ctx->bs->total_clusters); 2957 2958 rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask); 2959 if (rc < 0) { 2960 spdk_free(ctx->mask); 2961 _spdk_bs_load_ctx_fail(seq, ctx, rc); 2962 return; 2963 } 2964 2965 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters); 2966 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters); 2967 2968 spdk_free(ctx->mask); 2969 2970 /* 
Read the used blobids mask */ 2971 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2972 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2973 SPDK_MALLOC_DMA); 2974 if (!ctx->mask) { 2975 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 2976 return; 2977 } 2978 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2979 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2980 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 2981 _spdk_bs_load_used_blobids_cpl, ctx); 2982 } 2983 2984 static void 2985 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2986 { 2987 struct spdk_bs_load_ctx *ctx = cb_arg; 2988 uint64_t lba, lba_count, mask_size; 2989 int rc; 2990 2991 /* The type must be correct */ 2992 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 2993 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2994 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 2995 8)); 2996 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 2997 assert(ctx->mask->length == ctx->super->md_len); 2998 2999 rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask); 3000 if (rc < 0) { 3001 spdk_free(ctx->mask); 3002 _spdk_bs_load_ctx_fail(seq, ctx, rc); 3003 return; 3004 } 3005 3006 spdk_free(ctx->mask); 3007 3008 /* Read the used clusters mask */ 3009 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 3010 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3011 SPDK_MALLOC_DMA); 3012 if (!ctx->mask) { 3013 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3014 return; 3015 } 3016 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 3017 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 3018 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3019 _spdk_bs_load_used_clusters_cpl, ctx); 3020 } 3021 3022 static void 3023 _spdk_bs_load_read_used_pages(spdk_bs_sequence_t *seq, void *cb_arg) 3024 { 3025 struct spdk_bs_load_ctx *ctx = cb_arg; 3026 uint64_t lba, lba_count, mask_size; 3027 3028 /* Read the used pages mask */ 3029 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 3030 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3031 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3032 if (!ctx->mask) { 3033 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3034 return; 3035 } 3036 3037 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 3038 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 3039 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3040 _spdk_bs_load_used_pages_cpl, ctx); 3041 } 3042 3043 static int 3044 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 3045 { 3046 struct spdk_blob_md_descriptor *desc; 3047 size_t cur_desc = 0; 3048 3049 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3050 while (cur_desc < sizeof(page->descriptors)) { 3051 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3052 if (desc->length == 0) { 3053 /* If padding and length are 0, this terminates the page */ 3054 break; 3055 } 3056 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3057 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3058 unsigned int i, j; 3059 unsigned int cluster_count = 0; 3060 uint32_t cluster_idx; 3061 3062 desc_extent_rle 
= (struct spdk_blob_md_descriptor_extent_rle *)desc; 3063 3064 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3065 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 3066 cluster_idx = desc_extent_rle->extents[i].cluster_idx; 3067 /* 3068 * cluster_idx = 0 means an unallocated cluster - don't mark that 3069 * in the used cluster map. 3070 */ 3071 if (cluster_idx != 0) { 3072 spdk_bit_array_set(bs->used_clusters, cluster_idx + j); 3073 if (bs->num_free_clusters == 0) { 3074 return -ENOSPC; 3075 } 3076 bs->num_free_clusters--; 3077 } 3078 cluster_count++; 3079 } 3080 } 3081 if (cluster_count == 0) { 3082 return -EINVAL; 3083 } 3084 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3085 /* Skip this item */ 3086 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3087 /* Skip this item */ 3088 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3089 /* Skip this item */ 3090 } else { 3091 /* Error */ 3092 return -EINVAL; 3093 } 3094 /* Advance to the next descriptor */ 3095 cur_desc += sizeof(*desc) + desc->length; 3096 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3097 break; 3098 } 3099 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3100 } 3101 return 0; 3102 } 3103 3104 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 3105 { 3106 uint32_t crc; 3107 3108 crc = _spdk_blob_md_page_calc_crc(ctx->page); 3109 if (crc != ctx->page->crc) { 3110 return false; 3111 } 3112 3113 if (ctx->page->sequence_num == 0 && 3114 _spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 3115 return false; 3116 } 3117 return true; 3118 } 3119 3120 static void 3121 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 3122 3123 static void 3124 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3125 { 3126 struct spdk_bs_load_ctx *ctx = cb_arg; 3127 3128 _spdk_bs_load_complete(seq, ctx, bserrno); 3129 } 3130 3131 static void 3132 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3133 { 3134 struct spdk_bs_load_ctx *ctx = cb_arg; 3135 3136 spdk_free(ctx->mask); 3137 ctx->mask = NULL; 3138 3139 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl); 3140 } 3141 3142 static void 3143 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3144 { 3145 struct spdk_bs_load_ctx *ctx = cb_arg; 3146 3147 spdk_free(ctx->mask); 3148 ctx->mask = NULL; 3149 3150 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl); 3151 } 3152 3153 static void 3154 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3155 { 3156 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl); 3157 } 3158 3159 static void 3160 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3161 { 3162 struct spdk_bs_load_ctx *ctx = cb_arg; 3163 uint64_t num_md_clusters; 3164 uint64_t i; 3165 uint32_t page_num; 3166 3167 if (bserrno != 0) { 3168 _spdk_bs_load_ctx_fail(seq, ctx, bserrno); 3169 return; 3170 } 3171 3172 page_num = ctx->cur_page; 3173 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 3174 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 3175 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 3176 if (ctx->page->sequence_num == 0) { 3177 spdk_bit_array_set(ctx->bs->used_blobids, page_num); 3178 } 3179 if 
(_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 3180 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3181 return; 3182 } 3183 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 3184 ctx->in_page_chain = true; 3185 ctx->cur_page = ctx->page->next; 3186 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 3187 return; 3188 } 3189 } 3190 } 3191 3192 ctx->in_page_chain = false; 3193 3194 do { 3195 ctx->page_index++; 3196 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 3197 3198 if (ctx->page_index < ctx->super->md_len) { 3199 ctx->cur_page = ctx->page_index; 3200 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 3201 } else { 3202 /* Claim all of the clusters used by the metadata */ 3203 num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster); 3204 for (i = 0; i < num_md_clusters; i++) { 3205 _spdk_bs_claim_cluster(ctx->bs, i); 3206 } 3207 spdk_free(ctx->page); 3208 _spdk_bs_load_write_used_md(seq, ctx, bserrno); 3209 } 3210 } 3211 3212 static void 3213 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 3214 { 3215 struct spdk_bs_load_ctx *ctx = cb_arg; 3216 uint64_t lba; 3217 3218 assert(ctx->cur_page < ctx->super->md_len); 3219 lba = _spdk_bs_md_page_to_lba(ctx->bs, ctx->cur_page); 3220 spdk_bs_sequence_read_dev(seq, ctx->page, lba, 3221 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3222 _spdk_bs_load_replay_md_cpl, ctx); 3223 } 3224 3225 static void 3226 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg) 3227 { 3228 struct spdk_bs_load_ctx *ctx = cb_arg; 3229 3230 ctx->page_index = 0; 3231 ctx->cur_page = 0; 3232 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3233 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3234 if (!ctx->page) { 3235 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3236 return; 3237 } 3238 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 3239 } 3240 3241 static void 3242 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg) 3243 { 3244 struct spdk_bs_load_ctx *ctx = cb_arg; 3245 int rc; 3246 3247 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 3248 if (rc < 0) { 3249 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3250 return; 3251 } 3252 3253 rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len); 3254 if (rc < 0) { 3255 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3256 return; 3257 } 3258 3259 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3260 if (rc < 0) { 3261 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3262 return; 3263 } 3264 3265 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 3266 _spdk_bs_load_replay_md(seq, cb_arg); 3267 } 3268 3269 static void 3270 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3271 { 3272 struct spdk_bs_load_ctx *ctx = cb_arg; 3273 uint32_t crc; 3274 int rc; 3275 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 3276 3277 if (ctx->super->version > SPDK_BS_VERSION || 3278 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 3279 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3280 return; 3281 } 3282 3283 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3284 sizeof(ctx->super->signature)) != 0) { 3285 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3286 return; 3287 } 3288 3289 crc = _spdk_blob_md_page_calc_crc(ctx->super); 3290 if (crc != ctx->super->crc) { 3291 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3292 return; 3293 } 3294 3295 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3296 
SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 3297 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3298 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 3299 } else { 3300 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 3301 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3302 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3303 _spdk_bs_load_ctx_fail(seq, ctx, -ENXIO); 3304 return; 3305 } 3306 3307 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) { 3308 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n", 3309 ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size); 3310 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 3311 return; 3312 } 3313 3314 if (ctx->super->size == 0) { 3315 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen; 3316 } 3317 3318 if (ctx->super->io_unit_size == 0) { 3319 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE; 3320 } 3321 3322 /* Parse the super block */ 3323 ctx->bs->clean = 1; 3324 ctx->bs->cluster_sz = ctx->super->cluster_size; 3325 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size; 3326 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 3327 ctx->bs->io_unit_size = ctx->super->io_unit_size; 3328 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3329 if (rc < 0) { 3330 _spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM); 3331 return; 3332 } 3333 ctx->bs->md_start = ctx->super->md_start; 3334 ctx->bs->md_len = ctx->super->md_len; 3335 ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up( 3336 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 3337 ctx->bs->super_blob = ctx->super->super_blob; 3338 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 3339 3340 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) { 3341 _spdk_bs_recover(seq, ctx); 3342 } else { 3343 _spdk_bs_load_read_used_pages(seq, ctx); 3344 } 3345 } 3346 3347 void 3348 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3349 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3350 { 3351 struct spdk_blob_store *bs; 3352 struct spdk_bs_cpl cpl; 3353 spdk_bs_sequence_t *seq; 3354 struct spdk_bs_load_ctx *ctx; 3355 struct spdk_bs_opts opts = {}; 3356 int err; 3357 3358 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 3359 3360 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3361 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen); 3362 dev->destroy(dev); 3363 cb_fn(cb_arg, NULL, -EINVAL); 3364 return; 3365 } 3366 3367 if (o) { 3368 opts = *o; 3369 } else { 3370 spdk_bs_opts_init(&opts); 3371 } 3372 3373 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 3374 dev->destroy(dev); 3375 cb_fn(cb_arg, NULL, -EINVAL); 3376 return; 3377 } 3378 3379 err = _spdk_bs_alloc(dev, &opts, &bs); 3380 if (err) { 3381 dev->destroy(dev); 3382 cb_fn(cb_arg, NULL, err); 3383 return; 3384 } 3385 3386 ctx = calloc(1, sizeof(*ctx)); 3387 if (!ctx) { 3388 _spdk_bs_free(bs); 3389 cb_fn(cb_arg, NULL, -ENOMEM); 3390 return; 3391 } 3392 3393 ctx->bs = bs; 3394 ctx->iter_cb_fn = opts.iter_cb_fn; 3395 ctx->iter_cb_arg = opts.iter_cb_arg; 3396 3397 /* Allocate memory for the super block */ 3398 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3399 
SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3400 if (!ctx->super) { 3401 free(ctx); 3402 _spdk_bs_free(bs); 3403 cb_fn(cb_arg, NULL, -ENOMEM); 3404 return; 3405 } 3406 3407 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3408 cpl.u.bs_handle.cb_fn = cb_fn; 3409 cpl.u.bs_handle.cb_arg = cb_arg; 3410 cpl.u.bs_handle.bs = bs; 3411 3412 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3413 if (!seq) { 3414 spdk_free(ctx->super); 3415 free(ctx); 3416 _spdk_bs_free(bs); 3417 cb_fn(cb_arg, NULL, -ENOMEM); 3418 return; 3419 } 3420 3421 /* Read the super block */ 3422 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3423 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3424 _spdk_bs_load_super_cpl, ctx); 3425 } 3426 3427 /* END spdk_bs_load */ 3428 3429 /* START spdk_bs_dump */ 3430 3431 struct spdk_bs_dump_ctx { 3432 struct spdk_blob_store *bs; 3433 struct spdk_bs_super_block *super; 3434 uint32_t cur_page; 3435 struct spdk_blob_md_page *page; 3436 spdk_bs_sequence_t *seq; 3437 FILE *fp; 3438 spdk_bs_dump_print_xattr print_xattr_fn; 3439 char xattr_name[4096]; 3440 }; 3441 3442 static void 3443 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno) 3444 { 3445 spdk_free(ctx->super); 3446 3447 /* 3448 * We need to defer calling spdk_bs_call_cpl() until after 3449 * dev destruction, so tuck these away for later use. 3450 */ 3451 ctx->bs->unload_err = bserrno; 3452 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3453 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3454 3455 spdk_bs_sequence_finish(seq, 0); 3456 _spdk_bs_free(ctx->bs); 3457 free(ctx); 3458 } 3459 3460 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 3461 3462 static void 3463 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx) 3464 { 3465 uint32_t page_idx = ctx->cur_page; 3466 struct spdk_blob_md_page *page = ctx->page; 3467 struct spdk_blob_md_descriptor *desc; 3468 size_t cur_desc = 0; 3469 uint32_t crc; 3470 3471 fprintf(ctx->fp, "=========\n"); 3472 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx); 3473 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id); 3474 3475 crc = _spdk_blob_md_page_calc_crc(page); 3476 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? 
"OK" : "Mismatch"); 3477 3478 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3479 while (cur_desc < sizeof(page->descriptors)) { 3480 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3481 if (desc->length == 0) { 3482 /* If padding and length are 0, this terminates the page */ 3483 break; 3484 } 3485 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3486 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3487 unsigned int i; 3488 3489 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3490 3491 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3492 if (desc_extent_rle->extents[i].cluster_idx != 0) { 3493 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32, 3494 desc_extent_rle->extents[i].cluster_idx); 3495 } else { 3496 fprintf(ctx->fp, "Unallocated Extent - "); 3497 } 3498 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length); 3499 fprintf(ctx->fp, "\n"); 3500 } 3501 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3502 struct spdk_blob_md_descriptor_xattr *desc_xattr; 3503 uint32_t i; 3504 3505 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 3506 3507 if (desc_xattr->length != 3508 sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) + 3509 desc_xattr->name_length + desc_xattr->value_length) { 3510 } 3511 3512 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length); 3513 ctx->xattr_name[desc_xattr->name_length] = '\0'; 3514 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name); 3515 fprintf(ctx->fp, " value = \""); 3516 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name, 3517 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 3518 desc_xattr->value_length); 3519 fprintf(ctx->fp, "\"\n"); 3520 for (i = 0; i < desc_xattr->value_length; i++) { 3521 if (i % 16 == 0) { 3522 fprintf(ctx->fp, " "); 3523 } 3524 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i)); 3525 if ((i + 1) % 16 == 0) { 3526 fprintf(ctx->fp, "\n"); 3527 } 3528 } 3529 if (i % 16 != 0) { 3530 fprintf(ctx->fp, "\n"); 3531 } 3532 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3533 /* TODO */ 3534 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3535 /* TODO */ 3536 } else { 3537 /* Error */ 3538 } 3539 /* Advance to the next descriptor */ 3540 cur_desc += sizeof(*desc) + desc->length; 3541 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3542 break; 3543 } 3544 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3545 } 3546 } 3547 3548 static void 3549 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3550 { 3551 struct spdk_bs_dump_ctx *ctx = cb_arg; 3552 3553 if (bserrno != 0) { 3554 _spdk_bs_dump_finish(seq, ctx, bserrno); 3555 return; 3556 } 3557 3558 if (ctx->page->id != 0) { 3559 _spdk_bs_dump_print_md_page(ctx); 3560 } 3561 3562 ctx->cur_page++; 3563 3564 if (ctx->cur_page < ctx->super->md_len) { 3565 _spdk_bs_dump_read_md_page(seq, cb_arg); 3566 } else { 3567 spdk_free(ctx->page); 3568 _spdk_bs_dump_finish(seq, ctx, 0); 3569 } 3570 } 3571 3572 static void 3573 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 3574 { 3575 struct spdk_bs_dump_ctx *ctx = cb_arg; 3576 uint64_t lba; 3577 3578 assert(ctx->cur_page < ctx->super->md_len); 3579 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 3580 spdk_bs_sequence_read_dev(seq, ctx->page, 
lba, 3581 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3582 _spdk_bs_dump_read_md_page_cpl, ctx); 3583 } 3584 3585 static void 3586 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3587 { 3588 struct spdk_bs_dump_ctx *ctx = cb_arg; 3589 3590 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature); 3591 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3592 sizeof(ctx->super->signature)) != 0) { 3593 fprintf(ctx->fp, "(Mismatch)\n"); 3594 _spdk_bs_dump_finish(seq, ctx, bserrno); 3595 return; 3596 } else { 3597 fprintf(ctx->fp, "(OK)\n"); 3598 } 3599 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version); 3600 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc, 3601 (ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch"); 3602 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype); 3603 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size); 3604 fprintf(ctx->fp, "Super Blob ID: "); 3605 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) { 3606 fprintf(ctx->fp, "(None)\n"); 3607 } else { 3608 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob); 3609 } 3610 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean); 3611 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start); 3612 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len); 3613 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start); 3614 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len); 3615 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start); 3616 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len); 3617 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start); 3618 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len); 3619 3620 ctx->cur_page = 0; 3621 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3622 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3623 if (!ctx->page) { 3624 _spdk_bs_dump_finish(seq, ctx, -ENOMEM); 3625 return; 3626 } 3627 _spdk_bs_dump_read_md_page(seq, cb_arg); 3628 } 3629 3630 void 3631 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn, 3632 spdk_bs_op_complete cb_fn, void *cb_arg) 3633 { 3634 struct spdk_blob_store *bs; 3635 struct spdk_bs_cpl cpl; 3636 spdk_bs_sequence_t *seq; 3637 struct spdk_bs_dump_ctx *ctx; 3638 struct spdk_bs_opts opts = {}; 3639 int err; 3640 3641 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev); 3642 3643 spdk_bs_opts_init(&opts); 3644 3645 err = _spdk_bs_alloc(dev, &opts, &bs); 3646 if (err) { 3647 dev->destroy(dev); 3648 cb_fn(cb_arg, err); 3649 return; 3650 } 3651 3652 ctx = calloc(1, sizeof(*ctx)); 3653 if (!ctx) { 3654 _spdk_bs_free(bs); 3655 cb_fn(cb_arg, -ENOMEM); 3656 return; 3657 } 3658 3659 ctx->bs = bs; 3660 ctx->fp = fp; 3661 ctx->print_xattr_fn = print_xattr_fn; 3662 3663 /* Allocate memory for the super block */ 3664 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3665 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3666 if (!ctx->super) { 3667 free(ctx); 3668 _spdk_bs_free(bs); 3669 cb_fn(cb_arg, -ENOMEM); 3670 return; 3671 } 3672 3673 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3674 cpl.u.bs_basic.cb_fn = cb_fn; 3675 cpl.u.bs_basic.cb_arg = cb_arg; 3676 
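	/*
	 * Illustrative usage sketch (not part of the build): a diagnostic tool
	 * would typically drive this dump entry point as:
	 *
	 *	spdk_bs_dump(dev, stdout, my_print_xattr, dump_done, NULL);
	 *
	 * where my_print_xattr() and dump_done() are application-provided, and
	 * the super block plus every valid metadata page is printed to stdout.
	 */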
3677 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3678 if (!seq) { 3679 spdk_free(ctx->super); 3680 free(ctx); 3681 _spdk_bs_free(bs); 3682 cb_fn(cb_arg, -ENOMEM); 3683 return; 3684 } 3685 3686 /* Read the super block */ 3687 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3688 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3689 _spdk_bs_dump_super_cpl, ctx); 3690 } 3691 3692 /* END spdk_bs_dump */ 3693 3694 /* START spdk_bs_init */ 3695 3696 struct spdk_bs_init_ctx { 3697 struct spdk_blob_store *bs; 3698 struct spdk_bs_super_block *super; 3699 }; 3700 3701 static void 3702 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3703 { 3704 struct spdk_bs_init_ctx *ctx = cb_arg; 3705 3706 spdk_free(ctx->super); 3707 free(ctx); 3708 3709 spdk_bs_sequence_finish(seq, bserrno); 3710 } 3711 3712 static void 3713 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3714 { 3715 struct spdk_bs_init_ctx *ctx = cb_arg; 3716 3717 /* Write super block */ 3718 spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 3719 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 3720 _spdk_bs_init_persist_super_cpl, ctx); 3721 } 3722 3723 void 3724 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3725 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3726 { 3727 struct spdk_bs_init_ctx *ctx; 3728 struct spdk_blob_store *bs; 3729 struct spdk_bs_cpl cpl; 3730 spdk_bs_sequence_t *seq; 3731 spdk_bs_batch_t *batch; 3732 uint64_t num_md_lba; 3733 uint64_t num_md_pages; 3734 uint64_t num_md_clusters; 3735 uint32_t i; 3736 struct spdk_bs_opts opts = {}; 3737 int rc; 3738 3739 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 3740 3741 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3742 SPDK_ERRLOG("unsupported dev block length of %d\n", 3743 dev->blocklen); 3744 dev->destroy(dev); 3745 cb_fn(cb_arg, NULL, -EINVAL); 3746 return; 3747 } 3748 3749 if (o) { 3750 opts = *o; 3751 } else { 3752 spdk_bs_opts_init(&opts); 3753 } 3754 3755 if (_spdk_bs_opts_verify(&opts) != 0) { 3756 dev->destroy(dev); 3757 cb_fn(cb_arg, NULL, -EINVAL); 3758 return; 3759 } 3760 3761 rc = _spdk_bs_alloc(dev, &opts, &bs); 3762 if (rc) { 3763 dev->destroy(dev); 3764 cb_fn(cb_arg, NULL, rc); 3765 return; 3766 } 3767 3768 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 3769 /* By default, allocate 1 page per cluster. 3770 * Technically, this over-allocates metadata 3771 * because more metadata will reduce the number 3772 * of usable clusters. This can be addressed with 3773 * more complex math in the future. 
3774 */ 3775 bs->md_len = bs->total_clusters; 3776 } else { 3777 bs->md_len = opts.num_md_pages; 3778 } 3779 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 3780 if (rc < 0) { 3781 _spdk_bs_free(bs); 3782 cb_fn(cb_arg, NULL, -ENOMEM); 3783 return; 3784 } 3785 3786 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len); 3787 if (rc < 0) { 3788 _spdk_bs_free(bs); 3789 cb_fn(cb_arg, NULL, -ENOMEM); 3790 return; 3791 } 3792 3793 ctx = calloc(1, sizeof(*ctx)); 3794 if (!ctx) { 3795 _spdk_bs_free(bs); 3796 cb_fn(cb_arg, NULL, -ENOMEM); 3797 return; 3798 } 3799 3800 ctx->bs = bs; 3801 3802 /* Allocate memory for the super block */ 3803 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3804 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3805 if (!ctx->super) { 3806 free(ctx); 3807 _spdk_bs_free(bs); 3808 cb_fn(cb_arg, NULL, -ENOMEM); 3809 return; 3810 } 3811 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3812 sizeof(ctx->super->signature)); 3813 ctx->super->version = SPDK_BS_VERSION; 3814 ctx->super->length = sizeof(*ctx->super); 3815 ctx->super->super_blob = bs->super_blob; 3816 ctx->super->clean = 0; 3817 ctx->super->cluster_size = bs->cluster_sz; 3818 ctx->super->io_unit_size = bs->io_unit_size; 3819 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 3820 3821 /* Calculate how many pages the metadata consumes at the front 3822 * of the disk. 3823 */ 3824 3825 /* The super block uses 1 page */ 3826 num_md_pages = 1; 3827 3828 /* The used_md_pages mask requires 1 bit per metadata page, rounded 3829 * up to the nearest page, plus a header. 3830 */ 3831 ctx->super->used_page_mask_start = num_md_pages; 3832 ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3833 spdk_divide_round_up(bs->md_len, 8), 3834 SPDK_BS_PAGE_SIZE); 3835 num_md_pages += ctx->super->used_page_mask_len; 3836 3837 /* The used_clusters mask requires 1 bit per cluster, rounded 3838 * up to the nearest page, plus a header. 3839 */ 3840 ctx->super->used_cluster_mask_start = num_md_pages; 3841 ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3842 spdk_divide_round_up(bs->total_clusters, 8), 3843 SPDK_BS_PAGE_SIZE); 3844 num_md_pages += ctx->super->used_cluster_mask_len; 3845 3846 /* The used_blobids mask requires 1 bit per metadata page, rounded 3847 * up to the nearest page, plus a header. 
3848 */ 3849 ctx->super->used_blobid_mask_start = num_md_pages; 3850 ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3851 spdk_divide_round_up(bs->md_len, 8), 3852 SPDK_BS_PAGE_SIZE); 3853 num_md_pages += ctx->super->used_blobid_mask_len; 3854 3855 /* The metadata region size was chosen above */ 3856 ctx->super->md_start = bs->md_start = num_md_pages; 3857 ctx->super->md_len = bs->md_len; 3858 num_md_pages += bs->md_len; 3859 3860 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 3861 3862 ctx->super->size = dev->blockcnt * dev->blocklen; 3863 3864 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 3865 3866 num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster); 3867 if (num_md_clusters > bs->total_clusters) { 3868 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 3869 "please decrease number of pages reserved for metadata " 3870 "or increase cluster size.\n"); 3871 spdk_free(ctx->super); 3872 free(ctx); 3873 _spdk_bs_free(bs); 3874 cb_fn(cb_arg, NULL, -ENOMEM); 3875 return; 3876 } 3877 /* Claim all of the clusters used by the metadata */ 3878 for (i = 0; i < num_md_clusters; i++) { 3879 _spdk_bs_claim_cluster(bs, i); 3880 } 3881 3882 bs->total_data_clusters = bs->num_free_clusters; 3883 3884 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3885 cpl.u.bs_handle.cb_fn = cb_fn; 3886 cpl.u.bs_handle.cb_arg = cb_arg; 3887 cpl.u.bs_handle.bs = bs; 3888 3889 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3890 if (!seq) { 3891 spdk_free(ctx->super); 3892 free(ctx); 3893 _spdk_bs_free(bs); 3894 cb_fn(cb_arg, NULL, -ENOMEM); 3895 return; 3896 } 3897 3898 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 3899 3900 /* Clear metadata space */ 3901 spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba); 3902 3903 switch (opts.clear_method) { 3904 case BS_CLEAR_WITH_UNMAP: 3905 /* Trim data clusters */ 3906 spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3907 break; 3908 case BS_CLEAR_WITH_WRITE_ZEROES: 3909 /* Write_zeroes to data clusters */ 3910 spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3911 break; 3912 case BS_CLEAR_WITH_NONE: 3913 default: 3914 break; 3915 } 3916 3917 spdk_bs_batch_close(batch); 3918 } 3919 3920 /* END spdk_bs_init */ 3921 3922 /* START spdk_bs_destroy */ 3923 3924 static void 3925 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3926 { 3927 struct spdk_bs_init_ctx *ctx = cb_arg; 3928 struct spdk_blob_store *bs = ctx->bs; 3929 3930 /* 3931 * We need to defer calling spdk_bs_call_cpl() until after 3932 * dev destruction, so tuck these away for later use. 
3933 */ 3934 bs->unload_err = bserrno; 3935 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3936 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3937 3938 spdk_bs_sequence_finish(seq, bserrno); 3939 3940 _spdk_bs_free(bs); 3941 free(ctx); 3942 } 3943 3944 void 3945 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 3946 void *cb_arg) 3947 { 3948 struct spdk_bs_cpl cpl; 3949 spdk_bs_sequence_t *seq; 3950 struct spdk_bs_init_ctx *ctx; 3951 3952 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 3953 3954 if (!TAILQ_EMPTY(&bs->blobs)) { 3955 SPDK_ERRLOG("Blobstore still has open blobs\n"); 3956 cb_fn(cb_arg, -EBUSY); 3957 return; 3958 } 3959 3960 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3961 cpl.u.bs_basic.cb_fn = cb_fn; 3962 cpl.u.bs_basic.cb_arg = cb_arg; 3963 3964 ctx = calloc(1, sizeof(*ctx)); 3965 if (!ctx) { 3966 cb_fn(cb_arg, -ENOMEM); 3967 return; 3968 } 3969 3970 ctx->bs = bs; 3971 3972 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3973 if (!seq) { 3974 free(ctx); 3975 cb_fn(cb_arg, -ENOMEM); 3976 return; 3977 } 3978 3979 /* Write zeroes to the super block */ 3980 spdk_bs_sequence_write_zeroes_dev(seq, 3981 _spdk_bs_page_to_lba(bs, 0), 3982 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 3983 _spdk_bs_destroy_trim_cpl, ctx); 3984 } 3985 3986 /* END spdk_bs_destroy */ 3987 3988 /* START spdk_bs_unload */ 3989 3990 static void 3991 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3992 { 3993 struct spdk_bs_load_ctx *ctx = cb_arg; 3994 3995 spdk_free(ctx->super); 3996 3997 /* 3998 * We need to defer calling spdk_bs_call_cpl() until after 3999 * dev destruction, so tuck these away for later use. 4000 */ 4001 ctx->bs->unload_err = bserrno; 4002 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 4003 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 4004 4005 spdk_bs_sequence_finish(seq, bserrno); 4006 4007 _spdk_bs_free(ctx->bs); 4008 free(ctx); 4009 } 4010 4011 static void 4012 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4013 { 4014 struct spdk_bs_load_ctx *ctx = cb_arg; 4015 4016 spdk_free(ctx->mask); 4017 ctx->super->clean = 1; 4018 4019 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 4020 } 4021 4022 static void 4023 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4024 { 4025 struct spdk_bs_load_ctx *ctx = cb_arg; 4026 4027 spdk_free(ctx->mask); 4028 ctx->mask = NULL; 4029 4030 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl); 4031 } 4032 4033 static void 4034 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4035 { 4036 struct spdk_bs_load_ctx *ctx = cb_arg; 4037 4038 spdk_free(ctx->mask); 4039 ctx->mask = NULL; 4040 4041 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl); 4042 } 4043 4044 static void 4045 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4046 { 4047 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 4048 } 4049 4050 void 4051 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 4052 { 4053 struct spdk_bs_cpl cpl; 4054 spdk_bs_sequence_t *seq; 4055 struct spdk_bs_load_ctx *ctx; 4056 4057 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 4058 4059 if (!TAILQ_EMPTY(&bs->blobs)) { 4060 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4061 cb_fn(cb_arg, -EBUSY); 
4062 return; 4063 } 4064 4065 ctx = calloc(1, sizeof(*ctx)); 4066 if (!ctx) { 4067 cb_fn(cb_arg, -ENOMEM); 4068 return; 4069 } 4070 4071 ctx->bs = bs; 4072 4073 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4074 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4075 if (!ctx->super) { 4076 free(ctx); 4077 cb_fn(cb_arg, -ENOMEM); 4078 return; 4079 } 4080 4081 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4082 cpl.u.bs_basic.cb_fn = cb_fn; 4083 cpl.u.bs_basic.cb_arg = cb_arg; 4084 4085 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4086 if (!seq) { 4087 spdk_free(ctx->super); 4088 free(ctx); 4089 cb_fn(cb_arg, -ENOMEM); 4090 return; 4091 } 4092 4093 /* Read super block */ 4094 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4095 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4096 _spdk_bs_unload_read_super_cpl, ctx); 4097 } 4098 4099 /* END spdk_bs_unload */ 4100 4101 /* START spdk_bs_set_super */ 4102 4103 struct spdk_bs_set_super_ctx { 4104 struct spdk_blob_store *bs; 4105 struct spdk_bs_super_block *super; 4106 }; 4107 4108 static void 4109 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4110 { 4111 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4112 4113 if (bserrno != 0) { 4114 SPDK_ERRLOG("Unable to write to super block of blobstore\n"); 4115 } 4116 4117 spdk_free(ctx->super); 4118 4119 spdk_bs_sequence_finish(seq, bserrno); 4120 4121 free(ctx); 4122 } 4123 4124 static void 4125 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4126 { 4127 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4128 4129 if (bserrno != 0) { 4130 SPDK_ERRLOG("Unable to read super block of blobstore\n"); 4131 spdk_free(ctx->super); 4132 spdk_bs_sequence_finish(seq, bserrno); 4133 free(ctx); 4134 return; 4135 } 4136 4137 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx); 4138 } 4139 4140 void 4141 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 4142 spdk_bs_op_complete cb_fn, void *cb_arg) 4143 { 4144 struct spdk_bs_cpl cpl; 4145 spdk_bs_sequence_t *seq; 4146 struct spdk_bs_set_super_ctx *ctx; 4147 4148 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n"); 4149 4150 ctx = calloc(1, sizeof(*ctx)); 4151 if (!ctx) { 4152 cb_fn(cb_arg, -ENOMEM); 4153 return; 4154 } 4155 4156 ctx->bs = bs; 4157 4158 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4159 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4160 if (!ctx->super) { 4161 free(ctx); 4162 cb_fn(cb_arg, -ENOMEM); 4163 return; 4164 } 4165 4166 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4167 cpl.u.bs_basic.cb_fn = cb_fn; 4168 cpl.u.bs_basic.cb_arg = cb_arg; 4169 4170 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4171 if (!seq) { 4172 spdk_free(ctx->super); 4173 free(ctx); 4174 cb_fn(cb_arg, -ENOMEM); 4175 return; 4176 } 4177 4178 bs->super_blob = blobid; 4179 4180 /* Read super block */ 4181 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4182 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4183 _spdk_bs_set_super_read_cpl, ctx); 4184 } 4185 4186 /* END spdk_bs_set_super */ 4187 4188 void 4189 spdk_bs_get_super(struct spdk_blob_store *bs, 4190 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4191 { 4192 if (bs->super_blob == SPDK_BLOBID_INVALID) { 4193 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 4194 } else { 4195 cb_fn(cb_arg, bs->super_blob, 0); 4196 } 4197 } 4198 4199 uint64_t 4200 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 4201 { 4202 return bs->cluster_sz; 4203 } 4204 
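/*
 * Illustrative shutdown sketch (not part of the original source; "g_bs",
 * "g_super_id" and both callbacks are hypothetical names): record the super
 * blob id, then unload. Note spdk_bs_unload() fails with -EBUSY while any
 * blob is still open.
 *
 *   static void unload_done(void *cb_arg, int bserrno) {
 *           assert(bserrno == 0);
 *   }
 *
 *   static void set_super_done(void *cb_arg, int bserrno) {
 *           assert(bserrno == 0);
 *           spdk_bs_unload(g_bs, unload_done, NULL);
 *   }
 *
 *   spdk_bs_set_super(g_bs, g_super_id, set_super_done, NULL);
 */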
4205 uint64_t 4206 spdk_bs_get_page_size(struct spdk_blob_store *bs) 4207 { 4208 return SPDK_BS_PAGE_SIZE; 4209 } 4210 4211 uint64_t 4212 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs) 4213 { 4214 return bs->io_unit_size; 4215 } 4216 4217 uint64_t 4218 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 4219 { 4220 return bs->num_free_clusters; 4221 } 4222 4223 uint64_t 4224 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 4225 { 4226 return bs->total_data_clusters; 4227 } 4228 4229 static int 4230 spdk_bs_register_md_thread(struct spdk_blob_store *bs) 4231 { 4232 bs->md_channel = spdk_get_io_channel(bs); 4233 if (!bs->md_channel) { 4234 SPDK_ERRLOG("Failed to get IO channel.\n"); 4235 return -1; 4236 } 4237 4238 return 0; 4239 } 4240 4241 static int 4242 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 4243 { 4244 spdk_put_io_channel(bs->md_channel); 4245 4246 return 0; 4247 } 4248 4249 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 4250 { 4251 assert(blob != NULL); 4252 4253 return blob->id; 4254 } 4255 4256 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 4257 { 4258 assert(blob != NULL); 4259 4260 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 4261 } 4262 4263 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob) 4264 { 4265 assert(blob != NULL); 4266 4267 return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs); 4268 } 4269 4270 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 4271 { 4272 assert(blob != NULL); 4273 4274 return blob->active.num_clusters; 4275 } 4276 4277 /* START spdk_bs_create_blob */ 4278 4279 static void 4280 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4281 { 4282 struct spdk_blob *blob = cb_arg; 4283 4284 _spdk_blob_free(blob); 4285 4286 spdk_bs_sequence_finish(seq, bserrno); 4287 } 4288 4289 static int 4290 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs, 4291 bool internal) 4292 { 4293 uint64_t i; 4294 size_t value_len = 0; 4295 int rc; 4296 const void *value = NULL; 4297 if (xattrs->count > 0 && xattrs->get_value == NULL) { 4298 return -EINVAL; 4299 } 4300 for (i = 0; i < xattrs->count; i++) { 4301 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len); 4302 if (value == NULL || value_len == 0) { 4303 return -EINVAL; 4304 } 4305 rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal); 4306 if (rc < 0) { 4307 return rc; 4308 } 4309 } 4310 return 0; 4311 } 4312 4313 static void 4314 _spdk_bs_create_blob(struct spdk_blob_store *bs, 4315 const struct spdk_blob_opts *opts, 4316 const struct spdk_blob_xattr_opts *internal_xattrs, 4317 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4318 { 4319 struct spdk_blob *blob; 4320 uint32_t page_idx; 4321 struct spdk_bs_cpl cpl; 4322 struct spdk_blob_opts opts_default; 4323 struct spdk_blob_xattr_opts internal_xattrs_default; 4324 spdk_bs_sequence_t *seq; 4325 spdk_blob_id id; 4326 int rc; 4327 4328 assert(spdk_get_thread() == bs->md_thread); 4329 4330 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 4331 if (page_idx == UINT32_MAX) { 4332 cb_fn(cb_arg, 0, -ENOMEM); 4333 return; 4334 } 4335 spdk_bit_array_set(bs->used_blobids, page_idx); 4336 spdk_bit_array_set(bs->used_md_pages, page_idx); 4337 4338 id = _spdk_bs_page_to_blobid(page_idx); 4339 4340 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 4341 4342 blob = _spdk_blob_alloc(bs, id); 4343 if (!blob) { 4344 
cb_fn(cb_arg, 0, -ENOMEM); 4345 return; 4346 } 4347 4348 if (!opts) { 4349 spdk_blob_opts_init(&opts_default); 4350 opts = &opts_default; 4351 } 4352 if (!internal_xattrs) { 4353 _spdk_blob_xattrs_init(&internal_xattrs_default); 4354 internal_xattrs = &internal_xattrs_default; 4355 } 4356 4357 rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false); 4358 if (rc < 0) { 4359 _spdk_blob_free(blob); 4360 cb_fn(cb_arg, 0, rc); 4361 return; 4362 } 4363 4364 rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true); 4365 if (rc < 0) { 4366 _spdk_blob_free(blob); 4367 cb_fn(cb_arg, 0, rc); 4368 return; 4369 } 4370 4371 if (opts->thin_provision) { 4372 _spdk_blob_set_thin_provision(blob); 4373 } 4374 4375 _spdk_blob_set_clear_method(blob, opts->clear_method); 4376 4377 rc = _spdk_blob_resize(blob, opts->num_clusters); 4378 if (rc < 0) { 4379 _spdk_blob_free(blob); 4380 cb_fn(cb_arg, 0, rc); 4381 return; 4382 } 4383 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4384 cpl.u.blobid.cb_fn = cb_fn; 4385 cpl.u.blobid.cb_arg = cb_arg; 4386 cpl.u.blobid.blobid = blob->id; 4387 4388 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4389 if (!seq) { 4390 _spdk_blob_free(blob); 4391 cb_fn(cb_arg, 0, -ENOMEM); 4392 return; 4393 } 4394 4395 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob); 4396 } 4397 4398 void spdk_bs_create_blob(struct spdk_blob_store *bs, 4399 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4400 { 4401 _spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg); 4402 } 4403 4404 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts, 4405 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4406 { 4407 _spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg); 4408 } 4409 4410 /* END spdk_bs_create_blob */ 4411 4412 /* START blob_cleanup */ 4413 4414 struct spdk_clone_snapshot_ctx { 4415 struct spdk_bs_cpl cpl; 4416 int bserrno; 4417 bool frozen; 4418 4419 struct spdk_io_channel *channel; 4420 4421 /* Current cluster for inflate operation */ 4422 uint64_t cluster; 4423 4424 /* For inflation, force allocation of all unallocated clusters and remove 4425 * thin-provisioning. Otherwise, only decouple the parent and keep the clone thin. */ 4426 bool allocate_all; 4427 4428 struct { 4429 spdk_blob_id id; 4430 struct spdk_blob *blob; 4431 } original; 4432 struct { 4433 spdk_blob_id id; 4434 struct spdk_blob *blob; 4435 } new; 4436 4437 /* xattrs specified for snapshot/clones only. They have no impact on 4438 * the original blob's xattrs.
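	 *
	 * Illustrative sketch of a caller-supplied xattr source (hypothetical
	 * names; get_value must produce a non-NULL value with non-zero length,
	 * or blob creation fails with -EINVAL in _spdk_blob_set_xattrs()):
	 *
	 *   static void my_get_value(void *ctx, const char *name,
	 *                            const void **value, size_t *value_len)
	 *   {
	 *           *value = "example";
	 *           *value_len = sizeof("example");
	 *   }
	 *
	 *   static char *my_names[] = { "name" };
	 *   struct spdk_blob_xattr_opts x = {
	 *           .count = 1, .names = my_names, .ctx = NULL,
	 *           .get_value = my_get_value,
	 *   };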
*/ 4439 const struct spdk_blob_xattr_opts *xattrs; 4440 }; 4441 4442 static void 4443 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno) 4444 { 4445 struct spdk_clone_snapshot_ctx *ctx = cb_arg; 4446 struct spdk_bs_cpl *cpl = &ctx->cpl; 4447 4448 if (bserrno != 0) { 4449 if (ctx->bserrno != 0) { 4450 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4451 } else { 4452 ctx->bserrno = bserrno; 4453 } 4454 } 4455 4456 switch (cpl->type) { 4457 case SPDK_BS_CPL_TYPE_BLOBID: 4458 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno); 4459 break; 4460 case SPDK_BS_CPL_TYPE_BLOB_BASIC: 4461 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno); 4462 break; 4463 default: 4464 SPDK_UNREACHABLE(); 4465 break; 4466 } 4467 4468 free(ctx); 4469 } 4470 4471 static void 4472 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 4473 { 4474 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4475 struct spdk_blob *origblob = ctx->original.blob; 4476 4477 if (bserrno != 0) { 4478 if (ctx->bserrno != 0) { 4479 SPDK_ERRLOG("Unfreeze error %d\n", bserrno); 4480 } else { 4481 ctx->bserrno = bserrno; 4482 } 4483 } 4484 4485 ctx->original.id = origblob->id; 4486 origblob->locked_operation_in_progress = false; 4487 4488 spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4489 } 4490 4491 static void 4492 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno) 4493 { 4494 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4495 struct spdk_blob *origblob = ctx->original.blob; 4496 4497 if (bserrno != 0) { 4498 if (ctx->bserrno != 0) { 4499 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4500 } else { 4501 ctx->bserrno = bserrno; 4502 } 4503 } 4504 4505 if (ctx->frozen) { 4506 /* Unfreeze any outstanding I/O */ 4507 _spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx); 4508 } else { 4509 _spdk_bs_snapshot_unfreeze_cpl(ctx, 0); 4510 } 4511 4512 } 4513 4514 static void 4515 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno) 4516 { 4517 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4518 struct spdk_blob *newblob = ctx->new.blob; 4519 4520 if (bserrno != 0) { 4521 if (ctx->bserrno != 0) { 4522 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4523 } else { 4524 ctx->bserrno = bserrno; 4525 } 4526 } 4527 4528 ctx->new.id = newblob->id; 4529 spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4530 } 4531 4532 /* END blob_cleanup */ 4533 4534 /* START spdk_bs_create_snapshot */ 4535 4536 static void 4537 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2) 4538 { 4539 uint64_t *cluster_temp; 4540 4541 cluster_temp = blob1->active.clusters; 4542 blob1->active.clusters = blob2->active.clusters; 4543 blob2->active.clusters = cluster_temp; 4544 } 4545 4546 static void 4547 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno) 4548 { 4549 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4550 struct spdk_blob *origblob = ctx->original.blob; 4551 struct spdk_blob *newblob = ctx->new.blob; 4552 4553 if (bserrno != 0) { 4554 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4555 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4556 return; 4557 } 4558 4559 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */ 4560 bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true); 4561 if (bserrno != 0) { 4562 
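/* Could not clear the SNAPSHOT_IN_PROGRESS marker; report the error through the usual original-blob cleanup path. */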
_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4563 return; 4564 } 4565 4566 _spdk_bs_blob_list_add(ctx->original.blob); 4567 4568 spdk_blob_set_read_only(newblob); 4569 4570 /* sync snapshot metadata */ 4571 spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, cb_arg); 4572 } 4573 4574 static void 4575 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno) 4576 { 4577 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4578 struct spdk_blob *origblob = ctx->original.blob; 4579 struct spdk_blob *newblob = ctx->new.blob; 4580 4581 if (bserrno != 0) { 4582 /* return cluster map back to original */ 4583 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4584 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4585 return; 4586 } 4587 4588 /* Set internal xattr for snapshot id */ 4589 bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true); 4590 if (bserrno != 0) { 4591 /* return cluster map back to original */ 4592 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4593 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4594 return; 4595 } 4596 4597 _spdk_bs_blob_list_remove(origblob); 4598 origblob->parent_id = newblob->id; 4599 4600 /* Create new back_bs_dev for snapshot */ 4601 origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob); 4602 if (origblob->back_bs_dev == NULL) { 4603 /* return cluster map back to original */ 4604 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4605 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL); 4606 return; 4607 } 4608 4609 /* set clone blob as thin provisioned */ 4610 _spdk_blob_set_thin_provision(origblob); 4611 4612 _spdk_bs_blob_list_add(newblob); 4613 4614 /* sync clone metadata */ 4615 spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx); 4616 } 4617 4618 static void 4619 _spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc) 4620 { 4621 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4622 struct spdk_blob *origblob = ctx->original.blob; 4623 struct spdk_blob *newblob = ctx->new.blob; 4624 int bserrno; 4625 4626 if (rc != 0) { 4627 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc); 4628 return; 4629 } 4630 4631 ctx->frozen = true; 4632 4633 /* set new back_bs_dev for snapshot */ 4634 newblob->back_bs_dev = origblob->back_bs_dev; 4635 /* Set invalid flags from origblob */ 4636 newblob->invalid_flags = origblob->invalid_flags; 4637 4638 /* inherit parent from original blob if set */ 4639 newblob->parent_id = origblob->parent_id; 4640 if (origblob->parent_id != SPDK_BLOBID_INVALID) { 4641 /* Set internal xattr for snapshot id */ 4642 bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT, 4643 &origblob->parent_id, sizeof(spdk_blob_id), true); 4644 if (bserrno != 0) { 4645 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4646 return; 4647 } 4648 } 4649 4650 /* swap cluster maps */ 4651 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4652 4653 /* Set the clear method on the new blob to match the original. 
*/ 4654 _spdk_blob_set_clear_method(newblob, origblob->clear_method); 4655 4656 /* sync snapshot metadata */ 4657 spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx); 4658 } 4659 4660 static void 4661 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4662 { 4663 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4664 struct spdk_blob *origblob = ctx->original.blob; 4665 struct spdk_blob *newblob = _blob; 4666 4667 if (bserrno != 0) { 4668 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4669 return; 4670 } 4671 4672 ctx->new.blob = newblob; 4673 assert(spdk_blob_is_thin_provisioned(newblob)); 4674 assert(spdk_mem_all_zero(newblob->active.clusters, 4675 newblob->active.num_clusters * sizeof(*newblob->active.clusters))); 4676 4677 _spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx); 4678 } 4679 4680 static void 4681 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4682 { 4683 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4684 struct spdk_blob *origblob = ctx->original.blob; 4685 4686 if (bserrno != 0) { 4687 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4688 return; 4689 } 4690 4691 ctx->new.id = blobid; 4692 ctx->cpl.u.blobid.blobid = blobid; 4693 4694 spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx); 4695 } 4696 4697 4698 static void 4699 _spdk_bs_xattr_snapshot(void *arg, const char *name, 4700 const void **value, size_t *value_len) 4701 { 4702 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0); 4703 4704 struct spdk_blob *blob = (struct spdk_blob *)arg; 4705 *value = &blob->id; 4706 *value_len = sizeof(blob->id); 4707 } 4708 4709 static void 4710 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4711 { 4712 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4713 struct spdk_blob_opts opts; 4714 struct spdk_blob_xattr_opts internal_xattrs; 4715 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS }; 4716 4717 if (bserrno != 0) { 4718 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4719 return; 4720 } 4721 4722 ctx->original.blob = _blob; 4723 4724 if (_blob->data_ro || _blob->md_ro) { 4725 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n", 4726 _blob->id); 4727 ctx->bserrno = -EINVAL; 4728 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4729 return; 4730 } 4731 4732 if (_blob->locked_operation_in_progress) { 4733 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n"); 4734 ctx->bserrno = -EBUSY; 4735 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4736 return; 4737 } 4738 4739 _blob->locked_operation_in_progress = true; 4740 4741 spdk_blob_opts_init(&opts); 4742 _spdk_blob_xattrs_init(&internal_xattrs); 4743 4744 /* Change the size of new blob to the same as in original blob, 4745 * but do not allocate clusters */ 4746 opts.thin_provision = true; 4747 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4748 4749 /* If there are any xattrs specified for snapshot, set them now */ 4750 if (ctx->xattrs) { 4751 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4752 } 4753 /* Set internal xattr SNAPSHOT_IN_PROGRESS */ 4754 internal_xattrs.count = 1; 4755 internal_xattrs.ctx = _blob; 4756 internal_xattrs.names = xattrs_names; 4757 internal_xattrs.get_value = 
_spdk_bs_xattr_snapshot; 4758 4759 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4760 _spdk_bs_snapshot_newblob_create_cpl, ctx); 4761 } 4762 4763 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid, 4764 const struct spdk_blob_xattr_opts *snapshot_xattrs, 4765 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4766 { 4767 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4768 4769 if (!ctx) { 4770 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4771 return; 4772 } 4773 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4774 ctx->cpl.u.blobid.cb_fn = cb_fn; 4775 ctx->cpl.u.blobid.cb_arg = cb_arg; 4776 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4777 ctx->bserrno = 0; 4778 ctx->frozen = false; 4779 ctx->original.id = blobid; 4780 ctx->xattrs = snapshot_xattrs; 4781 4782 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx); 4783 } 4784 /* END spdk_bs_create_snapshot */ 4785 4786 /* START spdk_bs_create_clone */ 4787 4788 static void 4789 _spdk_bs_xattr_clone(void *arg, const char *name, 4790 const void **value, size_t *value_len) 4791 { 4792 assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0); 4793 4794 struct spdk_blob *blob = (struct spdk_blob *)arg; 4795 *value = &blob->id; 4796 *value_len = sizeof(blob->id); 4797 } 4798 4799 static void 4800 _spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4801 { 4802 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4803 struct spdk_blob *clone = _blob; 4804 4805 ctx->new.blob = clone; 4806 _spdk_bs_blob_list_add(clone); 4807 4808 spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4809 } 4810 4811 static void 4812 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4813 { 4814 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4815 4816 ctx->cpl.u.blobid.blobid = blobid; 4817 spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx); 4818 } 4819 4820 static void 4821 _spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4822 { 4823 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4824 struct spdk_blob_opts opts; 4825 struct spdk_blob_xattr_opts internal_xattrs; 4826 char *xattr_names[] = { BLOB_SNAPSHOT }; 4827 4828 if (bserrno != 0) { 4829 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4830 return; 4831 } 4832 4833 ctx->original.blob = _blob; 4834 4835 if (!_blob->data_ro || !_blob->md_ro) { 4836 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone not from read-only blob\n"); 4837 ctx->bserrno = -EINVAL; 4838 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4839 return; 4840 } 4841 4842 if (_blob->locked_operation_in_progress) { 4843 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n"); 4844 ctx->bserrno = -EBUSY; 4845 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4846 return; 4847 } 4848 4849 _blob->locked_operation_in_progress = true; 4850 4851 spdk_blob_opts_init(&opts); 4852 _spdk_blob_xattrs_init(&internal_xattrs); 4853 4854 opts.thin_provision = true; 4855 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4856 if (ctx->xattrs) { 4857 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4858 } 4859 4860 /* Set internal xattr BLOB_SNAPSHOT */ 4861 internal_xattrs.count = 1; 4862 internal_xattrs.ctx = _blob; 4863 internal_xattrs.names = xattr_names; 4864 
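/* _spdk_bs_xattr_clone (set just below) supplies the snapshot's blob id, so the new clone's internal BLOB_SNAPSHOT xattr records its parent. */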
internal_xattrs.get_value = _spdk_bs_xattr_clone; 4865 4866 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4867 _spdk_bs_clone_newblob_create_cpl, ctx); 4868 } 4869 4870 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid, 4871 const struct spdk_blob_xattr_opts *clone_xattrs, 4872 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4873 { 4874 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4875 4876 if (!ctx) { 4877 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4878 return; 4879 } 4880 4881 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4882 ctx->cpl.u.blobid.cb_fn = cb_fn; 4883 ctx->cpl.u.blobid.cb_arg = cb_arg; 4884 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4885 ctx->bserrno = 0; 4886 ctx->xattrs = clone_xattrs; 4887 ctx->original.id = blobid; 4888 4889 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx); 4890 } 4891 4892 /* END spdk_bs_create_clone */ 4893 4894 /* START spdk_bs_inflate_blob */ 4895 4896 static void 4897 _spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno) 4898 { 4899 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4900 struct spdk_blob *_blob = ctx->original.blob; 4901 4902 if (bserrno != 0) { 4903 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4904 return; 4905 } 4906 4907 assert(_parent != NULL); 4908 4909 _spdk_bs_blob_list_remove(_blob); 4910 _blob->parent_id = _parent->id; 4911 _spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id, 4912 sizeof(spdk_blob_id), true); 4913 4914 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4915 _blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent); 4916 _spdk_bs_blob_list_add(_blob); 4917 4918 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4919 } 4920 4921 static void 4922 _spdk_bs_inflate_blob_done(void *cb_arg, int bserrno) 4923 { 4924 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4925 struct spdk_blob *_blob = ctx->original.blob; 4926 struct spdk_blob *_parent; 4927 4928 if (bserrno != 0) { 4929 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4930 return; 4931 } 4932 4933 if (ctx->allocate_all) { 4934 /* remove thin provisioning */ 4935 _spdk_bs_blob_list_remove(_blob); 4936 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4937 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV; 4938 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4939 _blob->back_bs_dev = NULL; 4940 _blob->parent_id = SPDK_BLOBID_INVALID; 4941 } else { 4942 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob; 4943 if (_parent->parent_id != SPDK_BLOBID_INVALID) { 4944 /* We must change the parent of the inflated blob */ 4945 spdk_bs_open_blob(_blob->bs, _parent->parent_id, 4946 _spdk_bs_inflate_blob_set_parent_cpl, ctx); 4947 return; 4948 } 4949 4950 _spdk_bs_blob_list_remove(_blob); 4951 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4952 _blob->parent_id = SPDK_BLOBID_INVALID; 4953 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4954 _blob->back_bs_dev = spdk_bs_create_zeroes_dev(); 4955 } 4956 4957 _blob->state = SPDK_BLOB_STATE_DIRTY; 4958 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4959 } 4960 4961 /* Check if cluster needs allocation */ 4962 static inline bool 4963 _spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all) 4964 { 4965 struct spdk_blob_bs_dev *b; 4966 4967 assert(blob != NULL); 4968 4969 if 
(blob->active.clusters[cluster] != 0) { 4970 /* Cluster is already allocated */ 4971 return false; 4972 } 4973 4974 if (blob->parent_id == SPDK_BLOBID_INVALID) { 4975 /* Blob has no parent blob */ 4976 return allocate_all; 4977 } 4978 4979 b = (struct spdk_blob_bs_dev *)blob->back_bs_dev; 4980 return (allocate_all || b->blob->active.clusters[cluster] != 0); 4981 } 4982 4983 static void 4984 _spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno) 4985 { 4986 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4987 struct spdk_blob *_blob = ctx->original.blob; 4988 uint64_t offset; 4989 4990 if (bserrno != 0) { 4991 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4992 return; 4993 } 4994 4995 for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) { 4996 if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) { 4997 break; 4998 } 4999 } 5000 5001 if (ctx->cluster < _blob->active.num_clusters) { 5002 offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster); 5003 5004 /* We may safely advance the cluster counter before the write */ 5005 ctx->cluster++; 5006 5007 /* Use a zero-length write to touch (allocate) the cluster */ 5008 spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0, 5009 _spdk_bs_inflate_blob_touch_next, ctx); 5010 } else { 5011 _spdk_bs_inflate_blob_done(cb_arg, bserrno); 5012 } 5013 } 5014 5015 static void 5016 _spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5017 { 5018 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5019 uint64_t lfc; /* lowest free cluster */ 5020 uint64_t i; 5021 5022 if (bserrno != 0) { 5023 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 5024 return; 5025 } 5026 5027 ctx->original.blob = _blob; 5028 5029 if (_blob->locked_operation_in_progress) { 5030 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n"); 5031 ctx->bserrno = -EBUSY; 5032 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 5033 return; 5034 } 5035 5036 _blob->locked_operation_in_progress = true; 5037 5038 if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) { 5039 /* This blob has no parent, so we cannot decouple it. */ 5040 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n"); 5041 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL); 5042 return; 5043 } 5044 5045 if (spdk_blob_is_thin_provisioned(_blob) == false) { 5046 /* This is not a thin-provisioned blob. No need to inflate. */ 5047 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0); 5048 return; 5049 } 5050 5051 /* Do two passes - one to verify that we can obtain enough clusters 5052 * and another to actually claim them. 5053 */ 5054 lfc = 0; 5055 for (i = 0; i < _blob->active.num_clusters; i++) { 5056 if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) { 5057 lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc); 5058 if (lfc == UINT32_MAX) { 5059 /* No more free clusters.
Cannot satisfy the request */ 5060 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC); 5061 return; 5062 } 5063 lfc++; 5064 } 5065 } 5066 5067 ctx->cluster = 0; 5068 _spdk_bs_inflate_blob_touch_next(ctx, 0); 5069 } 5070 5071 static void 5072 _spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5073 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg) 5074 { 5075 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5076 5077 if (!ctx) { 5078 cb_fn(cb_arg, -ENOMEM); 5079 return; 5080 } 5081 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5082 ctx->cpl.u.bs_basic.cb_fn = cb_fn; 5083 ctx->cpl.u.bs_basic.cb_arg = cb_arg; 5084 ctx->bserrno = 0; 5085 ctx->original.id = blobid; 5086 ctx->channel = channel; 5087 ctx->allocate_all = allocate_all; 5088 5089 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx); 5090 } 5091 5092 void 5093 spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5094 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5095 { 5096 _spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg); 5097 } 5098 5099 void 5100 spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5101 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5102 { 5103 _spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg); 5104 } 5105 /* END spdk_bs_inflate_blob */ 5106 5107 /* START spdk_blob_resize */ 5108 struct spdk_bs_resize_ctx { 5109 spdk_blob_op_complete cb_fn; 5110 void *cb_arg; 5111 struct spdk_blob *blob; 5112 uint64_t sz; 5113 int rc; 5114 }; 5115 5116 static void 5117 _spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc) 5118 { 5119 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5120 5121 if (rc != 0) { 5122 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc); 5123 } 5124 5125 if (ctx->rc != 0) { 5126 SPDK_ERRLOG("Unfreeze failed, ctx->rc=%d\n", ctx->rc); 5127 rc = ctx->rc; 5128 } 5129 5130 ctx->blob->locked_operation_in_progress = false; 5131 5132 ctx->cb_fn(ctx->cb_arg, rc); 5133 free(ctx); 5134 } 5135 5136 static void 5137 _spdk_bs_resize_freeze_cpl(void *cb_arg, int rc) 5138 { 5139 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5140 5141 if (rc != 0) { 5142 ctx->blob->locked_operation_in_progress = false; 5143 ctx->cb_fn(ctx->cb_arg, rc); 5144 free(ctx); 5145 return; 5146 } 5147 5148 ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz); 5149 5150 _spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx); 5151 } 5152 5153 void 5154 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg) 5155 { 5156 struct spdk_bs_resize_ctx *ctx; 5157 5158 _spdk_blob_verify_md_op(blob); 5159 5160 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 5161 5162 if (blob->md_ro) { 5163 cb_fn(cb_arg, -EPERM); 5164 return; 5165 } 5166 5167 if (sz == blob->active.num_clusters) { 5168 cb_fn(cb_arg, 0); 5169 return; 5170 } 5171 5172 if (blob->locked_operation_in_progress) { 5173 cb_fn(cb_arg, -EBUSY); 5174 return; 5175 } 5176 5177 ctx = calloc(1, sizeof(*ctx)); 5178 if (!ctx) { 5179 cb_fn(cb_arg, -ENOMEM); 5180 return; 5181 } 5182 5183 blob->locked_operation_in_progress = true; 5184 ctx->cb_fn = cb_fn; 5185 ctx->cb_arg = cb_arg; 5186 ctx->blob = blob; 5187 ctx->sz = sz; 5188 _spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx); 5189 } 5190 5191 /* END spdk_blob_resize */ 5192 5193 5194 /* START 
spdk_bs_delete_blob */ 5195 5196 static void 5197 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno) 5198 { 5199 spdk_bs_sequence_t *seq = cb_arg; 5200 5201 spdk_bs_sequence_finish(seq, bserrno); 5202 } 5203 5204 static void 5205 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5206 { 5207 struct spdk_blob *blob = cb_arg; 5208 5209 if (bserrno != 0) { 5210 /* 5211 * We already removed this blob from the blobstore tailq, so 5212 * we need to free it here since this is the last reference 5213 * to it. 5214 */ 5215 _spdk_blob_free(blob); 5216 _spdk_bs_delete_close_cpl(seq, bserrno); 5217 return; 5218 } 5219 5220 /* 5221 * This will immediately decrement the ref_count and call 5222 * the completion routine since the metadata state is clean. 5223 * By calling spdk_blob_close, we reduce the number of call 5224 * points into code that touches the blob->open_ref count 5225 * and the blobstore's blob list. 5226 */ 5227 spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq); 5228 } 5229 5230 struct delete_snapshot_ctx { 5231 struct spdk_blob_list *parent_snapshot_entry; 5232 struct spdk_blob *snapshot; 5233 bool snapshot_md_ro; 5234 struct spdk_blob *clone; 5235 bool clone_md_ro; 5236 spdk_blob_op_with_handle_complete cb_fn; 5237 void *cb_arg; 5238 int bserrno; 5239 }; 5240 5241 static void 5242 _spdk_delete_blob_cleanup_finish(void *cb_arg, int bserrno) 5243 { 5244 struct delete_snapshot_ctx *ctx = cb_arg; 5245 5246 if (bserrno != 0) { 5247 SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno); 5248 } 5249 5250 assert(ctx != NULL); 5251 5252 if (bserrno != 0 && ctx->bserrno == 0) { 5253 ctx->bserrno = bserrno; 5254 } 5255 5256 ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno); 5257 free(ctx); 5258 } 5259 5260 static void 5261 _spdk_delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno) 5262 { 5263 struct delete_snapshot_ctx *ctx = cb_arg; 5264 5265 if (bserrno != 0) { 5266 ctx->bserrno = bserrno; 5267 SPDK_ERRLOG("Clone cleanup error %d\n", bserrno); 5268 } 5269 5270 /* open_ref == 1 means that only the deletion context has opened this snapshot 5271 * open_ref == 2 means that the clone has opened this snapshot as well, 5272 * so we have to add it back to the blobs list */ 5273 if (ctx->snapshot->open_ref == 2) { 5274 TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link); 5275 } 5276 5277 ctx->snapshot->locked_operation_in_progress = false; 5278 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5279 5280 spdk_blob_close(ctx->snapshot, _spdk_delete_blob_cleanup_finish, ctx); 5281 } 5282 5283 static void 5284 _spdk_delete_snapshot_cleanup_clone(void *cb_arg, int bserrno) 5285 { 5286 struct delete_snapshot_ctx *ctx = cb_arg; 5287 5288 ctx->clone->locked_operation_in_progress = false; 5289 ctx->clone->md_ro = ctx->clone_md_ro; 5290 5291 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5292 } 5293 5294 static void 5295 _spdk_delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 5296 { 5297 struct delete_snapshot_ctx *ctx = cb_arg; 5298 5299 if (bserrno) { 5300 ctx->bserrno = bserrno; 5301 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5302 return; 5303 } 5304 5305 ctx->clone->locked_operation_in_progress = false; 5306 spdk_blob_close(ctx->clone, _spdk_delete_blob_cleanup_finish, ctx); 5307 } 5308 5309 static void 5310 _spdk_delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno) 5311 { 5312 struct delete_snapshot_ctx *ctx = cb_arg; 5313 struct spdk_blob_list *parent_snapshot_entry = NULL; 5314 struct spdk_blob_list *snapshot_entry = NULL;
5315 struct spdk_blob_list *clone_entry = NULL; 5316 struct spdk_blob_list *snapshot_clone_entry = NULL; 5317 5318 if (bserrno) { 5319 SPDK_ERRLOG("Failed to sync MD on blob\n"); 5320 ctx->bserrno = bserrno; 5321 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5322 return; 5323 } 5324 5325 /* Get snapshot entry for the snapshot we want to remove */ 5326 snapshot_entry = _spdk_bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id); 5327 5328 assert(snapshot_entry != NULL); 5329 5330 /* Remove clone entry in this snapshot (at this point there can be only one clone) */ 5331 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5332 assert(clone_entry != NULL); 5333 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 5334 snapshot_entry->clone_count--; 5335 assert(TAILQ_EMPTY(&snapshot_entry->clones)); 5336 5337 if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) { 5338 /* This snapshot is at the same time a clone of another snapshot - we need to 5339 * update parent snapshot (remove current clone, add new one inherited from 5340 * the snapshot that is being removed) */ 5341 5342 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5343 * snapshot that we are removing */ 5344 _spdk_blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry, 5345 &snapshot_clone_entry); 5346 5347 /* Switch clone entry in parent snapshot */ 5348 TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link); 5349 TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link); 5350 free(snapshot_clone_entry); 5351 } else { 5352 /* No parent snapshot - just remove clone entry */ 5353 free(clone_entry); 5354 } 5355 5356 /* Restore md_ro flags */ 5357 ctx->clone->md_ro = ctx->clone_md_ro; 5358 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5359 5360 _spdk_blob_unfreeze_io(ctx->clone, _spdk_delete_snapshot_unfreeze_cpl, ctx); 5361 } 5362 5363 static void 5364 _spdk_delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno) 5365 { 5366 struct delete_snapshot_ctx *ctx = cb_arg; 5367 uint64_t i; 5368 5369 ctx->snapshot->md_ro = false; 5370 5371 if (bserrno) { 5372 SPDK_ERRLOG("Failed to sync MD on clone\n"); 5373 ctx->bserrno = bserrno; 5374 5375 /* Restore snapshot to previous state */ 5376 bserrno = _spdk_blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true); 5377 if (bserrno != 0) { 5378 _spdk_delete_snapshot_cleanup_clone(ctx, bserrno); 5379 return; 5380 } 5381 5382 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_cleanup_clone, ctx); 5383 return; 5384 } 5385 5386 /* Clear cluster map entries for snapshot */ 5387 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5388 if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) { 5389 ctx->snapshot->active.clusters[i] = 0; 5390 } 5391 } 5392 5393 ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY; 5394 5395 if (ctx->parent_snapshot_entry != NULL) { 5396 ctx->snapshot->back_bs_dev = NULL; 5397 } 5398 5399 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_cpl, ctx); 5400 } 5401 5402 static void 5403 _spdk_delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno) 5404 { 5405 struct delete_snapshot_ctx *ctx = cb_arg; 5406 uint64_t i; 5407 5408 /* Temporarily override md_ro flag for clone for MD modification */ 5409 ctx->clone_md_ro = ctx->clone->md_ro; 5410 ctx->clone->md_ro = false; 5411 5412 if (bserrno) { 5413 SPDK_ERRLOG("Failed to sync MD with xattr on blob\n"); 5414 ctx->bserrno = bserrno; 5415 
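/* Pass 0 rather than bserrno: ctx->bserrno already records the failure and _spdk_delete_blob_cleanup_finish() reports it to the caller. */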
_spdk_delete_snapshot_cleanup_clone(ctx, 0); 5416 return; 5417 } 5418 5419 /* Copy snapshot map to clone map (only unallocated clusters in clone) */ 5420 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5421 if (ctx->clone->active.clusters[i] == 0) { 5422 ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i]; 5423 } 5424 } 5425 5426 /* Delete old backing bs_dev from clone (related to snapshot that will be removed) */ 5427 ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev); 5428 5429 /* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */ 5430 if (ctx->parent_snapshot_entry != NULL) { 5431 /* ...to parent snapshot */ 5432 ctx->clone->parent_id = ctx->parent_snapshot_entry->id; 5433 ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev; 5434 _spdk_blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id, 5435 sizeof(spdk_blob_id), 5436 true); 5437 } else { 5438 /* ...to blobid invalid and zeroes dev */ 5439 ctx->clone->parent_id = SPDK_BLOBID_INVALID; 5440 ctx->clone->back_bs_dev = spdk_bs_create_zeroes_dev(); 5441 _spdk_blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true); 5442 } 5443 5444 spdk_blob_sync_md(ctx->clone, _spdk_delete_snapshot_sync_clone_cpl, ctx); 5445 } 5446 5447 static void 5448 _spdk_delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno) 5449 { 5450 struct delete_snapshot_ctx *ctx = cb_arg; 5451 5452 if (bserrno) { 5453 SPDK_ERRLOG("Failed to freeze I/O on clone\n"); 5454 ctx->bserrno = bserrno; 5455 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5456 return; 5457 } 5458 5459 /* Temporarily override md_ro flag for snapshot for MD modification */ 5460 ctx->snapshot_md_ro = ctx->snapshot->md_ro; 5461 ctx->snapshot->md_ro = false; 5462 5463 /* Mark blob as pending for removal for power failure safety, use clone id for recovery */ 5464 ctx->bserrno = _spdk_blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id, 5465 sizeof(spdk_blob_id), true); 5466 if (ctx->bserrno != 0) { 5467 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5468 return; 5469 } 5470 5471 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_xattr_cpl, ctx); 5472 } 5473 5474 static void 5475 _spdk_delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno) 5476 { 5477 struct delete_snapshot_ctx *ctx = cb_arg; 5478 5479 if (bserrno) { 5480 SPDK_ERRLOG("Failed to open clone\n"); 5481 ctx->bserrno = bserrno; 5482 _spdk_delete_snapshot_cleanup_snapshot(ctx, 0); 5483 return; 5484 } 5485 5486 ctx->clone = clone; 5487 5488 if (clone->locked_operation_in_progress) { 5489 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n"); 5490 ctx->bserrno = -EBUSY; 5491 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5492 return; 5493 } 5494 5495 clone->locked_operation_in_progress = true; 5496 5497 _spdk_blob_freeze_io(clone, _spdk_delete_snapshot_freeze_io_cb, ctx); 5498 } 5499 5500 static void 5501 _spdk_update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx) 5502 { 5503 struct spdk_blob_list *snapshot_entry = NULL; 5504 struct spdk_blob_list *clone_entry = NULL; 5505 struct spdk_blob_list *snapshot_clone_entry = NULL; 5506 5507 /* Get snapshot entry for the snapshot we want to remove */ 5508 snapshot_entry = _spdk_bs_get_snapshot_entry(snapshot->bs, snapshot->id); 5509 5510 assert(snapshot_entry != NULL); 5511 5512 /* Get clone of the snapshot (at this point 
there can be only one clone) */ 5513 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5514 assert(snapshot_entry->clone_count == 1); 5515 assert(clone_entry != NULL); 5516 5517 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5518 * snapshot that we are removing */ 5519 _spdk_blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry, 5520 &snapshot_clone_entry); 5521 5522 spdk_bs_open_blob(snapshot->bs, clone_entry->id, _spdk_delete_snapshot_open_clone_cb, ctx); 5523 } 5524 5525 static void 5526 _spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno) 5527 { 5528 spdk_bs_sequence_t *seq = cb_arg; 5529 struct spdk_blob_list *snapshot_entry = NULL; 5530 uint32_t page_num; 5531 5532 if (bserrno) { 5533 SPDK_ERRLOG("Failed to remove blob\n"); 5534 spdk_bs_sequence_finish(seq, bserrno); 5535 return; 5536 } 5537 5538 /* Remove snapshot from the list */ 5539 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5540 if (snapshot_entry != NULL) { 5541 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link); 5542 free(snapshot_entry); 5543 } 5544 5545 page_num = _spdk_bs_blobid_to_page(blob->id); 5546 spdk_bit_array_clear(blob->bs->used_blobids, page_num); 5547 blob->state = SPDK_BLOB_STATE_DIRTY; 5548 blob->active.num_pages = 0; 5549 _spdk_blob_resize(blob, 0); 5550 5551 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob); 5552 } 5553 5554 static int 5555 _spdk_bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone) 5556 { 5557 struct spdk_blob_list *snapshot_entry = NULL; 5558 struct spdk_blob_list *clone_entry = NULL; 5559 struct spdk_blob *clone = NULL; 5560 bool has_one_clone = false; 5561 5562 /* Check if this is a snapshot with clones */ 5563 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5564 if (snapshot_entry != NULL) { 5565 if (snapshot_entry->clone_count > 1) { 5566 SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n"); 5567 return -EBUSY; 5568 } else if (snapshot_entry->clone_count == 1) { 5569 has_one_clone = true; 5570 } 5571 } 5572 5573 /* Check if someone has this blob open (besides this delete context): 5574 * - open_ref = 1 - only this context opened blob, so it is ok to remove it 5575 * - open_ref <= 2 && has_one_clone = true - clone is holding snapshot 5576 * and that is ok, because we will update it accordingly */ 5577 if (blob->open_ref <= 2 && has_one_clone) { 5578 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5579 assert(clone_entry != NULL); 5580 clone = _spdk_blob_lookup(blob->bs, clone_entry->id); 5581 5582 if (blob->open_ref == 2 && clone == NULL) { 5583 /* Clone is closed and someone else opened this blob */ 5584 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5585 return -EBUSY; 5586 } 5587 5588 *update_clone = true; 5589 return 0; 5590 } 5591 5592 if (blob->open_ref > 1) { 5593 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5594 return -EBUSY; 5595 } 5596 5597 assert(has_one_clone == false); 5598 *update_clone = false; 5599 return 0; 5600 } 5601 5602 static void 5603 _spdk_bs_delete_enomem_close_cpl(void *cb_arg, int bserrno) 5604 { 5605 spdk_bs_sequence_t *seq = cb_arg; 5606 5607 spdk_bs_sequence_finish(seq, -ENOMEM); 5608 } 5609 5610 static void 5611 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 5612 { 5613 spdk_bs_sequence_t *seq = cb_arg; 5614 struct delete_snapshot_ctx *ctx; 5615 bool update_clone = false; 5616 5617 if (bserrno != 0) { 5618 
spdk_bs_sequence_finish(seq, bserrno); 5619 return; 5620 } 5621 5622 _spdk_blob_verify_md_op(blob); 5623 5624 ctx = calloc(1, sizeof(*ctx)); 5625 if (ctx == NULL) { 5626 spdk_blob_close(blob, _spdk_bs_delete_enomem_close_cpl, seq); 5627 return; 5628 } 5629 5630 ctx->snapshot = blob; 5631 ctx->cb_fn = _spdk_bs_delete_blob_finish; 5632 ctx->cb_arg = seq; 5633 5634 /* Check if blob can be removed and if it is a snapshot with clone on top of it */ 5635 ctx->bserrno = _spdk_bs_is_blob_deletable(blob, &update_clone); 5636 if (ctx->bserrno) { 5637 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5638 return; 5639 } 5640 5641 if (blob->locked_operation_in_progress) { 5642 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n"); 5643 ctx->bserrno = -EBUSY; 5644 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5645 return; 5646 } 5647 5648 blob->locked_operation_in_progress = true; 5649 5650 /* 5651 * Remove the blob from the blob_store list now, to ensure it does not 5652 * get returned after this point by _spdk_blob_lookup(). 5653 */ 5654 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 5655 5656 if (update_clone) { 5657 /* This blob is a snapshot with active clone - update clone first */ 5658 _spdk_update_clone_on_snapshot_deletion(blob, ctx); 5659 } else { 5660 /* This blob does not have any clones - just remove it */ 5661 _spdk_bs_blob_list_remove(blob); 5662 _spdk_bs_delete_blob_finish(seq, blob, 0); 5663 free(ctx); 5664 } 5665 } 5666 5667 void 5668 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5669 spdk_blob_op_complete cb_fn, void *cb_arg) 5670 { 5671 struct spdk_bs_cpl cpl; 5672 spdk_bs_sequence_t *seq; 5673 5674 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 5675 5676 assert(spdk_get_thread() == bs->md_thread); 5677 5678 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5679 cpl.u.blob_basic.cb_fn = cb_fn; 5680 cpl.u.blob_basic.cb_arg = cb_arg; 5681 5682 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 5683 if (!seq) { 5684 cb_fn(cb_arg, -ENOMEM); 5685 return; 5686 } 5687 5688 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq); 5689 } 5690 5691 /* END spdk_bs_delete_blob */ 5692 5693 /* START spdk_bs_open_blob */ 5694 5695 static void 5696 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5697 { 5698 struct spdk_blob *blob = cb_arg; 5699 5700 if (bserrno != 0) { 5701 _spdk_blob_free(blob); 5702 seq->cpl.u.blob_handle.blob = NULL; 5703 spdk_bs_sequence_finish(seq, bserrno); 5704 return; 5705 } 5706 5707 blob->open_ref++; 5708 5709 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 5710 5711 spdk_bs_sequence_finish(seq, bserrno); 5712 } 5713 5714 static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5715 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5716 { 5717 struct spdk_blob *blob; 5718 struct spdk_bs_cpl cpl; 5719 struct spdk_blob_open_opts opts_default; 5720 spdk_bs_sequence_t *seq; 5721 uint32_t page_num; 5722 5723 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 5724 assert(spdk_get_thread() == bs->md_thread); 5725 5726 page_num = _spdk_bs_blobid_to_page(blobid); 5727 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) { 5728 /* Invalid blobid */ 5729 cb_fn(cb_arg, NULL, -ENOENT); 5730 return; 5731 } 5732 5733 blob = _spdk_blob_lookup(bs, blobid); 5734 if (blob) { 5735 blob->open_ref++; 5736 cb_fn(cb_arg, blob, 0); 5737 return; 5738 } 5739 5740 blob = _spdk_blob_alloc(bs, blobid); 5741 
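/* Not cached in memory: allocate a fresh handle and populate it from on-disk metadata via _spdk_blob_load() at the end of this function. */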
/* START spdk_bs_open_blob */

static void
_spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		_spdk_blob_free(blob);
		seq->cpl.u.blob_handle.blob = NULL;
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		   struct spdk_blob_open_opts *opts,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob		*blob;
	struct spdk_bs_cpl		cpl;
	struct spdk_blob_open_opts	opts_default;
	spdk_bs_sequence_t		*seq;
	uint32_t			page_num;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
	assert(spdk_get_thread() == bs->md_thread);

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, blob, 0);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (!opts) {
		spdk_blob_open_opts_init(&opts_default);
		opts = &opts_default;
	}

	blob->clear_method = opts->clear_method;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
}

void
spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg);
}

void
spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid,
		      struct spdk_blob_open_opts *opts,
		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg);
}

/* END spdk_bs_open_blob */

/* START spdk_blob_set_read_only */
int
spdk_blob_set_read_only(struct spdk_blob *blob)
{
	_spdk_blob_verify_md_op(blob);

	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	return 0;
}
/* END spdk_blob_set_read_only */

/* START spdk_blob_sync_md */

static void
_spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
		blob->data_ro = true;
		blob->md_ro = true;
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
}

void
spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);

	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		cb_fn(cb_arg, 0);
		return;
	}

	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
}

/* END spdk_blob_sync_md */
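/*
 * Usage sketch (illustrative only): spdk_blob_set_read_only() only marks the
 * in-memory metadata dirty; the flag takes effect once the metadata has been
 * persisted via spdk_blob_sync_md(). The callback below is hypothetical.
 *
 *	static void
 *	sync_done(void *cb_arg, int bserrno)
 *	{
 *		struct spdk_blob *blob = cb_arg;
 *
 *		if (bserrno == 0) {
 *			assert(spdk_blob_is_read_only(blob));
 *		}
 *	}
 *
 *	spdk_blob_set_read_only(blob);
 *	spdk_blob_sync_md(blob, sync_done, blob);
 */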
struct spdk_blob_insert_cluster_ctx {
	struct spdk_thread	*thread;
	struct spdk_blob	*blob;
	uint32_t		cluster_num;	/* cluster index in blob */
	uint32_t		cluster;	/* cluster on disk */
	int			rc;
	spdk_blob_op_complete	cb_fn;
	void			*cb_arg;
};

static void
_spdk_blob_insert_cluster_msg_cpl(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->cb_fn(ctx->cb_arg, ctx->rc);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = bserrno;
	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
}

static void
_spdk_blob_insert_cluster_msg(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
	if (ctx->rc != 0) {
		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
		return;
	}

	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
}

static void
_spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->thread = spdk_get_thread();
	ctx->blob = blob;
	ctx->cluster_num = cluster_num;
	ctx->cluster = cluster;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
}

/* START spdk_blob_close */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0) {
		blob->open_ref--;
		if (blob->open_ref == 0) {
			/*
			 * Blobs with active.num_pages == 0 are deleted blobs.
			 * These blobs are removed from the blob_store list
			 * when the deletion process starts - so don't try to
			 * remove them again.
			 */
			if (blob->active.num_pages > 0) {
				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
			}
			_spdk_blob_free(blob);
		}
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
}

/* END spdk_blob_close */

struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}

void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}
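/*
 * Usage sketch (illustrative only): data-path calls require a per-thread I/O
 * channel. The buffer, offsets, and callback below are hypothetical; the
 * payload would typically come from spdk_dma_malloc() so it is usable for
 * DMA, and offset/length are expressed in blobstore I/O units, not bytes.
 *
 *	struct spdk_io_channel *channel = spdk_bs_alloc_io_channel(bs);
 *
 *	spdk_blob_io_write(blob, channel, buf, 0, 1, write_done, NULL);
 *	spdk_blob_io_read(blob, channel, buf, 0, 1, read_done, NULL);
 *
 *	spdk_bs_free_io_channel(channel);
 */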
void
spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void
spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}

void
spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   void *payload, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void
spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
		  void *payload, uint64_t offset, uint64_t length,
		  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void
spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
		    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg,
					 false);
}

void
spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg,
					 true);
}

struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_iter_first(struct spdk_blob_store *bs,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
}
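/*
 * Usage sketch (illustrative only): iteration is callback driven. Each blob
 * handed to the callback is open; spdk_bs_iter_next() closes it before
 * opening the next one, and the callback receives -ENOENT once all blobs
 * have been visited. The callback name below is hypothetical.
 *
 *	static void
 *	iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno == -ENOENT) {
 *			return;		// no more blobs
 *		}
 *		if (bserrno != 0) {
 *			return;		// handle error
 *		}
 *		// ... inspect blob ...
 *		spdk_bs_iter_next(bs, blob, iter_cb, bs);
 *	}
 *
 *	spdk_bs_iter_first(bs, iter_cb, bs);
 */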
static int
_spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		     uint16_t value_len, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;
	size_t			desc_size;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}

	desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len;
	if (desc_size > SPDK_BS_MAX_DESC_SIZE) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %zu does not fit into single page %zu\n",
			      name, desc_size, SPDK_BS_MAX_DESC_SIZE);
		return -ENOMEM;
	}

	if (internal) {
		xattrs = &blob->xattrs_internal;
		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
	} else {
		xattrs = &blob->xattrs;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = malloc(value_len);
			if (xattr->value == NULL) {
				/* The old value was freed above - leave the xattr empty */
				xattr->value_len = 0;
				return -ENOMEM;
			}
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		    uint16_t value_len)
{
	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
}

static int
_spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}
	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
			}
			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
{
	return _spdk_blob_remove_xattr(blob, name, false);
}
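/*
 * Usage sketch (illustrative only): xattr updates modify in-memory metadata
 * and become persistent on the next spdk_blob_sync_md() or spdk_blob_close().
 * The name and value below are arbitrary examples.
 *
 *	const char *val;
 *	size_t val_len;
 *	int rc;
 *
 *	rc = spdk_blob_set_xattr(blob, "name", "my_blob", sizeof("my_blob"));
 *	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&val, &val_len);
 *	rc = spdk_blob_remove_xattr(blob, "name");
 */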
static int
_spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len, bool internal)
{
	struct spdk_xattr	*xattr;
	struct spdk_xattr_tailq *xattrs;

	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}
	return -ENOENT;
}

int
spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			  const void **value, size_t *value_len)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
}

struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[0];
};

static int
_spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
{
	struct spdk_xattr	*xattr;
	int			count = 0;

	TAILQ_FOREACH(xattr, xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

int
spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}

void
spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
{
	memcpy(&bs->bstype, &bstype, sizeof(bstype));
}

bool
spdk_blob_is_read_only(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return (blob->data_ro || blob->md_ro);
}

bool
spdk_blob_is_snapshot(struct spdk_blob *blob)
{
	struct spdk_blob_list *snapshot_entry;

	assert(blob != NULL);

	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry == NULL) {
		return false;
	}

	return true;
}

bool
spdk_blob_is_clone(struct spdk_blob *blob)
{
	assert(blob != NULL);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		assert(spdk_blob_is_thin_provisioned(blob));
		return true;
	}

	return false;
}

bool
spdk_blob_is_thin_provisioned(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
}
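/*
 * Usage sketch (illustrative only): spdk_blob_get_clones() (defined below)
 * follows the common two-call pattern - pass ids == NULL (or a too-small
 * count) to learn the required size, then call again with a large enough
 * array. spdk_blob_get_parent_snapshot() returns SPDK_BLOBID_INVALID when
 * the blob is not a clone.
 *
 *	size_t count = 0;
 *	spdk_blob_id *ids = NULL;
 *
 *	if (spdk_blob_get_clones(bs, snapshot_id, NULL, &count) == -ENOMEM) {
 *		ids = calloc(count, sizeof(spdk_blob_id));
 *		// ... check allocation, then fetch the ids ...
 *		spdk_blob_get_clones(bs, snapshot_id, ids, &count);
 *	}
 */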
static void
_spdk_blob_update_clear_method(struct spdk_blob *blob)
{
	enum blob_clear_method stored_cm;

	assert(blob != NULL);

	/* If BLOB_CLEAR_WITH_DEFAULT was passed in, use the setting stored
	 * in metadata previously. If something other than the default was
	 * specified, ignore the stored value and use what was passed in.
	 */
	stored_cm = ((blob->md_ro_flags & SPDK_BLOB_CLEAR_METHOD) >> SPDK_BLOB_CLEAR_METHOD_SHIFT);

	if (blob->clear_method == BLOB_CLEAR_WITH_DEFAULT) {
		blob->clear_method = stored_cm;
	} else if (blob->clear_method != stored_cm) {
		SPDK_WARNLOG("Using passed in clear method 0x%x instead of stored value of 0x%x\n",
			     blob->clear_method, stored_cm);
	}
}

spdk_blob_id
spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
			if (clone_entry->id == blob_id) {
				return snapshot_entry->id;
			}
		}
	}

	return SPDK_BLOBID_INVALID;
}

int
spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids,
		     size_t *count)
{
	struct spdk_blob_list	*snapshot_entry, *clone_entry;
	size_t			n;

	snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid);
	if (snapshot_entry == NULL) {
		*count = 0;
		return 0;
	}

	if (ids == NULL || *count < snapshot_entry->clone_count) {
		*count = snapshot_entry->clone_count;
		return -ENOMEM;
	}
	*count = snapshot_entry->clone_count;

	n = 0;
	TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
		ids[n++] = clone_entry->id;
	}

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)