/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"
#include "spdk/util.h"
#include "spdk/string.h"

#include "spdk_internal/assert.h"
#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL	0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);
static void _spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
		uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg);

static int _spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
				uint16_t value_len, bool internal);
static int _spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
				      const void **value, size_t *value_len, bool internal);
static int _spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal);

static void
_spdk_blob_verify_md_op(struct spdk_blob *blob)
{
	assert(blob != NULL);
	assert(spdk_get_thread() == blob->bs->md_thread);
	assert(blob->state != SPDK_BLOB_STATE_LOADING);
}

static struct spdk_blob_list *
_spdk_bs_get_snapshot_entry(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_list *snapshot_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		if (snapshot_entry->id == blobid) {
			break;
		}
	}

	return snapshot_entry;
}
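/*
 * Added commentary (not in the original source): cluster ownership is
 * tracked in the bs->used_clusters bit array. _spdk_bs_claim_cluster() and
 * _spdk_bs_release_cluster() below are the only helpers in this section
 * that flip those bits, and each one adjusts bs->num_free_clusters in the
 * same step, so the bit array and the free counter stay in sync.
 */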
static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static int
_spdk_blob_insert_cluster(struct spdk_blob *blob, uint32_t cluster_num, uint64_t cluster)
{
	uint64_t *cluster_lba = &blob->active.clusters[cluster_num];

	_spdk_blob_verify_md_op(blob);

	if (*cluster_lba != 0) {
		return -EEXIST;
	}

	*cluster_lba = _spdk_bs_cluster_to_lba(blob->bs, cluster);
	return 0;
}

static int
_spdk_bs_allocate_cluster(struct spdk_blob *blob, uint32_t cluster_num,
			  uint64_t *lowest_free_cluster, bool update_map)
{
	pthread_mutex_lock(&blob->bs->used_clusters_mutex);
	*lowest_free_cluster = spdk_bit_array_find_first_clear(blob->bs->used_clusters,
			       *lowest_free_cluster);
	if (*lowest_free_cluster == UINT32_MAX) {
		/* No more free clusters. Cannot satisfy the request */
		pthread_mutex_unlock(&blob->bs->used_clusters_mutex);
		return -ENOSPC;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", *lowest_free_cluster, blob->id);
	_spdk_bs_claim_cluster(blob->bs, *lowest_free_cluster);
	pthread_mutex_unlock(&blob->bs->used_clusters_mutex);

	if (update_map) {
		_spdk_blob_insert_cluster(blob, cluster_num, *lowest_free_cluster);
	}

	return 0;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	pthread_mutex_lock(&bs->used_clusters_mutex);
	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
	pthread_mutex_unlock(&bs->used_clusters_mutex);
}

static void
_spdk_blob_xattrs_init(struct spdk_blob_xattr_opts *xattrs)
{
	xattrs->count = 0;
	xattrs->names = NULL;
	xattrs->ctx = NULL;
	xattrs->get_value = NULL;
}

void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->thin_provision = false;
	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
	_spdk_blob_xattrs_init(&opts->xattrs);
}

void
spdk_blob_open_opts_init(struct spdk_blob_open_opts *opts)
{
	opts->clear_method = BLOB_CLEAR_WITH_DEFAULT;
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->parent_id = SPDK_BLOBID_INVALID;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);
	TAILQ_INIT(&blob->xattrs_internal);

	return blob;
}
static void
_spdk_xattrs_free(struct spdk_xattr_tailq *xattrs)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	TAILQ_FOREACH_SAFE(xattr, xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	_spdk_xattrs_free(&blob->xattrs);
	_spdk_xattrs_free(&blob->xattrs_internal);

	if (blob->back_bs_dev) {
		blob->back_bs_dev->destroy(blob->back_bs_dev);
	}

	free(blob);
}

struct freeze_io_ctx {
	struct spdk_bs_cpl cpl;
	struct spdk_blob *blob;
};

static void
_spdk_blob_io_sync(struct spdk_io_channel_iter *i)
{
	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_execute_queued_io(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
	struct spdk_bs_channel *ch = spdk_io_channel_get_ctx(_ch);
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
	struct spdk_bs_request_set *set;
	struct spdk_bs_user_op_args *args;
	spdk_bs_user_op_t *op, *tmp;

	TAILQ_FOREACH_SAFE(op, &ch->queued_io, link, tmp) {
		set = (struct spdk_bs_request_set *)op;
		args = &set->u.user_op;

		if (args->blob == ctx->blob) {
			TAILQ_REMOVE(&ch->queued_io, op, link);
			spdk_bs_user_op_execute(op);
		}
	}

	spdk_for_each_channel_continue(i, 0);
}

static void
_spdk_blob_io_cpl(struct spdk_io_channel_iter *i, int status)
{
	struct freeze_io_ctx *ctx = spdk_io_channel_iter_get_ctx(i);

	ctx->cpl.u.blob_basic.cb_fn(ctx->cpl.u.blob_basic.cb_arg, 0);

	free(ctx);
}

static void
_spdk_blob_freeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	/* Freeze I/O on blob */
	blob->frozen_refcnt++;

	if (blob->frozen_refcnt == 1) {
		spdk_for_each_channel(blob->bs, _spdk_blob_io_sync, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}

static void
_spdk_blob_unfreeze_io(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct freeze_io_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	ctx->cpl.u.blob_basic.cb_fn = cb_fn;
	ctx->cpl.u.blob_basic.cb_arg = cb_arg;
	ctx->blob = blob;

	assert(blob->frozen_refcnt > 0);

	blob->frozen_refcnt--;

	if (blob->frozen_refcnt == 0) {
		spdk_for_each_channel(blob->bs, _spdk_blob_execute_queued_io, ctx, _spdk_blob_io_cpl);
	} else {
		cb_fn(cb_arg, 0);
		free(ctx);
	}
}
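/*
 * Added commentary (not in the original source): a blob keeps two copies of
 * its layout - 'active' (the in-memory, possibly modified version) and
 * 'clean' (the version known to be on disk). _spdk_blob_mark_clean() below
 * promotes the active arrays to clean and leaves freshly allocated
 * duplicates in active, so the on-disk state remains recoverable if a later
 * modification has to be rolled back.
 */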
static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -ENOMEM;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*blob->active.clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -ENOMEM;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	/* If the metadata was dirtied again while the metadata was being written to disk,
	 * we do not want to revert the DIRTY state back to CLEAN here.
	 */
	if (blob->state == SPDK_BLOB_STATE_LOADING) {
		blob->state = SPDK_BLOB_STATE_CLEAN;
	}

	return 0;
}

static int
_spdk_blob_deserialize_xattr(struct spdk_blob *blob,
			     struct spdk_blob_md_descriptor_xattr *desc_xattr, bool internal)
{
	struct spdk_xattr *xattr;

	if (desc_xattr->length != sizeof(desc_xattr->name_length) +
	    sizeof(desc_xattr->value_length) +
	    desc_xattr->name_length + desc_xattr->value_length) {
		return -EINVAL;
	}

	xattr = calloc(1, sizeof(*xattr));
	if (xattr == NULL) {
		return -ENOMEM;
	}

	xattr->name = malloc(desc_xattr->name_length + 1);
	if (xattr->name == NULL) {
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
	xattr->name[desc_xattr->name_length] = '\0';

	xattr->value = malloc(desc_xattr->value_length);
	if (xattr->value == NULL) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = desc_xattr->value_length;
	memcpy(xattr->value,
	       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
	       desc_xattr->value_length);

	TAILQ_INSERT_TAIL(internal ? &blob->xattrs_internal : &blob->xattrs, xattr, link);

	return 0;
}
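/*
 * Added commentary (not in the original source): on-disk metadata pages
 * hold a packed sequence of variable-length descriptors.
 * _spdk_blob_parse_page() below walks that sequence: a PADDING descriptor
 * with length 0 terminates the page early, known types are decoded, and
 * unknown types are skipped so that formats defined by newer blobstore
 * versions are gated by feature flags rather than by descriptor type.
 */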
static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags *desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) {
			struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc;

			if (desc_extent_rle->length == 0 ||
			    (desc_extent_rle->length % sizeof(desc_extent_rle->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						if (!spdk_bit_array_get(blob->bs->used_clusters,
									desc_extent_rle->extents[i].cluster_idx + j)) {
							return -EINVAL;
						}
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(*blob->active.clusters));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) {
				for (j = 0; j < desc_extent_rle->extents[i].length; j++) {
					if (desc_extent_rle->extents[i].cluster_idx != 0) {
						blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
								desc_extent_rle->extents[i].cluster_idx + j);
					} else if (spdk_blob_is_thin_provisioned(blob)) {
						blob->active.clusters[blob->active.num_clusters++] = 0;
					} else {
						return -EINVAL;
					}
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, false);
			if (rc != 0) {
				return rc;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) {
			int rc;

			rc = _spdk_blob_deserialize_xattr(blob,
							  (struct spdk_blob_md_descriptor_xattr *) desc, true);
			if (rc != 0) {
				return rc;
			}
		} else {
			/* Unrecognized descriptor type. Do not fail - just continue to the
			 * next descriptor. If this descriptor is associated with some feature
			 * defined in a newer version of blobstore, that version of blobstore
			 * should create and set an associated feature flag to specify if this
			 * blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}
static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD, this can
	 * happen for example if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_malloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE,
				     NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_realloc(*pages,
				      SPDK_BS_PAGE_SIZE * (*page_count),
				      SPDK_BS_PAGE_SIZE);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}
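/*
 * Added commentary (not in the original source): the serialized xattr
 * descriptor below is laid out as { type, length, name_length,
 * value_length, name bytes, value bytes }, with desc->length covering
 * everything after the common descriptor header. The deserializer
 * (_spdk_blob_deserialize_xattr above) validates exactly this arithmetic
 * when the blob is loaded back.
 */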
/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz, bool internal)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = internal ? SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL : SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

static void
_spdk_blob_serialize_extent_rle(const struct spdk_blob *blob,
				uint64_t start_cluster, uint64_t *next_cluster,
				uint8_t **buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc_extent_rle->extents[0]);
	if (*buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)*buf;
	desc_extent_rle->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i] && lba != 0) {
			/* Run-length encode sequential non-zero LBA */
			lba_count += lba_per_cluster;
			continue;
		} else if (lba == 0 && blob->active.clusters[i] == 0) {
			/* Run-length encode unallocated clusters */
			lba_count += lba_per_cluster;
			continue;
		}
		desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc_extent_rle->extents[extent_idx]);

		if (*buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			*next_cluster = i;
			goto finish;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc_extent_rle->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc_extent_rle->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	*next_cluster = blob->active.num_clusters;

finish:
	desc_extent_rle->length = sizeof(desc_extent_rle->extents[0]) * extent_idx;
	*buf_sz -= sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;
	*buf += sizeof(struct spdk_blob_md_descriptor) + desc_extent_rle->length;

	return;
}

static int
_spdk_blob_serialize_extents_rle(const struct spdk_blob *blob,
				 struct spdk_blob_md_page **pages,
				 struct spdk_blob_md_page *cur_page,
				 uint32_t *page_count, uint8_t **buf,
				 size_t *remaining_sz)
{
	uint64_t last_cluster;
	int rc;

	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent_rle(blob, last_cluster, &last_cluster, buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
		if (rc < 0) {
			return rc;
		}

		*buf = (uint8_t *)cur_page->descriptors;
		*remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}
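/*
 * Added commentary with a small worked example of the RLE encoding above
 * (the numbers are illustrative only): assuming 8 LBAs per cluster, a
 * cluster LBA array of {64, 72, 80, 0, 0} serializes into two extents -
 * (cluster_idx 8, length 3) for the contiguous run starting at LBA 64, and
 * (cluster_idx 0, length 2) for the run of unallocated clusters.
 */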
static void
_spdk_blob_serialize_flags(const struct spdk_blob *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 * descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize_xattrs(const struct spdk_blob *blob,
			    const struct spdk_xattr_tailq *xattrs, bool internal,
			    struct spdk_blob_md_page **pages,
			    struct spdk_blob_md_page *cur_page,
			    uint32_t *page_count, uint8_t **buf,
			    size_t *remaining_sz)
{
	const struct spdk_xattr *xattr;
	int rc;

	TAILQ_FOREACH(xattr, xattrs, link) {
		size_t required_sz = 0;

		rc = _spdk_blob_serialize_xattr(xattr,
						*buf, *remaining_sz,
						&required_sz, internal);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			*buf = (uint8_t *)cur_page->descriptors;
			*remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							*buf, *remaining_sz,
							&required_sz, internal);

			if (rc < 0) {
				spdk_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}
		}

		*remaining_sz -= required_sz;
		*buf += required_sz;
	}

	return 0;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_DIRTY);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs, false,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize internal xattrs */
	rc = _spdk_blob_serialize_xattrs(blob, &blob->xattrs_internal, true,
					 pages, cur_page, page_count, &buf, &remaining_sz);
	if (rc < 0) {
		return rc;
	}

	/* Serialize extents */
	rc = _spdk_blob_serialize_extents_rle(blob, pages, cur_page, page_count, &buf, &remaining_sz);

	return rc;
}

struct spdk_blob_load_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;
	spdk_bs_sequence_t *seq;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};
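/*
 * Added commentary (not in the original source): each metadata page stores
 * a CRC32C over its first SPDK_BS_PAGE_SIZE - 4 bytes in its trailing
 * 4 bytes. The helper below seeds the calculation with BLOB_CRC32C_INITIAL
 * (0xffffffff) and applies the matching final XOR, the conventional CRC-32C
 * construction.
 */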
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_final(void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	ctx->cb_fn(ctx->seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_load_snapshot_cpl(void *cb_arg, struct spdk_blob *snapshot, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		blob->back_bs_dev = spdk_bs_create_blob_bs_dev(snapshot);
		if (blob->back_bs_dev == NULL) {
			bserrno = -ENOMEM;
		}
	}
	if (bserrno != 0) {
		SPDK_ERRLOG("Failed to open snapshot\n");
	}

	_spdk_blob_load_final(ctx, bserrno);
}

static void _spdk_blob_update_clear_method(struct spdk_blob *blob);

static void
_spdk_blob_load_backing_dev(void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	const void *value;
	size_t len;
	int rc;

	if (spdk_blob_is_thin_provisioned(blob)) {
		rc = _spdk_blob_get_xattr_value(blob, BLOB_SNAPSHOT, &value, &len, true);
		if (rc == 0) {
			if (len != sizeof(spdk_blob_id)) {
				_spdk_blob_load_final(ctx, -EINVAL);
				return;
			}
			/* open snapshot blob and continue in the callback function */
			blob->parent_id = *(spdk_blob_id *)value;
			spdk_bs_open_blob(blob->bs, blob->parent_id,
					  _spdk_blob_load_snapshot_cpl, ctx);
			return;
		} else {
			/* add zeroes_dev for thin provisioned blob */
			blob->back_bs_dev = spdk_bs_create_zeroes_dev();
		}
	} else {
		/* standard blob */
		blob->back_bs_dev = NULL;
	}
	_spdk_blob_load_final(ctx, 0);
}
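/*
 * Added commentary (not in the original source): blob metadata can span
 * multiple pages linked through the 'next' field. The completion below
 * verifies the CRC of the page that just arrived, grows ctx->pages by one,
 * and issues another read until it sees SPDK_INVALID_MD_PAGE, at which
 * point the accumulated chain is parsed.
 */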
static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;
	uint32_t crc;

	if (bserrno) {
		SPDK_ERRLOG("Metadata page read failed: %d\n", bserrno);
		_spdk_blob_load_final(ctx, bserrno);
		return;
	}

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_load_final(ctx, -EINVAL);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_md_page_to_lba(blob->bs, next_page);

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					  sizeof(*page));
		if (ctx->pages == NULL) {
			_spdk_blob_load_final(ctx, -ENOMEM);
			return;
		}

		spdk_bs_sequence_read_dev(seq, &ctx->pages[ctx->num_pages - 1],
					  next_lba,
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
					  _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_load_final(ctx, rc);
		return;
	}
	ctx->seq = seq;

	/* Check the clear_method stored in metadata vs what may have been passed
	 * via spdk_bs_open_blob_ext() and update accordingly.
	 */
	_spdk_blob_update_clear_method(blob);

	_spdk_blob_load_backing_dev(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	_spdk_blob_verify_md_op(blob);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->seq = seq;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_md_page_to_lba(blob->bs, page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read_dev(seq, &ctx->pages[0], lba,
				  _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
				  _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob *blob;

	struct spdk_bs_super_block *super;

	struct spdk_blob_md_page *pages;

	spdk_bs_sequence_t *seq;
	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
spdk_bs_batch_clear_dev(struct spdk_blob_persist_ctx *ctx, spdk_bs_batch_t *batch, uint64_t lba,
			uint32_t lba_count)
{
	switch (ctx->blob->clear_method) {
	case BLOB_CLEAR_WITH_DEFAULT:
	case BLOB_CLEAR_WITH_UNMAP:
		spdk_bs_batch_unmap_dev(batch, lba, lba_count);
		break;
	case BLOB_CLEAR_WITH_WRITE_ZEROES:
		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
		break;
	case BLOB_CLEAR_WITH_NONE:
	default:
		break;
	}
}

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_free(ctx->pages);
	free(ctx);
}
static void
_spdk_blob_persist_clear_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		/* Nothing to release if it was not allocated */
		if (blob->active.clusters[i] != 0) {
			_spdk_bs_release_cluster(bs, cluster_num);
		}
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else if (blob->active.num_clusters != blob->active.cluster_array_size) {
#ifndef __clang_analyzer__
		void *tmp;

		/* scan-build really can't figure reallocs, workaround it */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
#endif
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_clear_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_clear_clusters_cpl, ctx);

	/* Clear all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if (next_lba > 0 && (lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, clear them now */
		if (lba_count > 0) {
			spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		if (next_lba > 0) {
			lba_count = next_lba_count;
		} else {
			lba_count = 0;
		}
	}

	/* If we ended with a contiguous set of LBAs, clear them now */
	if (lba_count > 0) {
		spdk_bs_batch_clear_dev(ctx, batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to clearing clusters */
	_spdk_blob_persist_clear_clusters(seq, ctx, 0);
}
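/*
 * Added commentary on persist ordering (not in the original source): the
 * callbacks in this file chain a persist as write new pages (last to
 * first), then the root page, then zero the old pages, then clear and
 * release truncated clusters. Writing the root page last keeps the old
 * metadata chain reachable until the new one is fully on disk.
 */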
static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_md_page_to_lba(bs, blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_md_page_to_lba(bs, page_num);

		spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_md_page_to_lba(bs, _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write_dev(seq, page, lba, lba_count,
				   _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_md_page_to_lba(bs, blob->active.pages[i]);

		spdk_bs_batch_write_dev(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}
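/*
 * Added commentary (not in the original source): _spdk_blob_resize() below
 * makes two passes over the used_clusters bitmap for non-thin blobs - a dry
 * run that only counts free clusters, then a second pass that claims them.
 * The dry run lets the function return -ENOSPC without having claimed a
 * partial set of clusters.
 */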
static int
_spdk_blob_resize(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	uint64_t num_clusters;
	struct spdk_blob_store *bs;

	bs = blob->bs;

	_spdk_blob_verify_md_op(blob);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		num_clusters = spdk_min(blob->active.cluster_array_size,
					sz);
	} else {
		num_clusters = blob->active.num_clusters;
	}

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
			if (lfc == UINT32_MAX) {
				/* No more free clusters. Cannot satisfy the request */
				return -ENOSPC;
			}
			lfc++;
		}
	}

	if (sz > num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(*blob->active.clusters) * sz);
		if (sz > 0 && tmp == NULL) {
			return -ENOMEM;
		}
		memset(tmp + blob->active.cluster_array_size, 0,
		       sizeof(*blob->active.clusters) * (sz - blob->active.cluster_array_size));
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	if (spdk_blob_is_thin_provisioned(blob) == false) {
		lfc = 0;
		for (i = num_clusters; i < sz; i++) {
			_spdk_bs_allocate_cluster(blob, i, &lfc, true);
			lfc++;
		}
	}

	blob->active.num_clusters = sz;

	return 0;
}

static void
_spdk_blob_persist_generate_new_md(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t i;
	uint32_t page_num;
	void *tmp;
	int rc;

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		_spdk_blob_persist_complete(seq, ctx, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	tmp = realloc(blob->active.pages, blob->active.num_pages * sizeof(*blob->active.pages));
	if (!tmp) {
		_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
		return;
	}
	blob->active.pages = tmp;

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num == UINT32_MAX) {
			_spdk_blob_persist_complete(seq, ctx, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	blob->state = SPDK_BLOB_STATE_CLEAN;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_persist_start(struct spdk_blob_persist_ctx *ctx)
{
	spdk_bs_sequence_t *seq = ctx->seq;
	struct spdk_blob *blob = ctx->blob;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		blob->state = SPDK_BLOB_STATE_CLEAN;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	_spdk_blob_persist_generate_new_md(ctx);
}

static void
_spdk_blob_persist_dirty_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->blob->bs->clean = 0;

	spdk_free(ctx->super);

	_spdk_blob_persist_start(ctx);
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg);

static void
_spdk_blob_persist_dirty(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;

	ctx->super->clean = 0;
	if (ctx->super->size == 0) {
		ctx->super->size = ctx->blob->bs->dev->blockcnt * ctx->blob->bs->dev->blocklen;
	}

	_spdk_bs_write_super(seq, ctx->blob->bs, ctx->super, _spdk_blob_persist_dirty_cpl, ctx);
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;

	_spdk_blob_verify_md_op(blob);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->seq = seq;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	if (blob->bs->clean) {
		ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->super) {
			cb_fn(seq, cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(blob->bs, 0),
					  _spdk_bs_byte_to_lba(blob->bs, sizeof(*ctx->super)),
					  _spdk_blob_persist_dirty, ctx);
	} else {
		_spdk_blob_persist_start(ctx);
	}
}

struct spdk_blob_copy_cluster_ctx {
	struct spdk_blob *blob;
	uint8_t *buf;
	uint64_t page;
	uint64_t new_cluster;
	spdk_bs_sequence_t *seq;
};
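/*
 * Added commentary on the copy-on-write path that follows (not in the
 * original source): when a write lands on an unallocated cluster, the op is
 * parked on the channel's need_cluster_alloc queue, a new cluster is
 * claimed, the backing data (if any) is read and written into it, and the
 * cluster is inserted into the metadata on the metadata thread. The
 * completion below then re-executes or aborts everything that queued up
 * behind the allocation.
 */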
static void
_spdk_blob_allocate_and_copy_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)ctx->seq;
	TAILQ_HEAD(, spdk_bs_request_set) requests;
	spdk_bs_user_op_t *op;

	TAILQ_INIT(&requests);
	TAILQ_SWAP(&set->channel->need_cluster_alloc, &requests, spdk_bs_request_set, link);

	while (!TAILQ_EMPTY(&requests)) {
		op = TAILQ_FIRST(&requests);
		TAILQ_REMOVE(&requests, op, link);
		if (bserrno == 0) {
			spdk_bs_user_op_execute(op);
		} else {
			spdk_bs_user_op_abort(op);
		}
	}

	spdk_free(ctx->buf);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_cpl(void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno) {
		if (bserrno == -EEXIST) {
			/* The metadata insert failed because another thread
			 * allocated the cluster first. Free our cluster
			 * but continue without error. */
			bserrno = 0;
		}
		_spdk_bs_release_cluster(ctx->blob->bs, ctx->new_cluster);
	}

	spdk_bs_sequence_finish(ctx->seq, bserrno);
}

static void
_spdk_blob_write_copy_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;
	uint32_t cluster_number;

	if (bserrno) {
		/* The write failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	cluster_number = _spdk_bs_page_to_cluster(ctx->blob->bs, ctx->page);

	_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
					       _spdk_blob_insert_cluster_cpl, ctx);
}

static void
_spdk_blob_write_copy(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_copy_cluster_ctx *ctx = cb_arg;

	if (bserrno != 0) {
		/* The read failed, so jump to the final completion handler */
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	/* Write whole cluster */
	spdk_bs_sequence_write_dev(seq, ctx->buf,
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, ctx->new_cluster),
				   _spdk_bs_cluster_to_lba(ctx->blob->bs, 1),
				   _spdk_blob_write_copy_cpl, ctx);
}

static void
_spdk_bs_allocate_and_copy_cluster(struct spdk_blob *blob,
				   struct spdk_io_channel *_ch,
				   uint64_t io_unit, spdk_bs_user_op_t *op)
{
	struct spdk_bs_cpl cpl;
	struct spdk_bs_channel *ch;
	struct spdk_blob_copy_cluster_ctx *ctx;
	uint32_t cluster_start_page;
	uint32_t cluster_number;
	int rc;

	ch = spdk_io_channel_get_ctx(_ch);

	if (!TAILQ_EMPTY(&ch->need_cluster_alloc)) {
		/* There are already operations pending. Queue this user op
		 * and return because it will be re-executed when the outstanding
		 * cluster allocation completes. */
		TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);
		return;
	}

	/* Round the io_unit offset down to the first page in the cluster */
	cluster_start_page = _spdk_bs_io_unit_to_cluster_start(blob, io_unit);

	/* Calculate which index in the metadata cluster array the corresponding
	 * cluster is supposed to be at.
	 */
	cluster_number = _spdk_bs_io_unit_to_cluster_number(blob, io_unit);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		spdk_bs_user_op_abort(op);
		return;
	}

	assert(blob->bs->cluster_sz % blob->back_bs_dev->blocklen == 0);

	ctx->blob = blob;
	ctx->page = cluster_start_page;

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		ctx->buf = spdk_malloc(blob->bs->cluster_sz, blob->back_bs_dev->blocklen,
				       NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
		if (!ctx->buf) {
			SPDK_ERRLOG("DMA allocation for cluster of size = %" PRIu32 " failed.\n",
				    blob->bs->cluster_sz);
			free(ctx);
			spdk_bs_user_op_abort(op);
			return;
		}
	}

	rc = _spdk_bs_allocate_cluster(blob, cluster_number, &ctx->new_cluster, false);
	if (rc != 0) {
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = _spdk_blob_allocate_and_copy_cluster_cpl;
	cpl.u.blob_basic.cb_arg = ctx;

	ctx->seq = spdk_bs_sequence_start(_ch, &cpl);
	if (!ctx->seq) {
		_spdk_bs_release_cluster(blob->bs, ctx->new_cluster);
		spdk_free(ctx->buf);
		free(ctx);
		spdk_bs_user_op_abort(op);
		return;
	}

	/* Queue the user op to block other incoming operations */
	TAILQ_INSERT_TAIL(&ch->need_cluster_alloc, op, link);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		/* Read cluster from backing device */
		spdk_bs_sequence_read_bs_dev(ctx->seq, blob->back_bs_dev, ctx->buf,
					     _spdk_bs_dev_page_to_lba(blob->back_bs_dev, cluster_start_page),
					     _spdk_bs_dev_byte_to_lba(blob->back_bs_dev, blob->bs->cluster_sz),
					     _spdk_blob_write_copy, ctx);
	} else {
		_spdk_blob_insert_cluster_on_md_thread(ctx->blob, cluster_number, ctx->new_cluster,
						       _spdk_blob_insert_cluster_cpl, ctx);
	}
}

static void
_spdk_blob_calculate_lba_and_lba_count(struct spdk_blob *blob, uint64_t io_unit, uint64_t length,
				       uint64_t *lba, uint32_t *lba_count)
{
	*lba_count = length;

	if (!_spdk_bs_io_unit_is_allocated(blob, io_unit)) {
		assert(blob->back_bs_dev != NULL);
		*lba = _spdk_bs_io_unit_to_back_dev_lba(blob, io_unit);
		*lba_count = _spdk_bs_io_unit_to_back_dev_lba(blob, *lba_count);
	} else {
		*lba = _spdk_bs_blob_io_unit_to_lba(blob, io_unit);
	}
}

struct op_split_ctx {
	struct spdk_blob *blob;
	struct spdk_io_channel *channel;
	uint64_t io_unit_offset;
	uint64_t io_units_remaining;
	void *curr_payload;
	enum spdk_blob_op_type op_type;
	spdk_bs_sequence_t *seq;
};
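/*
 * Added commentary (not in the original source): the split path below turns
 * one user I/O that crosses cluster boundaries into a chain of
 * single-cluster operations. Each completion re-enters
 * _spdk_blob_request_submit_op_split_next(), which advances the offset and
 * payload and issues the next piece, so at most one sub-operation per split
 * request is in flight at a time.
 */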
static void
_spdk_blob_request_submit_op_split_next(void *cb_arg, int bserrno)
{
	struct op_split_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_io_channel *ch = ctx->channel;
	enum spdk_blob_op_type op_type = ctx->op_type;
	uint8_t *buf = ctx->curr_payload;
	uint64_t offset = ctx->io_unit_offset;
	uint64_t length = ctx->io_units_remaining;
	uint64_t op_length;

	if (bserrno != 0 || ctx->io_units_remaining == 0) {
		spdk_bs_sequence_finish(ctx->seq, bserrno);
		free(ctx);
		return;
	}

	op_length = spdk_min(length, _spdk_bs_num_io_units_to_cluster_boundary(blob,
			     offset));

	/* Update length and payload for next operation */
	ctx->io_units_remaining -= op_length;
	ctx->io_unit_offset += op_length;
	if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
		ctx->curr_payload += op_length * blob->bs->io_unit_size;
	}

	switch (op_type) {
	case SPDK_BLOB_READ:
		spdk_blob_io_read(blob, ch, buf, offset, op_length,
				  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE:
		spdk_blob_io_write(blob, ch, buf, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_blob_io_unmap(blob, ch, offset, op_length,
				   _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_blob_io_write_zeroes(blob, ch, offset, op_length,
					  _spdk_blob_request_submit_op_split_next, ctx);
		break;
	case SPDK_BLOB_READV:
	case SPDK_BLOB_WRITEV:
		SPDK_ERRLOG("readv/writev not valid\n");
		spdk_bs_sequence_finish(ctx->seq, -EINVAL);
		free(ctx);
		break;
	}
}

static void
_spdk_blob_request_submit_op_split(struct spdk_io_channel *ch, struct spdk_blob *blob,
				   void *payload, uint64_t offset, uint64_t length,
				   spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct op_split_ctx *ctx;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(struct op_split_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(ch, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->channel = ch;
	ctx->curr_payload = payload;
	ctx->io_unit_offset = offset;
	ctx->io_units_remaining = length;
	ctx->op_type = op_type;
	ctx->seq = seq;

	_spdk_blob_request_submit_op_split_next(ctx, 0);
}
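/*
 * Added commentary (not in the original source): the single-op path below
 * handles requests that fit within one cluster. Frozen blobs park the op on
 * the channel's queued_io list until unfreeze; reads from unallocated
 * clusters are redirected to the backing device; writes to unallocated
 * clusters fall into the allocate-and-copy path above.
 */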
break; 1927 } 1928 case SPDK_BLOB_WRITE: 1929 case SPDK_BLOB_WRITE_ZEROES: { 1930 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1931 /* Write to the blob */ 1932 spdk_bs_batch_t *batch; 1933 1934 if (lba_count == 0) { 1935 cb_fn(cb_arg, 0); 1936 return; 1937 } 1938 1939 batch = spdk_bs_batch_open(_ch, &cpl); 1940 if (!batch) { 1941 cb_fn(cb_arg, -ENOMEM); 1942 return; 1943 } 1944 1945 if (op_type == SPDK_BLOB_WRITE) { 1946 spdk_bs_batch_write_dev(batch, payload, lba, lba_count); 1947 } else { 1948 spdk_bs_batch_write_zeroes_dev(batch, lba, lba_count); 1949 } 1950 1951 spdk_bs_batch_close(batch); 1952 } else { 1953 /* Queue this operation and allocate the cluster */ 1954 spdk_bs_user_op_t *op; 1955 1956 op = spdk_bs_user_op_alloc(_ch, &cpl, op_type, blob, payload, 0, offset, length); 1957 if (!op) { 1958 cb_fn(cb_arg, -ENOMEM); 1959 return; 1960 } 1961 1962 _spdk_bs_allocate_and_copy_cluster(blob, _ch, offset, op); 1963 } 1964 break; 1965 } 1966 case SPDK_BLOB_UNMAP: { 1967 spdk_bs_batch_t *batch; 1968 1969 batch = spdk_bs_batch_open(_ch, &cpl); 1970 if (!batch) { 1971 cb_fn(cb_arg, -ENOMEM); 1972 return; 1973 } 1974 1975 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 1976 spdk_bs_batch_unmap_dev(batch, lba, lba_count); 1977 } 1978 1979 spdk_bs_batch_close(batch); 1980 break; 1981 } 1982 case SPDK_BLOB_READV: 1983 case SPDK_BLOB_WRITEV: 1984 SPDK_ERRLOG("readv/writev not valid\n"); 1985 cb_fn(cb_arg, -EINVAL); 1986 break; 1987 } 1988 } 1989 1990 static void 1991 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1992 void *payload, uint64_t offset, uint64_t length, 1993 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1994 { 1995 assert(blob != NULL); 1996 1997 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 1998 cb_fn(cb_arg, -EPERM); 1999 return; 2000 } 2001 2002 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2003 cb_fn(cb_arg, -EINVAL); 2004 return; 2005 } 2006 if (length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset)) { 2007 _spdk_blob_request_submit_op_single(_channel, blob, payload, offset, length, 2008 cb_fn, cb_arg, op_type); 2009 } else { 2010 _spdk_blob_request_submit_op_split(_channel, blob, payload, offset, length, 2011 cb_fn, cb_arg, op_type); 2012 } 2013 } 2014 2015 struct rw_iov_ctx { 2016 struct spdk_blob *blob; 2017 struct spdk_io_channel *channel; 2018 spdk_blob_op_complete cb_fn; 2019 void *cb_arg; 2020 bool read; 2021 int iovcnt; 2022 struct iovec *orig_iov; 2023 uint64_t io_unit_offset; 2024 uint64_t io_units_remaining; 2025 uint64_t io_units_done; 2026 struct iovec iov[0]; 2027 }; 2028 2029 static void 2030 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2031 { 2032 assert(cb_arg == NULL); 2033 spdk_bs_sequence_finish(seq, bserrno); 2034 } 2035 2036 static void 2037 _spdk_rw_iov_split_next(void *cb_arg, int bserrno) 2038 { 2039 struct rw_iov_ctx *ctx = cb_arg; 2040 struct spdk_blob *blob = ctx->blob; 2041 struct iovec *iov, *orig_iov; 2042 int iovcnt; 2043 size_t orig_iovoff; 2044 uint64_t io_units_count, io_units_to_boundary, io_unit_offset; 2045 uint64_t byte_count; 2046 2047 if (bserrno != 0 || ctx->io_units_remaining == 0) { 2048 ctx->cb_fn(ctx->cb_arg, bserrno); 2049 free(ctx); 2050 return; 2051 } 2052 2053 io_unit_offset = ctx->io_unit_offset; 2054 io_units_to_boundary = _spdk_bs_num_io_units_to_cluster_boundary(blob, io_unit_offset); 2055 io_units_count = spdk_min(ctx->io_units_remaining, io_units_to_boundary); 2056 /* 
2057 * Get index and offset into the original iov array for our current position in the I/O sequence. 2058 * byte_count will keep track of how many bytes remain until orig_iov and orig_iovoff 2059 * point to the current position in the I/O sequence. 2060 */ 2061 byte_count = ctx->io_units_done * blob->bs->io_unit_size; 2062 orig_iov = &ctx->orig_iov[0]; 2063 orig_iovoff = 0; 2064 while (byte_count > 0) { 2065 if (byte_count >= orig_iov->iov_len) { 2066 byte_count -= orig_iov->iov_len; 2067 orig_iov++; 2068 } else { 2069 orig_iovoff = byte_count; 2070 byte_count = 0; 2071 } 2072 } 2073 2074 /* 2075 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 2076 * bytes of this next I/O remain to be accounted for in the new iov array. 2077 */ 2078 byte_count = io_units_count * blob->bs->io_unit_size; 2079 iov = &ctx->iov[0]; 2080 iovcnt = 0; 2081 while (byte_count > 0) { 2082 assert(iovcnt < ctx->iovcnt); 2083 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 2084 iov->iov_base = orig_iov->iov_base + orig_iovoff; 2085 byte_count -= iov->iov_len; 2086 orig_iovoff = 0; 2087 orig_iov++; 2088 iov++; 2089 iovcnt++; 2090 } 2091 2092 ctx->io_unit_offset += io_units_count; 2093 ctx->io_units_remaining -= io_units_count; 2094 ctx->io_units_done += io_units_count; 2095 iov = &ctx->iov[0]; 2096 2097 if (ctx->read) { 2098 spdk_blob_io_readv(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2099 io_units_count, _spdk_rw_iov_split_next, ctx); 2100 } else { 2101 spdk_blob_io_writev(ctx->blob, ctx->channel, iov, iovcnt, io_unit_offset, 2102 io_units_count, _spdk_rw_iov_split_next, ctx); 2103 } 2104 } 2105 2106 static void 2107 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 2108 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2109 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 2110 { 2111 struct spdk_bs_cpl cpl; 2112 2113 assert(blob != NULL); 2114 2115 if (!read && blob->data_ro) { 2116 cb_fn(cb_arg, -EPERM); 2117 return; 2118 } 2119 2120 if (length == 0) { 2121 cb_fn(cb_arg, 0); 2122 return; 2123 } 2124 2125 if (offset + length > _spdk_bs_cluster_to_lba(blob->bs, blob->active.num_clusters)) { 2126 cb_fn(cb_arg, -EINVAL); 2127 return; 2128 } 2129 2130 /* 2131 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 2132 * to split a request that spans a cluster boundary. For I/O that does not span a cluster boundary, 2133 * there will be no noticeable difference compared to using a batch. For I/O that does span a cluster 2134 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 2135 * to allocate a separate iov array and split the I/O such that none of the resulting 2136 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel), 2137 * but since this case happens very infrequently, any performance impact will be negligible. 2138 * 2139 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 2140 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 2141 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 2142 * when the batch was completed, to allow for freeing the memory for the iov arrays. 
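 *
 * Worked example (editor's illustration, assuming 4 KiB io_units and 1 MiB
 * clusters, i.e. 256 io_units per cluster): a writev of 64 io_units starting
 * at io_unit 224 crosses a cluster boundary at io_unit 256. The split logic
 * above first issues io_units 224-255 (32 io_units), then walks the original
 * iov array to find where byte offset 32 * 4096 falls, builds a fresh iov
 * array from that point, and issues the remaining 32 io_units.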
2143 */ 2144 if (spdk_likely(length <= _spdk_bs_num_io_units_to_cluster_boundary(blob, offset))) { 2145 uint32_t lba_count; 2146 uint64_t lba; 2147 2148 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2149 cpl.u.blob_basic.cb_fn = cb_fn; 2150 cpl.u.blob_basic.cb_arg = cb_arg; 2151 2152 if (blob->frozen_refcnt) { 2153 /* This blob I/O is frozen */ 2154 enum spdk_blob_op_type op_type; 2155 spdk_bs_user_op_t *op; 2156 struct spdk_bs_channel *bs_channel = spdk_io_channel_get_ctx(_channel); 2157 2158 op_type = read ? SPDK_BLOB_READV : SPDK_BLOB_WRITEV; 2159 op = spdk_bs_user_op_alloc(_channel, &cpl, op_type, blob, iov, iovcnt, offset, length); 2160 if (!op) { 2161 cb_fn(cb_arg, -ENOMEM); 2162 return; 2163 } 2164 2165 TAILQ_INSERT_TAIL(&bs_channel->queued_io, op, link); 2166 2167 return; 2168 } 2169 2170 _spdk_blob_calculate_lba_and_lba_count(blob, offset, length, &lba, &lba_count); 2171 2172 if (read) { 2173 spdk_bs_sequence_t *seq; 2174 2175 seq = spdk_bs_sequence_start(_channel, &cpl); 2176 if (!seq) { 2177 cb_fn(cb_arg, -ENOMEM); 2178 return; 2179 } 2180 2181 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2182 spdk_bs_sequence_readv_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2183 } else { 2184 spdk_bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt, lba, lba_count, 2185 _spdk_rw_iov_done, NULL); 2186 } 2187 } else { 2188 if (_spdk_bs_io_unit_is_allocated(blob, offset)) { 2189 spdk_bs_sequence_t *seq; 2190 2191 seq = spdk_bs_sequence_start(_channel, &cpl); 2192 if (!seq) { 2193 cb_fn(cb_arg, -ENOMEM); 2194 return; 2195 } 2196 2197 spdk_bs_sequence_writev_dev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 2198 } else { 2199 /* Queue this operation and allocate the cluster */ 2200 spdk_bs_user_op_t *op; 2201 2202 op = spdk_bs_user_op_alloc(_channel, &cpl, SPDK_BLOB_WRITEV, blob, iov, iovcnt, offset, 2203 length); 2204 if (!op) { 2205 cb_fn(cb_arg, -ENOMEM); 2206 return; 2207 } 2208 2209 _spdk_bs_allocate_and_copy_cluster(blob, _channel, offset, op); 2210 } 2211 } 2212 } else { 2213 struct rw_iov_ctx *ctx; 2214 2215 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 2216 if (ctx == NULL) { 2217 cb_fn(cb_arg, -ENOMEM); 2218 return; 2219 } 2220 2221 ctx->blob = blob; 2222 ctx->channel = _channel; 2223 ctx->cb_fn = cb_fn; 2224 ctx->cb_arg = cb_arg; 2225 ctx->read = read; 2226 ctx->orig_iov = iov; 2227 ctx->iovcnt = iovcnt; 2228 ctx->io_unit_offset = offset; 2229 ctx->io_units_remaining = length; 2230 ctx->io_units_done = 0; 2231 2232 _spdk_rw_iov_split_next(ctx, 0); 2233 } 2234 } 2235 2236 static struct spdk_blob * 2237 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 2238 { 2239 struct spdk_blob *blob; 2240 2241 TAILQ_FOREACH(blob, &bs->blobs, link) { 2242 if (blob->id == blobid) { 2243 return blob; 2244 } 2245 } 2246 2247 return NULL; 2248 } 2249 2250 static void 2251 _spdk_blob_get_snapshot_and_clone_entries(struct spdk_blob *blob, 2252 struct spdk_blob_list **snapshot_entry, struct spdk_blob_list **clone_entry) 2253 { 2254 assert(blob != NULL); 2255 *snapshot_entry = NULL; 2256 *clone_entry = NULL; 2257 2258 if (blob->parent_id == SPDK_BLOBID_INVALID) { 2259 return; 2260 } 2261 2262 TAILQ_FOREACH(*snapshot_entry, &blob->bs->snapshots, link) { 2263 if ((*snapshot_entry)->id == blob->parent_id) { 2264 break; 2265 } 2266 } 2267 2268 if (*snapshot_entry != NULL) { 2269 TAILQ_FOREACH(*clone_entry, &(*snapshot_entry)->clones, link) { 2270 if ((*clone_entry)->id == blob->id) { 2271 break; 2272 } 2273 } 2274 2275 
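/*
 * Editor's sketch of the bookkeeping being walked here (names from this
 * file): the blobstore keeps a list of snapshot entries, each of which
 * carries its own list of clone entries, roughly:
 *
 *     bs->snapshots -> [snapshot S1] -> [snapshot S2] -> ...
 *                          |
 *                          +-> clones: [clone C1] -> [clone C2] -> ...
 *
 * A blob whose parent_id names a snapshot must appear on that snapshot's
 * clone list, which is what the assertion below verifies.
 */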
assert(*clone_entry != NULL); 2276 } 2277 } 2278 2279 static int 2280 _spdk_bs_channel_create(void *io_device, void *ctx_buf) 2281 { 2282 struct spdk_blob_store *bs = io_device; 2283 struct spdk_bs_channel *channel = ctx_buf; 2284 struct spdk_bs_dev *dev; 2285 uint32_t max_ops = bs->max_channel_ops; 2286 uint32_t i; 2287 2288 dev = bs->dev; 2289 2290 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 2291 if (!channel->req_mem) { 2292 return -1; 2293 } 2294 2295 TAILQ_INIT(&channel->reqs); 2296 2297 for (i = 0; i < max_ops; i++) { 2298 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 2299 } 2300 2301 channel->bs = bs; 2302 channel->dev = dev; 2303 channel->dev_channel = dev->create_channel(dev); 2304 2305 if (!channel->dev_channel) { 2306 SPDK_ERRLOG("Failed to create device channel.\n"); 2307 free(channel->req_mem); 2308 return -1; 2309 } 2310 2311 TAILQ_INIT(&channel->need_cluster_alloc); 2312 TAILQ_INIT(&channel->queued_io); 2313 2314 return 0; 2315 } 2316 2317 static void 2318 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 2319 { 2320 struct spdk_bs_channel *channel = ctx_buf; 2321 spdk_bs_user_op_t *op; 2322 2323 while (!TAILQ_EMPTY(&channel->need_cluster_alloc)) { 2324 op = TAILQ_FIRST(&channel->need_cluster_alloc); 2325 TAILQ_REMOVE(&channel->need_cluster_alloc, op, link); 2326 spdk_bs_user_op_abort(op); 2327 } 2328 2329 while (!TAILQ_EMPTY(&channel->queued_io)) { 2330 op = TAILQ_FIRST(&channel->queued_io); 2331 TAILQ_REMOVE(&channel->queued_io, op, link); 2332 spdk_bs_user_op_abort(op); 2333 } 2334 2335 free(channel->req_mem); 2336 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 2337 } 2338 2339 static void 2340 _spdk_bs_dev_destroy(void *io_device) 2341 { 2342 struct spdk_blob_store *bs = io_device; 2343 struct spdk_blob *blob, *blob_tmp; 2344 2345 bs->dev->destroy(bs->dev); 2346 2347 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 2348 TAILQ_REMOVE(&bs->blobs, blob, link); 2349 _spdk_blob_free(blob); 2350 } 2351 2352 pthread_mutex_destroy(&bs->used_clusters_mutex); 2353 2354 spdk_bit_array_free(&bs->used_blobids); 2355 spdk_bit_array_free(&bs->used_md_pages); 2356 spdk_bit_array_free(&bs->used_clusters); 2357 /* 2358 * If this function is called for any reason except a successful unload, 2359 * the unload_cpl type will be NONE and this will be a nop. 
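 * In those failure paths the caller's completion has already been delivered
 * (for example via spdk_bs_sequence_finish()), so skipping the saved
 * completion here is safe.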
2360 */ 2361 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 2362 2363 free(bs); 2364 } 2365 2366 static int 2367 _spdk_bs_blob_list_add(struct spdk_blob *blob) 2368 { 2369 spdk_blob_id snapshot_id; 2370 struct spdk_blob_list *snapshot_entry = NULL; 2371 struct spdk_blob_list *clone_entry = NULL; 2372 2373 assert(blob != NULL); 2374 2375 snapshot_id = blob->parent_id; 2376 if (snapshot_id == SPDK_BLOBID_INVALID) { 2377 return 0; 2378 } 2379 2380 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, snapshot_id); 2381 if (snapshot_entry == NULL) { 2382 /* Snapshot not found */ 2383 snapshot_entry = calloc(1, sizeof(struct spdk_blob_list)); 2384 if (snapshot_entry == NULL) { 2385 return -ENOMEM; 2386 } 2387 snapshot_entry->id = snapshot_id; 2388 TAILQ_INIT(&snapshot_entry->clones); 2389 TAILQ_INSERT_TAIL(&blob->bs->snapshots, snapshot_entry, link); 2390 } else { 2391 TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) { 2392 if (clone_entry->id == blob->id) { 2393 break; 2394 } 2395 } 2396 } 2397 2398 if (clone_entry == NULL) { 2399 /* Clone not found */ 2400 clone_entry = calloc(1, sizeof(struct spdk_blob_list)); 2401 if (clone_entry == NULL) { 2402 return -ENOMEM; 2403 } 2404 clone_entry->id = blob->id; 2405 TAILQ_INIT(&clone_entry->clones); 2406 TAILQ_INSERT_TAIL(&snapshot_entry->clones, clone_entry, link); 2407 snapshot_entry->clone_count++; 2408 } 2409 2410 return 0; 2411 } 2412 2413 static void 2414 _spdk_bs_blob_list_remove(struct spdk_blob *blob) 2415 { 2416 struct spdk_blob_list *snapshot_entry = NULL; 2417 struct spdk_blob_list *clone_entry = NULL; 2418 2419 _spdk_blob_get_snapshot_and_clone_entries(blob, &snapshot_entry, &clone_entry); 2420 2421 if (snapshot_entry == NULL) { 2422 return; 2423 } 2424 2425 blob->parent_id = SPDK_BLOBID_INVALID; 2426 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2427 free(clone_entry); 2428 2429 snapshot_entry->clone_count--; 2430 } 2431 2432 static int 2433 _spdk_bs_blob_list_free(struct spdk_blob_store *bs) 2434 { 2435 struct spdk_blob_list *snapshot_entry; 2436 struct spdk_blob_list *snapshot_entry_tmp; 2437 struct spdk_blob_list *clone_entry; 2438 struct spdk_blob_list *clone_entry_tmp; 2439 2440 TAILQ_FOREACH_SAFE(snapshot_entry, &bs->snapshots, link, snapshot_entry_tmp) { 2441 TAILQ_FOREACH_SAFE(clone_entry, &snapshot_entry->clones, link, clone_entry_tmp) { 2442 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 2443 free(clone_entry); 2444 } 2445 TAILQ_REMOVE(&bs->snapshots, snapshot_entry, link); 2446 free(snapshot_entry); 2447 } 2448 2449 return 0; 2450 } 2451 2452 static void 2453 _spdk_bs_free(struct spdk_blob_store *bs) 2454 { 2455 _spdk_bs_blob_list_free(bs); 2456 2457 spdk_bs_unregister_md_thread(bs); 2458 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy); 2459 } 2460 2461 void 2462 spdk_bs_opts_init(struct spdk_bs_opts *opts) 2463 { 2464 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 2465 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 2466 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 2467 opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS; 2468 opts->clear_method = BS_CLEAR_WITH_UNMAP; 2469 memset(&opts->bstype, 0, sizeof(opts->bstype)); 2470 opts->iter_cb_fn = NULL; 2471 opts->iter_cb_arg = NULL; 2472 } 2473 2474 static int 2475 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 2476 { 2477 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 2478 opts->max_channel_ops == 0) { 2479 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 2480 return -1; 2481 } 2482 2483 
return 0; 2484 } 2485 2486 static int 2487 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts, struct spdk_blob_store **_bs) 2488 { 2489 struct spdk_blob_store *bs; 2490 uint64_t dev_size; 2491 int rc; 2492 2493 dev_size = dev->blocklen * dev->blockcnt; 2494 if (dev_size < opts->cluster_sz) { 2495 /* Device size cannot be smaller than cluster size of blobstore */ 2496 SPDK_INFOLOG(SPDK_LOG_BLOB, "Device size %" PRIu64 " is smaller than cluster size %" PRIu32 "\n", 2497 dev_size, opts->cluster_sz); 2498 return -ENOSPC; 2499 } 2500 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 2501 /* Cluster size cannot be smaller than page size */ 2502 SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than page size %d\n", 2503 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 2504 return -EINVAL; 2505 } 2506 bs = calloc(1, sizeof(struct spdk_blob_store)); 2507 if (!bs) { 2508 return -ENOMEM; 2509 } 2510 2511 TAILQ_INIT(&bs->blobs); 2512 TAILQ_INIT(&bs->snapshots); 2513 bs->dev = dev; 2514 bs->md_thread = spdk_get_thread(); 2515 assert(bs->md_thread != NULL); 2516 2517 /* 2518 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 2519 * even multiple of the cluster size. 2520 */ 2521 bs->cluster_sz = opts->cluster_sz; 2522 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 2523 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 2524 bs->num_free_clusters = bs->total_clusters; 2525 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 2526 bs->io_unit_size = dev->blocklen; 2527 if (bs->used_clusters == NULL) { 2528 free(bs); 2529 return -ENOMEM; 2530 } 2531 2532 bs->max_channel_ops = opts->max_channel_ops; 2533 bs->super_blob = SPDK_BLOBID_INVALID; 2534 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 2535 2536 /* The metadata is assumed to be at least 1 page */ 2537 bs->used_md_pages = spdk_bit_array_create(1); 2538 bs->used_blobids = spdk_bit_array_create(0); 2539 2540 pthread_mutex_init(&bs->used_clusters_mutex, NULL); 2541 2542 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy, 2543 sizeof(struct spdk_bs_channel), "blobstore"); 2544 rc = spdk_bs_register_md_thread(bs); 2545 if (rc == -1) { 2546 spdk_io_device_unregister(bs, NULL); 2547 pthread_mutex_destroy(&bs->used_clusters_mutex); 2548 spdk_bit_array_free(&bs->used_blobids); 2549 spdk_bit_array_free(&bs->used_md_pages); 2550 spdk_bit_array_free(&bs->used_clusters); 2551 free(bs); 2552 /* FIXME: this is a lie but we don't know how to get a proper error code here */ 2553 return -ENOMEM; 2554 } 2555 2556 *_bs = bs; 2557 return 0; 2558 } 2559 2560 /* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload. 
*/ 2561 2562 struct spdk_bs_load_ctx { 2563 struct spdk_blob_store *bs; 2564 struct spdk_bs_super_block *super; 2565 2566 struct spdk_bs_md_mask *mask; 2567 bool in_page_chain; 2568 uint32_t page_index; 2569 uint32_t cur_page; 2570 struct spdk_blob_md_page *page; 2571 2572 spdk_bs_sequence_t *seq; 2573 spdk_blob_op_with_handle_complete iter_cb_fn; 2574 void *iter_cb_arg; 2575 struct spdk_blob *blob; 2576 spdk_blob_id blobid; 2577 }; 2578 2579 static void 2580 _spdk_bs_load_ctx_fail(struct spdk_bs_load_ctx *ctx, int bserrno) 2581 { 2582 assert(bserrno != 0); 2583 2584 spdk_free(ctx->super); 2585 spdk_bs_sequence_finish(ctx->seq, bserrno); 2586 _spdk_bs_free(ctx->bs); 2587 free(ctx); 2588 } 2589 2590 static void 2591 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 2592 { 2593 uint32_t i = 0; 2594 2595 while (true) { 2596 i = spdk_bit_array_find_first_set(array, i); 2597 if (i >= mask->length) { 2598 break; 2599 } 2600 mask->mask[i / 8] |= 1U << (i % 8); 2601 i++; 2602 } 2603 } 2604 2605 static int 2606 _spdk_bs_load_mask(struct spdk_bit_array **array_ptr, struct spdk_bs_md_mask *mask) 2607 { 2608 struct spdk_bit_array *array; 2609 uint32_t i; 2610 2611 if (spdk_bit_array_resize(array_ptr, mask->length) < 0) { 2612 return -ENOMEM; 2613 } 2614 2615 array = *array_ptr; 2616 for (i = 0; i < mask->length; i++) { 2617 if (mask->mask[i / 8] & (1U << (i % 8))) { 2618 spdk_bit_array_set(array, i); 2619 } 2620 } 2621 2622 return 0; 2623 } 2624 2625 static void 2626 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 2627 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 2628 { 2629 /* Update the values in the super block */ 2630 super->super_blob = bs->super_blob; 2631 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 2632 super->crc = _spdk_blob_md_page_calc_crc(super); 2633 spdk_bs_sequence_write_dev(seq, super, _spdk_bs_page_to_lba(bs, 0), 2634 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 2635 cb_fn, cb_arg); 2636 } 2637 2638 static void 2639 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2640 { 2641 struct spdk_bs_load_ctx *ctx = arg; 2642 uint64_t mask_size, lba, lba_count; 2643 2644 /* Write out the used clusters mask */ 2645 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 2646 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2647 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2648 if (!ctx->mask) { 2649 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2650 return; 2651 } 2652 2653 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 2654 ctx->mask->length = ctx->bs->total_clusters; 2655 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 2656 2657 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 2658 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 2659 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 2660 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2661 } 2662 2663 static void 2664 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2665 { 2666 struct spdk_bs_load_ctx *ctx = arg; 2667 uint64_t mask_size, lba, lba_count; 2668 2669 if (seq->bserrno) { 2670 _spdk_bs_load_ctx_fail(ctx, seq->bserrno); 2671 return; 2672 } 2673 2674 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 2675 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 2676 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 2677 if (!ctx->mask) { 2678 
_spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2679 return; 2680 } 2681 2682 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 2683 ctx->mask->length = ctx->super->md_len; 2684 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 2685 2686 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 2687 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 2688 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 2689 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2690 } 2691 2692 static void 2693 _spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 2694 { 2695 struct spdk_bs_load_ctx *ctx = arg; 2696 uint64_t mask_size, lba, lba_count; 2697 2698 if (ctx->super->used_blobid_mask_len == 0) { 2699 /* 2700 * This is a pre-v3 on-disk format where the blobid mask does not get 2701 * written to disk. 2702 */ 2703 cb_fn(seq, arg, 0); 2704 return; 2705 } 2706 2707 mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2708 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2709 SPDK_MALLOC_DMA); 2710 if (!ctx->mask) { 2711 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2712 return; 2713 } 2714 2715 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS; 2716 ctx->mask->length = ctx->super->md_len; 2717 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids)); 2718 2719 _spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask); 2720 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2721 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2722 spdk_bs_sequence_write_dev(seq, ctx->mask, lba, lba_count, cb_fn, arg); 2723 } 2724 2725 static void 2726 _spdk_blob_set_thin_provision(struct spdk_blob *blob) 2727 { 2728 _spdk_blob_verify_md_op(blob); 2729 blob->invalid_flags |= SPDK_BLOB_THIN_PROV; 2730 blob->state = SPDK_BLOB_STATE_DIRTY; 2731 } 2732 2733 static void 2734 _spdk_blob_set_clear_method(struct spdk_blob *blob, enum blob_clear_method clear_method) 2735 { 2736 _spdk_blob_verify_md_op(blob); 2737 blob->clear_method = clear_method; 2738 blob->md_ro_flags |= (clear_method << SPDK_BLOB_CLEAR_METHOD_SHIFT); 2739 blob->state = SPDK_BLOB_STATE_DIRTY; 2740 } 2741 2742 static void _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno); 2743 2744 static void 2745 _spdk_bs_delete_corrupted_blob_cpl(void *cb_arg, int bserrno) 2746 { 2747 struct spdk_bs_load_ctx *ctx = cb_arg; 2748 spdk_blob_id id; 2749 int64_t page_num; 2750 2751 /* Iterate to next blob (we can't use spdk_bs_iter_next function as our 2752 * last blob has been removed */ 2753 page_num = _spdk_bs_blobid_to_page(ctx->blobid); 2754 page_num++; 2755 page_num = spdk_bit_array_find_first_set(ctx->bs->used_blobids, page_num); 2756 if (page_num >= spdk_bit_array_capacity(ctx->bs->used_blobids)) { 2757 _spdk_bs_load_iter(ctx, NULL, -ENOENT); 2758 return; 2759 } 2760 2761 id = _spdk_bs_page_to_blobid(page_num); 2762 2763 spdk_bs_open_blob(ctx->bs, id, _spdk_bs_load_iter, ctx); 2764 } 2765 2766 static void 2767 _spdk_bs_delete_corrupted_close_cb(void *cb_arg, int bserrno) 2768 { 2769 struct spdk_bs_load_ctx *ctx = cb_arg; 2770 2771 if (bserrno != 0) { 2772 SPDK_ERRLOG("Failed to close corrupted blob\n"); 2773 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2774 return; 2775 } 2776 2777 spdk_bs_delete_blob(ctx->bs, ctx->blobid, _spdk_bs_delete_corrupted_blob_cpl, ctx); 2778 } 2779 2780 static void 2781 _spdk_bs_delete_corrupted_blob(void 
*cb_arg, int bserrno) 2782 { 2783 struct spdk_bs_load_ctx *ctx = cb_arg; 2784 uint64_t i; 2785 2786 if (bserrno != 0) { 2787 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2788 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2789 return; 2790 } 2791 2792 /* Snapshot and clone have the same copy of the cluster map at this point. 2793 * Let's clear the cluster map for the snapshot now so that it won't be cleared 2794 * for the clone later when we remove the snapshot. Also set thin provision to 2795 * pass the data corruption check */ 2796 for (i = 0; i < ctx->blob->active.num_clusters; i++) { 2797 ctx->blob->active.clusters[i] = 0; 2798 } 2799 2800 ctx->blob->md_ro = false; 2801 2802 _spdk_blob_set_thin_provision(ctx->blob); 2803 2804 ctx->blobid = ctx->blob->id; 2805 2806 spdk_blob_close(ctx->blob, _spdk_bs_delete_corrupted_close_cb, ctx); 2807 } 2808 2809 static void 2810 _spdk_bs_update_corrupted_blob(void *cb_arg, int bserrno) 2811 { 2812 struct spdk_bs_load_ctx *ctx = cb_arg; 2813 2814 if (bserrno != 0) { 2815 SPDK_ERRLOG("Failed to close clone of a corrupted blob\n"); 2816 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2817 return; 2818 } 2819 2820 ctx->blob->md_ro = false; 2821 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_PENDING_REMOVAL, true); 2822 _spdk_blob_remove_xattr(ctx->blob, SNAPSHOT_IN_PROGRESS, true); 2823 spdk_blob_set_read_only(ctx->blob); 2824 2825 if (ctx->iter_cb_fn) { 2826 ctx->iter_cb_fn(ctx->iter_cb_arg, ctx->blob, 0); 2827 } 2828 _spdk_bs_blob_list_add(ctx->blob); 2829 2830 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2831 } 2832 2833 static void 2834 _spdk_bs_examine_clone(void *cb_arg, struct spdk_blob *blob, int bserrno) 2835 { 2836 struct spdk_bs_load_ctx *ctx = cb_arg; 2837 2838 if (bserrno != 0) { 2839 SPDK_ERRLOG("Failed to open clone of a corrupted blob\n"); 2840 spdk_bs_iter_next(ctx->bs, ctx->blob, _spdk_bs_load_iter, ctx); 2841 return; 2842 } 2843 2844 if (blob->parent_id == ctx->blob->id) { 2845 /* Power failure occurred before updating clone (snapshot delete case) 2846 * or after updating clone (creating snapshot case) - keep snapshot */ 2847 spdk_blob_close(blob, _spdk_bs_update_corrupted_blob, ctx); 2848 } else { 2849 /* Power failure occurred after updating clone (snapshot delete case) 2850 * or before updating clone (creating snapshot case) - remove snapshot */ 2851 spdk_blob_close(blob, _spdk_bs_delete_corrupted_blob, ctx); 2852 } 2853 } 2854 2855 static void 2856 _spdk_bs_load_iter(void *arg, struct spdk_blob *blob, int bserrno) 2857 { 2858 struct spdk_bs_load_ctx *ctx = arg; 2859 const void *value; 2860 size_t len; 2861 int rc = 0; 2862 2863 if (bserrno == 0) { 2864 /* Examine the blob to see if it was corrupted by a power failure. Fix 2865 * the ones that can be fixed and remove any other corrupted 2866 * ones. 
If it is not corrupted just process it */ 2867 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_PENDING_REMOVAL, &value, &len, true); 2868 if (rc != 0) { 2869 rc = _spdk_blob_get_xattr_value(blob, SNAPSHOT_IN_PROGRESS, &value, &len, true); 2870 if (rc != 0) { 2871 /* Not corrupted - process it and continue with iterating through blobs */ 2872 if (ctx->iter_cb_fn) { 2873 ctx->iter_cb_fn(ctx->iter_cb_arg, blob, 0); 2874 } 2875 _spdk_bs_blob_list_add(blob); 2876 spdk_bs_iter_next(ctx->bs, blob, _spdk_bs_load_iter, ctx); 2877 return; 2878 } 2879 2880 } 2881 2882 assert(len == sizeof(spdk_blob_id)); 2883 2884 ctx->blob = blob; 2885 2886 /* Open clone to check if we are able to fix this blob or should we remove it */ 2887 spdk_bs_open_blob(ctx->bs, *(spdk_blob_id *)value, _spdk_bs_examine_clone, ctx); 2888 return; 2889 } else if (bserrno == -ENOENT) { 2890 bserrno = 0; 2891 } else { 2892 /* 2893 * This case needs to be looked at further. Same problem 2894 * exists with applications that rely on explicit blob 2895 * iteration. We should just skip the blob that failed 2896 * to load and continue on to the next one. 2897 */ 2898 SPDK_ERRLOG("Error in iterating blobs\n"); 2899 } 2900 2901 ctx->iter_cb_fn = NULL; 2902 2903 spdk_free(ctx->super); 2904 spdk_free(ctx->mask); 2905 spdk_bs_sequence_finish(ctx->seq, bserrno); 2906 free(ctx); 2907 } 2908 2909 static void 2910 _spdk_bs_load_complete(struct spdk_bs_load_ctx *ctx, int bserrno) 2911 { 2912 spdk_bs_iter_first(ctx->bs, _spdk_bs_load_iter, ctx); 2913 } 2914 2915 static void 2916 _spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2917 { 2918 struct spdk_bs_load_ctx *ctx = cb_arg; 2919 int rc; 2920 2921 /* The type must be correct */ 2922 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS); 2923 2924 /* The length of the mask (in bits) must not be greater than 2925 * the length of the buffer (converted to bits) */ 2926 assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8)); 2927 2928 /* The length of the mask must be exactly equal to the size 2929 * (in pages) of the metadata region */ 2930 assert(ctx->mask->length == ctx->super->md_len); 2931 2932 rc = _spdk_bs_load_mask(&ctx->bs->used_blobids, ctx->mask); 2933 if (rc < 0) { 2934 spdk_free(ctx->mask); 2935 _spdk_bs_load_ctx_fail(ctx, rc); 2936 return; 2937 } 2938 2939 _spdk_bs_load_complete(ctx, bserrno); 2940 } 2941 2942 static void 2943 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2944 { 2945 struct spdk_bs_load_ctx *ctx = cb_arg; 2946 uint64_t lba, lba_count, mask_size; 2947 int rc; 2948 2949 /* The type must be correct */ 2950 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 2951 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2952 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 2953 struct spdk_blob_md_page) * 8)); 2954 /* The length of the mask must be exactly equal to the total number of clusters */ 2955 assert(ctx->mask->length == ctx->bs->total_clusters); 2956 2957 rc = _spdk_bs_load_mask(&ctx->bs->used_clusters, ctx->mask); 2958 if (rc < 0) { 2959 spdk_free(ctx->mask); 2960 _spdk_bs_load_ctx_fail(ctx, rc); 2961 return; 2962 } 2963 2964 ctx->bs->num_free_clusters = spdk_bit_array_count_clear(ctx->bs->used_clusters); 2965 assert(ctx->bs->num_free_clusters <= ctx->bs->total_clusters); 2966 2967 spdk_free(ctx->mask); 2968 2969 /* Read the used blobids mask */ 2970 mask_size = 
ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE; 2971 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 2972 SPDK_MALLOC_DMA); 2973 if (!ctx->mask) { 2974 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 2975 return; 2976 } 2977 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start); 2978 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len); 2979 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 2980 _spdk_bs_load_used_blobids_cpl, ctx); 2981 } 2982 2983 static void 2984 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2985 { 2986 struct spdk_bs_load_ctx *ctx = cb_arg; 2987 uint64_t lba, lba_count, mask_size; 2988 int rc; 2989 2990 /* The type must be correct */ 2991 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 2992 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 2993 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 2994 8)); 2995 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 2996 assert(ctx->mask->length == ctx->super->md_len); 2997 2998 rc = _spdk_bs_load_mask(&ctx->bs->used_md_pages, ctx->mask); 2999 if (rc < 0) { 3000 spdk_free(ctx->mask); 3001 _spdk_bs_load_ctx_fail(ctx, rc); 3002 return; 3003 } 3004 3005 spdk_free(ctx->mask); 3006 3007 /* Read the used clusters mask */ 3008 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 3009 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 3010 SPDK_MALLOC_DMA); 3011 if (!ctx->mask) { 3012 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3013 return; 3014 } 3015 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 3016 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 3017 spdk_bs_sequence_read_dev(seq, ctx->mask, lba, lba_count, 3018 _spdk_bs_load_used_clusters_cpl, ctx); 3019 } 3020 3021 static void 3022 _spdk_bs_load_read_used_pages(struct spdk_bs_load_ctx *ctx) 3023 { 3024 uint64_t lba, lba_count, mask_size; 3025 3026 /* Read the used pages mask */ 3027 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 3028 ctx->mask = spdk_zmalloc(mask_size, 0x1000, NULL, 3029 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3030 if (!ctx->mask) { 3031 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3032 return; 3033 } 3034 3035 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 3036 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 3037 spdk_bs_sequence_read_dev(ctx->seq, ctx->mask, lba, lba_count, 3038 _spdk_bs_load_used_pages_cpl, ctx); 3039 } 3040 3041 static int 3042 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 3043 { 3044 struct spdk_blob_md_descriptor *desc; 3045 size_t cur_desc = 0; 3046 3047 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3048 while (cur_desc < sizeof(page->descriptors)) { 3049 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3050 if (desc->length == 0) { 3051 /* If padding and length are 0, this terminates the page */ 3052 break; 3053 } 3054 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3055 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3056 unsigned int i, j; 3057 unsigned int cluster_count = 0; 3058 uint32_t cluster_idx; 3059 3060 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3061 3062 for (i = 0; i < desc_extent_rle->length / 
sizeof(desc_extent_rle->extents[0]); i++) { 3063 for (j = 0; j < desc_extent_rle->extents[i].length; j++) { 3064 cluster_idx = desc_extent_rle->extents[i].cluster_idx; 3065 /* 3066 * cluster_idx = 0 means an unallocated cluster - don't mark that 3067 * in the used cluster map. 3068 */ 3069 if (cluster_idx != 0) { 3070 spdk_bit_array_set(bs->used_clusters, cluster_idx + j); 3071 if (bs->num_free_clusters == 0) { 3072 return -ENOSPC; 3073 } 3074 bs->num_free_clusters--; 3075 } 3076 cluster_count++; 3077 } 3078 } 3079 if (cluster_count == 0) { 3080 return -EINVAL; 3081 } 3082 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3083 /* Skip this item */ 3084 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3085 /* Skip this item */ 3086 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3087 /* Skip this item */ 3088 } else { 3089 /* Error */ 3090 return -EINVAL; 3091 } 3092 /* Advance to the next descriptor */ 3093 cur_desc += sizeof(*desc) + desc->length; 3094 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3095 break; 3096 } 3097 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3098 } 3099 return 0; 3100 } 3101 3102 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 3103 { 3104 uint32_t crc; 3105 3106 crc = _spdk_blob_md_page_calc_crc(ctx->page); 3107 if (crc != ctx->page->crc) { 3108 return false; 3109 } 3110 3111 if (ctx->page->sequence_num == 0 && 3112 _spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 3113 return false; 3114 } 3115 return true; 3116 } 3117 3118 static void 3119 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx); 3120 3121 static void 3122 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3123 { 3124 struct spdk_bs_load_ctx *ctx = cb_arg; 3125 3126 _spdk_bs_load_complete(ctx, bserrno); 3127 } 3128 3129 static void 3130 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3131 { 3132 struct spdk_bs_load_ctx *ctx = cb_arg; 3133 3134 spdk_free(ctx->mask); 3135 ctx->mask = NULL; 3136 3137 _spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_load_write_used_clusters_cpl); 3138 } 3139 3140 static void 3141 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3142 { 3143 struct spdk_bs_load_ctx *ctx = cb_arg; 3144 3145 spdk_free(ctx->mask); 3146 ctx->mask = NULL; 3147 3148 _spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_load_write_used_blobids_cpl); 3149 } 3150 3151 static void 3152 _spdk_bs_load_write_used_md(struct spdk_bs_load_ctx *ctx, int bserrno) 3153 { 3154 _spdk_bs_write_used_md(ctx->seq, ctx, _spdk_bs_load_write_used_pages_cpl); 3155 } 3156 3157 static void 3158 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3159 { 3160 struct spdk_bs_load_ctx *ctx = cb_arg; 3161 uint64_t num_md_clusters; 3162 uint64_t i; 3163 uint32_t page_num; 3164 3165 if (bserrno != 0) { 3166 _spdk_bs_load_ctx_fail(ctx, bserrno); 3167 return; 3168 } 3169 3170 page_num = ctx->cur_page; 3171 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 3172 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 3173 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 3174 if (ctx->page->sequence_num == 0) { 3175 spdk_bit_array_set(ctx->bs->used_blobids, page_num); 3176 } 3177 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 3178 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3179 return; 3180 } 3181 if (ctx->page->next != 
SPDK_INVALID_MD_PAGE) { 3182 ctx->in_page_chain = true; 3183 ctx->cur_page = ctx->page->next; 3184 _spdk_bs_load_replay_cur_md_page(ctx); 3185 return; 3186 } 3187 } 3188 } 3189 3190 ctx->in_page_chain = false; 3191 3192 do { 3193 ctx->page_index++; 3194 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 3195 3196 if (ctx->page_index < ctx->super->md_len) { 3197 ctx->cur_page = ctx->page_index; 3198 _spdk_bs_load_replay_cur_md_page(ctx); 3199 } else { 3200 /* Claim all of the clusters used by the metadata */ 3201 num_md_clusters = spdk_divide_round_up(ctx->super->md_len, ctx->bs->pages_per_cluster); 3202 for (i = 0; i < num_md_clusters; i++) { 3203 _spdk_bs_claim_cluster(ctx->bs, i); 3204 } 3205 spdk_free(ctx->page); 3206 _spdk_bs_load_write_used_md(ctx, bserrno); 3207 } 3208 } 3209 3210 static void 3211 _spdk_bs_load_replay_cur_md_page(struct spdk_bs_load_ctx *ctx) 3212 { 3213 uint64_t lba; 3214 3215 assert(ctx->cur_page < ctx->super->md_len); 3216 lba = _spdk_bs_md_page_to_lba(ctx->bs, ctx->cur_page); 3217 spdk_bs_sequence_read_dev(ctx->seq, ctx->page, lba, 3218 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3219 _spdk_bs_load_replay_md_cpl, ctx); 3220 } 3221 3222 static void 3223 _spdk_bs_load_replay_md(struct spdk_bs_load_ctx *ctx) 3224 { 3225 ctx->page_index = 0; 3226 ctx->cur_page = 0; 3227 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3228 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3229 if (!ctx->page) { 3230 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3231 return; 3232 } 3233 _spdk_bs_load_replay_cur_md_page(ctx); 3234 } 3235 3236 static void 3237 _spdk_bs_recover(struct spdk_bs_load_ctx *ctx) 3238 { 3239 int rc; 3240 3241 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 3242 if (rc < 0) { 3243 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3244 return; 3245 } 3246 3247 rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len); 3248 if (rc < 0) { 3249 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3250 return; 3251 } 3252 3253 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3254 if (rc < 0) { 3255 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3256 return; 3257 } 3258 3259 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 3260 _spdk_bs_load_replay_md(ctx); 3261 } 3262 3263 static void 3264 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3265 { 3266 struct spdk_bs_load_ctx *ctx = cb_arg; 3267 uint32_t crc; 3268 int rc; 3269 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 3270 3271 if (ctx->super->version > SPDK_BS_VERSION || 3272 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 3273 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3274 return; 3275 } 3276 3277 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3278 sizeof(ctx->super->signature)) != 0) { 3279 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3280 return; 3281 } 3282 3283 crc = _spdk_blob_md_page_calc_crc(ctx->super); 3284 if (crc != ctx->super->crc) { 3285 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3286 return; 3287 } 3288 3289 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3290 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 3291 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 3292 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n"); 3293 } else { 3294 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 3295 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, 
SPDK_BLOBSTORE_TYPE_LENGTH); 3296 SPDK_LOGDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 3297 _spdk_bs_load_ctx_fail(ctx, -ENXIO); 3298 return; 3299 } 3300 3301 if (ctx->super->size > ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen) { 3302 SPDK_NOTICELOG("Size mismatch, dev size: %lu, blobstore size: %lu\n", 3303 ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen, ctx->super->size); 3304 _spdk_bs_load_ctx_fail(ctx, -EILSEQ); 3305 return; 3306 } 3307 3308 if (ctx->super->size == 0) { 3309 ctx->super->size = ctx->bs->dev->blockcnt * ctx->bs->dev->blocklen; 3310 } 3311 3312 if (ctx->super->io_unit_size == 0) { 3313 ctx->super->io_unit_size = SPDK_BS_PAGE_SIZE; 3314 } 3315 3316 /* Parse the super block */ 3317 ctx->bs->clean = 1; 3318 ctx->bs->cluster_sz = ctx->super->cluster_size; 3319 ctx->bs->total_clusters = ctx->super->size / ctx->super->cluster_size; 3320 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 3321 ctx->bs->io_unit_size = ctx->super->io_unit_size; 3322 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 3323 if (rc < 0) { 3324 _spdk_bs_load_ctx_fail(ctx, -ENOMEM); 3325 return; 3326 } 3327 ctx->bs->md_start = ctx->super->md_start; 3328 ctx->bs->md_len = ctx->super->md_len; 3329 ctx->bs->total_data_clusters = ctx->bs->total_clusters - spdk_divide_round_up( 3330 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 3331 ctx->bs->super_blob = ctx->super->super_blob; 3332 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 3333 3334 if (ctx->super->used_blobid_mask_len == 0 || ctx->super->clean == 0) { 3335 _spdk_bs_recover(ctx); 3336 } else { 3337 _spdk_bs_load_read_used_pages(ctx); 3338 } 3339 } 3340 3341 void 3342 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3343 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3344 { 3345 struct spdk_blob_store *bs; 3346 struct spdk_bs_cpl cpl; 3347 struct spdk_bs_load_ctx *ctx; 3348 struct spdk_bs_opts opts = {}; 3349 int err; 3350 3351 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 3352 3353 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3354 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "unsupported dev block length of %d\n", dev->blocklen); 3355 dev->destroy(dev); 3356 cb_fn(cb_arg, NULL, -EINVAL); 3357 return; 3358 } 3359 3360 if (o) { 3361 opts = *o; 3362 } else { 3363 spdk_bs_opts_init(&opts); 3364 } 3365 3366 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 3367 dev->destroy(dev); 3368 cb_fn(cb_arg, NULL, -EINVAL); 3369 return; 3370 } 3371 3372 err = _spdk_bs_alloc(dev, &opts, &bs); 3373 if (err) { 3374 dev->destroy(dev); 3375 cb_fn(cb_arg, NULL, err); 3376 return; 3377 } 3378 3379 ctx = calloc(1, sizeof(*ctx)); 3380 if (!ctx) { 3381 _spdk_bs_free(bs); 3382 cb_fn(cb_arg, NULL, -ENOMEM); 3383 return; 3384 } 3385 3386 ctx->bs = bs; 3387 ctx->iter_cb_fn = opts.iter_cb_fn; 3388 ctx->iter_cb_arg = opts.iter_cb_arg; 3389 3390 /* Allocate memory for the super block */ 3391 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3392 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3393 if (!ctx->super) { 3394 free(ctx); 3395 _spdk_bs_free(bs); 3396 cb_fn(cb_arg, NULL, -ENOMEM); 3397 return; 3398 } 3399 3400 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3401 cpl.u.bs_handle.cb_fn = cb_fn; 3402 cpl.u.bs_handle.cb_arg = cb_arg; 3403 cpl.u.bs_handle.bs = bs; 3404 3405 ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3406 if (!ctx->seq) { 3407 spdk_free(ctx->super); 3408 free(ctx); 3409 
_spdk_bs_free(bs); 3410 cb_fn(cb_arg, NULL, -ENOMEM); 3411 return; 3412 } 3413 3414 /* Read the super block */ 3415 spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3416 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3417 _spdk_bs_load_super_cpl, ctx); 3418 } 3419 3420 /* END spdk_bs_load */ 3421 3422 /* START spdk_bs_dump */ 3423 3424 struct spdk_bs_dump_ctx { 3425 struct spdk_blob_store *bs; 3426 struct spdk_bs_super_block *super; 3427 uint32_t cur_page; 3428 struct spdk_blob_md_page *page; 3429 spdk_bs_sequence_t *seq; 3430 FILE *fp; 3431 spdk_bs_dump_print_xattr print_xattr_fn; 3432 char xattr_name[4096]; 3433 }; 3434 3435 static void 3436 _spdk_bs_dump_finish(spdk_bs_sequence_t *seq, struct spdk_bs_dump_ctx *ctx, int bserrno) 3437 { 3438 spdk_free(ctx->super); 3439 3440 /* 3441 * We need to defer calling spdk_bs_call_cpl() until after 3442 * dev destruction, so tuck these away for later use. 3443 */ 3444 ctx->bs->unload_err = bserrno; 3445 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3446 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3447 3448 spdk_bs_sequence_finish(seq, 0); 3449 _spdk_bs_free(ctx->bs); 3450 free(ctx); 3451 } 3452 3453 static void _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 3454 3455 static void 3456 _spdk_bs_dump_print_md_page(struct spdk_bs_dump_ctx *ctx) 3457 { 3458 uint32_t page_idx = ctx->cur_page; 3459 struct spdk_blob_md_page *page = ctx->page; 3460 struct spdk_blob_md_descriptor *desc; 3461 size_t cur_desc = 0; 3462 uint32_t crc; 3463 3464 fprintf(ctx->fp, "=========\n"); 3465 fprintf(ctx->fp, "Metadata Page Index: %" PRIu32 " (0x%" PRIx32 ")\n", page_idx, page_idx); 3466 fprintf(ctx->fp, "Blob ID: 0x%" PRIx64 "\n", page->id); 3467 3468 crc = _spdk_blob_md_page_calc_crc(page); 3469 fprintf(ctx->fp, "CRC: 0x%" PRIx32 " (%s)\n", page->crc, crc == page->crc ? 
"OK" : "Mismatch"); 3470 3471 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 3472 while (cur_desc < sizeof(page->descriptors)) { 3473 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 3474 if (desc->length == 0) { 3475 /* If padding and length are 0, this terminates the page */ 3476 break; 3477 } 3478 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT_RLE) { 3479 struct spdk_blob_md_descriptor_extent_rle *desc_extent_rle; 3480 unsigned int i; 3481 3482 desc_extent_rle = (struct spdk_blob_md_descriptor_extent_rle *)desc; 3483 3484 for (i = 0; i < desc_extent_rle->length / sizeof(desc_extent_rle->extents[0]); i++) { 3485 if (desc_extent_rle->extents[i].cluster_idx != 0) { 3486 fprintf(ctx->fp, "Allocated Extent - Start: %" PRIu32, 3487 desc_extent_rle->extents[i].cluster_idx); 3488 } else { 3489 fprintf(ctx->fp, "Unallocated Extent - "); 3490 } 3491 fprintf(ctx->fp, " Length: %" PRIu32, desc_extent_rle->extents[i].length); 3492 fprintf(ctx->fp, "\n"); 3493 } 3494 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 3495 struct spdk_blob_md_descriptor_xattr *desc_xattr; 3496 uint32_t i; 3497 3498 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 3499 3500 if (desc_xattr->length != 3501 sizeof(desc_xattr->name_length) + sizeof(desc_xattr->value_length) + 3502 desc_xattr->name_length + desc_xattr->value_length) { 3503 } 3504 3505 memcpy(ctx->xattr_name, desc_xattr->name, desc_xattr->name_length); 3506 ctx->xattr_name[desc_xattr->name_length] = '\0'; 3507 fprintf(ctx->fp, "XATTR: name = \"%s\"\n", ctx->xattr_name); 3508 fprintf(ctx->fp, " value = \""); 3509 ctx->print_xattr_fn(ctx->fp, ctx->super->bstype.bstype, ctx->xattr_name, 3510 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 3511 desc_xattr->value_length); 3512 fprintf(ctx->fp, "\"\n"); 3513 for (i = 0; i < desc_xattr->value_length; i++) { 3514 if (i % 16 == 0) { 3515 fprintf(ctx->fp, " "); 3516 } 3517 fprintf(ctx->fp, "%02" PRIx8 " ", *((uint8_t *)desc_xattr->name + desc_xattr->name_length + i)); 3518 if ((i + 1) % 16 == 0) { 3519 fprintf(ctx->fp, "\n"); 3520 } 3521 } 3522 if (i % 16 != 0) { 3523 fprintf(ctx->fp, "\n"); 3524 } 3525 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR_INTERNAL) { 3526 /* TODO */ 3527 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 3528 /* TODO */ 3529 } else { 3530 /* Error */ 3531 } 3532 /* Advance to the next descriptor */ 3533 cur_desc += sizeof(*desc) + desc->length; 3534 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 3535 break; 3536 } 3537 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 3538 } 3539 } 3540 3541 static void 3542 _spdk_bs_dump_read_md_page_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3543 { 3544 struct spdk_bs_dump_ctx *ctx = cb_arg; 3545 3546 if (bserrno != 0) { 3547 _spdk_bs_dump_finish(seq, ctx, bserrno); 3548 return; 3549 } 3550 3551 if (ctx->page->id != 0) { 3552 _spdk_bs_dump_print_md_page(ctx); 3553 } 3554 3555 ctx->cur_page++; 3556 3557 if (ctx->cur_page < ctx->super->md_len) { 3558 _spdk_bs_dump_read_md_page(seq, ctx); 3559 } else { 3560 spdk_free(ctx->page); 3561 _spdk_bs_dump_finish(seq, ctx, 0); 3562 } 3563 } 3564 3565 static void 3566 _spdk_bs_dump_read_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 3567 { 3568 struct spdk_bs_dump_ctx *ctx = cb_arg; 3569 uint64_t lba; 3570 3571 assert(ctx->cur_page < ctx->super->md_len); 3572 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 3573 spdk_bs_sequence_read_dev(seq, ctx->page, lba, 
3574 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 3575 _spdk_bs_dump_read_md_page_cpl, ctx); 3576 } 3577 3578 static void 3579 _spdk_bs_dump_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3580 { 3581 struct spdk_bs_dump_ctx *ctx = cb_arg; 3582 3583 fprintf(ctx->fp, "Signature: \"%.8s\" ", ctx->super->signature); 3584 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3585 sizeof(ctx->super->signature)) != 0) { 3586 fprintf(ctx->fp, "(Mismatch)\n"); 3587 _spdk_bs_dump_finish(seq, ctx, bserrno); 3588 return; 3589 } else { 3590 fprintf(ctx->fp, "(OK)\n"); 3591 } 3592 fprintf(ctx->fp, "Version: %" PRIu32 "\n", ctx->super->version); 3593 fprintf(ctx->fp, "CRC: 0x%x (%s)\n", ctx->super->crc, 3594 (ctx->super->crc == _spdk_blob_md_page_calc_crc(ctx->super)) ? "OK" : "Mismatch"); 3595 fprintf(ctx->fp, "Blobstore Type: %.*s\n", SPDK_BLOBSTORE_TYPE_LENGTH, ctx->super->bstype.bstype); 3596 fprintf(ctx->fp, "Cluster Size: %" PRIu32 "\n", ctx->super->cluster_size); 3597 fprintf(ctx->fp, "Super Blob ID: "); 3598 if (ctx->super->super_blob == SPDK_BLOBID_INVALID) { 3599 fprintf(ctx->fp, "(None)\n"); 3600 } else { 3601 fprintf(ctx->fp, "%" PRIu64 "\n", ctx->super->super_blob); 3602 } 3603 fprintf(ctx->fp, "Clean: %" PRIu32 "\n", ctx->super->clean); 3604 fprintf(ctx->fp, "Used Metadata Page Mask Start: %" PRIu32 "\n", ctx->super->used_page_mask_start); 3605 fprintf(ctx->fp, "Used Metadata Page Mask Length: %" PRIu32 "\n", ctx->super->used_page_mask_len); 3606 fprintf(ctx->fp, "Used Cluster Mask Start: %" PRIu32 "\n", ctx->super->used_cluster_mask_start); 3607 fprintf(ctx->fp, "Used Cluster Mask Length: %" PRIu32 "\n", ctx->super->used_cluster_mask_len); 3608 fprintf(ctx->fp, "Used Blob ID Mask Start: %" PRIu32 "\n", ctx->super->used_blobid_mask_start); 3609 fprintf(ctx->fp, "Used Blob ID Mask Length: %" PRIu32 "\n", ctx->super->used_blobid_mask_len); 3610 fprintf(ctx->fp, "Metadata Start: %" PRIu32 "\n", ctx->super->md_start); 3611 fprintf(ctx->fp, "Metadata Length: %" PRIu32 "\n", ctx->super->md_len); 3612 3613 ctx->cur_page = 0; 3614 ctx->page = spdk_zmalloc(SPDK_BS_PAGE_SIZE, SPDK_BS_PAGE_SIZE, 3615 NULL, SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3616 if (!ctx->page) { 3617 _spdk_bs_dump_finish(seq, ctx, -ENOMEM); 3618 return; 3619 } 3620 _spdk_bs_dump_read_md_page(seq, ctx); 3621 } 3622 3623 void 3624 spdk_bs_dump(struct spdk_bs_dev *dev, FILE *fp, spdk_bs_dump_print_xattr print_xattr_fn, 3625 spdk_bs_op_complete cb_fn, void *cb_arg) 3626 { 3627 struct spdk_blob_store *bs; 3628 struct spdk_bs_cpl cpl; 3629 spdk_bs_sequence_t *seq; 3630 struct spdk_bs_dump_ctx *ctx; 3631 struct spdk_bs_opts opts = {}; 3632 int err; 3633 3634 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Dumping blobstore from dev %p\n", dev); 3635 3636 spdk_bs_opts_init(&opts); 3637 3638 err = _spdk_bs_alloc(dev, &opts, &bs); 3639 if (err) { 3640 dev->destroy(dev); 3641 cb_fn(cb_arg, err); 3642 return; 3643 } 3644 3645 ctx = calloc(1, sizeof(*ctx)); 3646 if (!ctx) { 3647 _spdk_bs_free(bs); 3648 cb_fn(cb_arg, -ENOMEM); 3649 return; 3650 } 3651 3652 ctx->bs = bs; 3653 ctx->fp = fp; 3654 ctx->print_xattr_fn = print_xattr_fn; 3655 3656 /* Allocate memory for the super block */ 3657 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3658 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3659 if (!ctx->super) { 3660 free(ctx); 3661 _spdk_bs_free(bs); 3662 cb_fn(cb_arg, -ENOMEM); 3663 return; 3664 } 3665 3666 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3667 cpl.u.bs_basic.cb_fn = cb_fn; 3668 cpl.u.bs_basic.cb_arg = cb_arg; 3669 3670 
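/*
 * Editor's note on usage (hypothetical callback names): the whole dump is
 * driven by this one sequence, e.g.
 *
 *     spdk_bs_dump(dev, stdout, my_print_xattr, dump_done_cb, NULL);
 *
 * where my_print_xattr() renders bstype-specific xattr values and
 * dump_done_cb() receives the final bserrno after the super block and every
 * in-use metadata page have been printed.
 */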
seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3671 if (!seq) { 3672 spdk_free(ctx->super); 3673 free(ctx); 3674 _spdk_bs_free(bs); 3675 cb_fn(cb_arg, -ENOMEM); 3676 return; 3677 } 3678 3679 /* Read the super block */ 3680 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 3681 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 3682 _spdk_bs_dump_super_cpl, ctx); 3683 } 3684 3685 /* END spdk_bs_dump */ 3686 3687 /* START spdk_bs_init */ 3688 3689 struct spdk_bs_init_ctx { 3690 struct spdk_blob_store *bs; 3691 struct spdk_bs_super_block *super; 3692 }; 3693 3694 static void 3695 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3696 { 3697 struct spdk_bs_init_ctx *ctx = cb_arg; 3698 3699 spdk_free(ctx->super); 3700 free(ctx); 3701 3702 spdk_bs_sequence_finish(seq, bserrno); 3703 } 3704 3705 static void 3706 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3707 { 3708 struct spdk_bs_init_ctx *ctx = cb_arg; 3709 3710 /* Write super block */ 3711 spdk_bs_sequence_write_dev(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 3712 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 3713 _spdk_bs_init_persist_super_cpl, ctx); 3714 } 3715 3716 void 3717 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 3718 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 3719 { 3720 struct spdk_bs_init_ctx *ctx; 3721 struct spdk_blob_store *bs; 3722 struct spdk_bs_cpl cpl; 3723 spdk_bs_sequence_t *seq; 3724 spdk_bs_batch_t *batch; 3725 uint64_t num_md_lba; 3726 uint64_t num_md_pages; 3727 uint64_t num_md_clusters; 3728 uint32_t i; 3729 struct spdk_bs_opts opts = {}; 3730 int rc; 3731 3732 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 3733 3734 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 3735 SPDK_ERRLOG("unsupported dev block length of %d\n", 3736 dev->blocklen); 3737 dev->destroy(dev); 3738 cb_fn(cb_arg, NULL, -EINVAL); 3739 return; 3740 } 3741 3742 if (o) { 3743 opts = *o; 3744 } else { 3745 spdk_bs_opts_init(&opts); 3746 } 3747 3748 if (_spdk_bs_opts_verify(&opts) != 0) { 3749 dev->destroy(dev); 3750 cb_fn(cb_arg, NULL, -EINVAL); 3751 return; 3752 } 3753 3754 rc = _spdk_bs_alloc(dev, &opts, &bs); 3755 if (rc) { 3756 dev->destroy(dev); 3757 cb_fn(cb_arg, NULL, rc); 3758 return; 3759 } 3760 3761 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 3762 /* By default, allocate 1 page per cluster. 3763 * Technically, this over-allocates metadata 3764 * because more metadata will reduce the number 3765 * of usable clusters. This can be addressed with 3766 * more complex math in the future. 
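 *
 * Editor's illustration: with a 1 MiB cluster size (the
 * SPDK_BLOB_OPTS_CLUSTER_SZ default) and 4 KiB metadata pages, a 100 GiB
 * device has 102400 clusters, so 102400 metadata pages (400 MiB) are
 * reserved even if far fewer blobs are ever created.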
3767 */ 3768 bs->md_len = bs->total_clusters; 3769 } else { 3770 bs->md_len = opts.num_md_pages; 3771 } 3772 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 3773 if (rc < 0) { 3774 _spdk_bs_free(bs); 3775 cb_fn(cb_arg, NULL, -ENOMEM); 3776 return; 3777 } 3778 3779 rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len); 3780 if (rc < 0) { 3781 _spdk_bs_free(bs); 3782 cb_fn(cb_arg, NULL, -ENOMEM); 3783 return; 3784 } 3785 3786 ctx = calloc(1, sizeof(*ctx)); 3787 if (!ctx) { 3788 _spdk_bs_free(bs); 3789 cb_fn(cb_arg, NULL, -ENOMEM); 3790 return; 3791 } 3792 3793 ctx->bs = bs; 3794 3795 /* Allocate memory for the super block */ 3796 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 3797 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 3798 if (!ctx->super) { 3799 free(ctx); 3800 _spdk_bs_free(bs); 3801 cb_fn(cb_arg, NULL, -ENOMEM); 3802 return; 3803 } 3804 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 3805 sizeof(ctx->super->signature)); 3806 ctx->super->version = SPDK_BS_VERSION; 3807 ctx->super->length = sizeof(*ctx->super); 3808 ctx->super->super_blob = bs->super_blob; 3809 ctx->super->clean = 0; 3810 ctx->super->cluster_size = bs->cluster_sz; 3811 ctx->super->io_unit_size = bs->io_unit_size; 3812 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 3813 3814 /* Calculate how many pages the metadata consumes at the front 3815 * of the disk. 3816 */ 3817 3818 /* The super block uses 1 page */ 3819 num_md_pages = 1; 3820 3821 /* The used_md_pages mask requires 1 bit per metadata page, rounded 3822 * up to the nearest page, plus a header. 3823 */ 3824 ctx->super->used_page_mask_start = num_md_pages; 3825 ctx->super->used_page_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3826 spdk_divide_round_up(bs->md_len, 8), 3827 SPDK_BS_PAGE_SIZE); 3828 num_md_pages += ctx->super->used_page_mask_len; 3829 3830 /* The used_clusters mask requires 1 bit per cluster, rounded 3831 * up to the nearest page, plus a header. 3832 */ 3833 ctx->super->used_cluster_mask_start = num_md_pages; 3834 ctx->super->used_cluster_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3835 spdk_divide_round_up(bs->total_clusters, 8), 3836 SPDK_BS_PAGE_SIZE); 3837 num_md_pages += ctx->super->used_cluster_mask_len; 3838 3839 /* The used_blobids mask requires 1 bit per metadata page, rounded 3840 * up to the nearest page, plus a header. 
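	 * (Like the used_md_pages mask above, this is sized from bs->md_len,
	 * since each blob ID corresponds one-to-one to a metadata page.)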
*/ 3842 ctx->super->used_blobid_mask_start = num_md_pages; 3843 ctx->super->used_blobid_mask_len = spdk_divide_round_up(sizeof(struct spdk_bs_md_mask) + 3844 spdk_divide_round_up(bs->md_len, 8), 3845 SPDK_BS_PAGE_SIZE); 3846 num_md_pages += ctx->super->used_blobid_mask_len; 3847 3848 /* The metadata region size was chosen above */ 3849 ctx->super->md_start = bs->md_start = num_md_pages; 3850 ctx->super->md_len = bs->md_len; 3851 num_md_pages += bs->md_len; 3852 3853 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 3854 3855 ctx->super->size = dev->blockcnt * dev->blocklen; 3856 3857 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 3858 3859 num_md_clusters = spdk_divide_round_up(num_md_pages, bs->pages_per_cluster); 3860 if (num_md_clusters > bs->total_clusters) { 3861 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available, " 3862 "please decrease the number of pages reserved for metadata " 3863 "or increase the cluster size.\n"); 3864 spdk_free(ctx->super); 3865 free(ctx); 3866 _spdk_bs_free(bs); 3867 cb_fn(cb_arg, NULL, -ENOMEM); 3868 return; 3869 } 3870 /* Claim all of the clusters used by the metadata */ 3871 for (i = 0; i < num_md_clusters; i++) { 3872 _spdk_bs_claim_cluster(bs, i); 3873 } 3874 3875 bs->total_data_clusters = bs->num_free_clusters; 3876 3877 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 3878 cpl.u.bs_handle.cb_fn = cb_fn; 3879 cpl.u.bs_handle.cb_arg = cb_arg; 3880 cpl.u.bs_handle.bs = bs; 3881 3882 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3883 if (!seq) { 3884 spdk_free(ctx->super); 3885 free(ctx); 3886 _spdk_bs_free(bs); 3887 cb_fn(cb_arg, NULL, -ENOMEM); 3888 return; 3889 } 3890 3891 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 3892 3893 /* Clear metadata space */ 3894 spdk_bs_batch_write_zeroes_dev(batch, 0, num_md_lba); 3895 3896 switch (opts.clear_method) { 3897 case BS_CLEAR_WITH_UNMAP: 3898 /* Trim data clusters */ 3899 spdk_bs_batch_unmap_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3900 break; 3901 case BS_CLEAR_WITH_WRITE_ZEROES: 3902 /* Write_zeroes to data clusters */ 3903 spdk_bs_batch_write_zeroes_dev(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 3904 break; 3905 case BS_CLEAR_WITH_NONE: 3906 default: 3907 break; 3908 } 3909 3910 spdk_bs_batch_close(batch); 3911 } 3912 3913 /* END spdk_bs_init */ 3914 3915 /* START spdk_bs_destroy */ 3916 3917 static void 3918 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3919 { 3920 struct spdk_bs_init_ctx *ctx = cb_arg; 3921 struct spdk_blob_store *bs = ctx->bs; 3922 3923 /* 3924 * We need to defer calling spdk_bs_call_cpl() until after 3925 * dev destruction, so tuck these away for later use.
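	 * Stashing the cpl in bs->unload_cpl and setting seq->cpl.type to
	 * SPDK_BS_CPL_TYPE_NONE below keeps spdk_bs_sequence_finish() from
	 * invoking the user callback early; the saved cpl runs only after
	 * _spdk_bs_free() has destroyed the dev.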
3926 */ 3927 bs->unload_err = bserrno; 3928 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3929 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3930 3931 spdk_bs_sequence_finish(seq, bserrno); 3932 3933 _spdk_bs_free(bs); 3934 free(ctx); 3935 } 3936 3937 void 3938 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 3939 void *cb_arg) 3940 { 3941 struct spdk_bs_cpl cpl; 3942 spdk_bs_sequence_t *seq; 3943 struct spdk_bs_init_ctx *ctx; 3944 3945 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 3946 3947 if (!TAILQ_EMPTY(&bs->blobs)) { 3948 SPDK_ERRLOG("Blobstore still has open blobs\n"); 3949 cb_fn(cb_arg, -EBUSY); 3950 return; 3951 } 3952 3953 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 3954 cpl.u.bs_basic.cb_fn = cb_fn; 3955 cpl.u.bs_basic.cb_arg = cb_arg; 3956 3957 ctx = calloc(1, sizeof(*ctx)); 3958 if (!ctx) { 3959 cb_fn(cb_arg, -ENOMEM); 3960 return; 3961 } 3962 3963 ctx->bs = bs; 3964 3965 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 3966 if (!seq) { 3967 free(ctx); 3968 cb_fn(cb_arg, -ENOMEM); 3969 return; 3970 } 3971 3972 /* Write zeroes to the super block */ 3973 spdk_bs_sequence_write_zeroes_dev(seq, 3974 _spdk_bs_page_to_lba(bs, 0), 3975 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 3976 _spdk_bs_destroy_trim_cpl, ctx); 3977 } 3978 3979 /* END spdk_bs_destroy */ 3980 3981 /* START spdk_bs_unload */ 3982 3983 static void 3984 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 3985 { 3986 struct spdk_bs_load_ctx *ctx = cb_arg; 3987 3988 spdk_free(ctx->super); 3989 3990 /* 3991 * We need to defer calling spdk_bs_call_cpl() until after 3992 * dev destruction, so tuck these away for later use. 3993 */ 3994 ctx->bs->unload_err = bserrno; 3995 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 3996 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 3997 3998 spdk_bs_sequence_finish(seq, bserrno); 3999 4000 _spdk_bs_free(ctx->bs); 4001 free(ctx); 4002 } 4003 4004 static void 4005 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4006 { 4007 struct spdk_bs_load_ctx *ctx = cb_arg; 4008 4009 spdk_free(ctx->mask); 4010 ctx->super->clean = 1; 4011 4012 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 4013 } 4014 4015 static void 4016 _spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4017 { 4018 struct spdk_bs_load_ctx *ctx = cb_arg; 4019 4020 spdk_free(ctx->mask); 4021 ctx->mask = NULL; 4022 4023 _spdk_bs_write_used_clusters(seq, ctx, _spdk_bs_unload_write_used_clusters_cpl); 4024 } 4025 4026 static void 4027 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4028 { 4029 struct spdk_bs_load_ctx *ctx = cb_arg; 4030 4031 spdk_free(ctx->mask); 4032 ctx->mask = NULL; 4033 4034 _spdk_bs_write_used_blobids(seq, ctx, _spdk_bs_unload_write_used_blobids_cpl); 4035 } 4036 4037 static void 4038 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4039 { 4040 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 4041 } 4042 4043 void 4044 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 4045 { 4046 struct spdk_bs_cpl cpl; 4047 struct spdk_bs_load_ctx *ctx; 4048 4049 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 4050 4051 if (!TAILQ_EMPTY(&bs->blobs)) { 4052 SPDK_ERRLOG("Blobstore still has open blobs\n"); 4053 cb_fn(cb_arg, -EBUSY); 4054 return; 4055 } 4056 4057 ctx = 
calloc(1, sizeof(*ctx)); 4058 if (!ctx) { 4059 cb_fn(cb_arg, -ENOMEM); 4060 return; 4061 } 4062 4063 ctx->bs = bs; 4064 4065 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4066 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4067 if (!ctx->super) { 4068 free(ctx); 4069 cb_fn(cb_arg, -ENOMEM); 4070 return; 4071 } 4072 4073 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4074 cpl.u.bs_basic.cb_fn = cb_fn; 4075 cpl.u.bs_basic.cb_arg = cb_arg; 4076 4077 ctx->seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4078 if (!ctx->seq) { 4079 spdk_free(ctx->super); 4080 free(ctx); 4081 cb_fn(cb_arg, -ENOMEM); 4082 return; 4083 } 4084 4085 /* Read super block */ 4086 spdk_bs_sequence_read_dev(ctx->seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4087 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4088 _spdk_bs_unload_read_super_cpl, ctx); 4089 } 4090 4091 /* END spdk_bs_unload */ 4092 4093 /* START spdk_bs_set_super */ 4094 4095 struct spdk_bs_set_super_ctx { 4096 struct spdk_blob_store *bs; 4097 struct spdk_bs_super_block *super; 4098 }; 4099 4100 static void 4101 _spdk_bs_set_super_write_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4102 { 4103 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4104 4105 if (bserrno != 0) { 4106 SPDK_ERRLOG("Unable to write to super block of blobstore\n"); 4107 } 4108 4109 spdk_free(ctx->super); 4110 4111 spdk_bs_sequence_finish(seq, bserrno); 4112 4113 free(ctx); 4114 } 4115 4116 static void 4117 _spdk_bs_set_super_read_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4118 { 4119 struct spdk_bs_set_super_ctx *ctx = cb_arg; 4120 4121 if (bserrno != 0) { 4122 SPDK_ERRLOG("Unable to read super block of blobstore\n"); 4123 spdk_free(ctx->super); 4124 spdk_bs_sequence_finish(seq, bserrno); 4125 free(ctx); 4126 return; 4127 } 4128 4129 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_set_super_write_cpl, ctx); 4130 } 4131 4132 void 4133 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 4134 spdk_bs_op_complete cb_fn, void *cb_arg) 4135 { 4136 struct spdk_bs_cpl cpl; 4137 spdk_bs_sequence_t *seq; 4138 struct spdk_bs_set_super_ctx *ctx; 4139 4140 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Setting super blob id on blobstore\n"); 4141 4142 ctx = calloc(1, sizeof(*ctx)); 4143 if (!ctx) { 4144 cb_fn(cb_arg, -ENOMEM); 4145 return; 4146 } 4147 4148 ctx->bs = bs; 4149 4150 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL, 4151 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 4152 if (!ctx->super) { 4153 free(ctx); 4154 cb_fn(cb_arg, -ENOMEM); 4155 return; 4156 } 4157 4158 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 4159 cpl.u.bs_basic.cb_fn = cb_fn; 4160 cpl.u.bs_basic.cb_arg = cb_arg; 4161 4162 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4163 if (!seq) { 4164 spdk_free(ctx->super); 4165 free(ctx); 4166 cb_fn(cb_arg, -ENOMEM); 4167 return; 4168 } 4169 4170 bs->super_blob = blobid; 4171 4172 /* Read super block */ 4173 spdk_bs_sequence_read_dev(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 4174 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 4175 _spdk_bs_set_super_read_cpl, ctx); 4176 } 4177 4178 /* END spdk_bs_set_super */ 4179 4180 void 4181 spdk_bs_get_super(struct spdk_blob_store *bs, 4182 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4183 { 4184 if (bs->super_blob == SPDK_BLOBID_INVALID) { 4185 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 4186 } else { 4187 cb_fn(cb_arg, bs->super_blob, 0); 4188 } 4189 } 4190 4191 uint64_t 4192 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 4193 { 4194 return bs->cluster_sz; 4195 } 4196 4197 uint64_t 4198 
spdk_bs_get_page_size(struct spdk_blob_store *bs) 4199 { 4200 return SPDK_BS_PAGE_SIZE; 4201 } 4202 4203 uint64_t 4204 spdk_bs_get_io_unit_size(struct spdk_blob_store *bs) 4205 { 4206 return bs->io_unit_size; 4207 } 4208 4209 uint64_t 4210 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 4211 { 4212 return bs->num_free_clusters; 4213 } 4214 4215 uint64_t 4216 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 4217 { 4218 return bs->total_data_clusters; 4219 } 4220 4221 static int 4222 spdk_bs_register_md_thread(struct spdk_blob_store *bs) 4223 { 4224 bs->md_channel = spdk_get_io_channel(bs); 4225 if (!bs->md_channel) { 4226 SPDK_ERRLOG("Failed to get IO channel.\n"); 4227 return -1; 4228 } 4229 4230 return 0; 4231 } 4232 4233 static int 4234 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 4235 { 4236 spdk_put_io_channel(bs->md_channel); 4237 4238 return 0; 4239 } 4240 4241 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 4242 { 4243 assert(blob != NULL); 4244 4245 return blob->id; 4246 } 4247 4248 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 4249 { 4250 assert(blob != NULL); 4251 4252 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 4253 } 4254 4255 uint64_t spdk_blob_get_num_io_units(struct spdk_blob *blob) 4256 { 4257 assert(blob != NULL); 4258 4259 return spdk_blob_get_num_pages(blob) * _spdk_bs_io_unit_per_page(blob->bs); 4260 } 4261 4262 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 4263 { 4264 assert(blob != NULL); 4265 4266 return blob->active.num_clusters; 4267 } 4268 4269 /* START spdk_bs_create_blob */ 4270 4271 static void 4272 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 4273 { 4274 struct spdk_blob *blob = cb_arg; 4275 4276 _spdk_blob_free(blob); 4277 4278 spdk_bs_sequence_finish(seq, bserrno); 4279 } 4280 4281 static int 4282 _spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_xattr_opts *xattrs, 4283 bool internal) 4284 { 4285 uint64_t i; 4286 size_t value_len = 0; 4287 int rc; 4288 const void *value = NULL; 4289 if (xattrs->count > 0 && xattrs->get_value == NULL) { 4290 return -EINVAL; 4291 } 4292 for (i = 0; i < xattrs->count; i++) { 4293 xattrs->get_value(xattrs->ctx, xattrs->names[i], &value, &value_len); 4294 if (value == NULL || value_len == 0) { 4295 return -EINVAL; 4296 } 4297 rc = _spdk_blob_set_xattr(blob, xattrs->names[i], value, value_len, internal); 4298 if (rc < 0) { 4299 return rc; 4300 } 4301 } 4302 return 0; 4303 } 4304 4305 static void 4306 _spdk_bs_create_blob(struct spdk_blob_store *bs, 4307 const struct spdk_blob_opts *opts, 4308 const struct spdk_blob_xattr_opts *internal_xattrs, 4309 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4310 { 4311 struct spdk_blob *blob; 4312 uint32_t page_idx; 4313 struct spdk_bs_cpl cpl; 4314 struct spdk_blob_opts opts_default; 4315 struct spdk_blob_xattr_opts internal_xattrs_default; 4316 spdk_bs_sequence_t *seq; 4317 spdk_blob_id id; 4318 int rc; 4319 4320 assert(spdk_get_thread() == bs->md_thread); 4321 4322 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 4323 if (page_idx == UINT32_MAX) { 4324 cb_fn(cb_arg, 0, -ENOMEM); 4325 return; 4326 } 4327 spdk_bit_array_set(bs->used_blobids, page_idx); 4328 spdk_bit_array_set(bs->used_md_pages, page_idx); 4329 4330 id = _spdk_bs_page_to_blobid(page_idx); 4331 4332 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 4333 4334 blob = _spdk_blob_alloc(bs, id); 4335 if (!blob) { 4336 cb_fn(cb_arg, 0, 
-ENOMEM); 4337 return; 4338 } 4339 4340 if (!opts) { 4341 spdk_blob_opts_init(&opts_default); 4342 opts = &opts_default; 4343 } 4344 if (!internal_xattrs) { 4345 _spdk_blob_xattrs_init(&internal_xattrs_default); 4346 internal_xattrs = &internal_xattrs_default; 4347 } 4348 4349 rc = _spdk_blob_set_xattrs(blob, &opts->xattrs, false); 4350 if (rc < 0) { 4351 _spdk_blob_free(blob); 4352 cb_fn(cb_arg, 0, rc); 4353 return; 4354 } 4355 4356 rc = _spdk_blob_set_xattrs(blob, internal_xattrs, true); 4357 if (rc < 0) { 4358 _spdk_blob_free(blob); 4359 cb_fn(cb_arg, 0, rc); 4360 return; 4361 } 4362 4363 if (opts->thin_provision) { 4364 _spdk_blob_set_thin_provision(blob); 4365 } 4366 4367 _spdk_blob_set_clear_method(blob, opts->clear_method); 4368 4369 rc = _spdk_blob_resize(blob, opts->num_clusters); 4370 if (rc < 0) { 4371 _spdk_blob_free(blob); 4372 cb_fn(cb_arg, 0, rc); 4373 return; 4374 } 4375 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4376 cpl.u.blobid.cb_fn = cb_fn; 4377 cpl.u.blobid.cb_arg = cb_arg; 4378 cpl.u.blobid.blobid = blob->id; 4379 4380 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 4381 if (!seq) { 4382 _spdk_blob_free(blob); 4383 cb_fn(cb_arg, 0, -ENOMEM); 4384 return; 4385 } 4386 4387 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob); 4388 } 4389 4390 void spdk_bs_create_blob(struct spdk_blob_store *bs, 4391 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4392 { 4393 _spdk_bs_create_blob(bs, NULL, NULL, cb_fn, cb_arg); 4394 } 4395 4396 void spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts, 4397 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4398 { 4399 _spdk_bs_create_blob(bs, opts, NULL, cb_fn, cb_arg); 4400 } 4401 4402 /* END spdk_bs_create_blob */ 4403 4404 /* START blob_cleanup */ 4405 4406 struct spdk_clone_snapshot_ctx { 4407 struct spdk_bs_cpl cpl; 4408 int bserrno; 4409 bool frozen; 4410 4411 struct spdk_io_channel *channel; 4412 4413 /* Current cluster for the inflate operation */ 4414 uint64_t cluster; 4415 4416 /* For inflation, force allocation of all unallocated clusters and remove 4417 * thin-provisioning. Otherwise, only decouple the parent and keep the clone thin. */ 4418 bool allocate_all; 4419 4420 struct { 4421 spdk_blob_id id; 4422 struct spdk_blob *blob; 4423 } original; 4424 struct { 4425 spdk_blob_id id; 4426 struct spdk_blob *blob; 4427 } new; 4428 4429 /* xattrs specified for snapshots/clones only. They have no impact on 4430 * the original blob's xattrs.
*/ 4431 const struct spdk_blob_xattr_opts *xattrs; 4432 }; 4433 4434 static void 4435 _spdk_bs_clone_snapshot_cleanup_finish(void *cb_arg, int bserrno) 4436 { 4437 struct spdk_clone_snapshot_ctx *ctx = cb_arg; 4438 struct spdk_bs_cpl *cpl = &ctx->cpl; 4439 4440 if (bserrno != 0) { 4441 if (ctx->bserrno != 0) { 4442 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4443 } else { 4444 ctx->bserrno = bserrno; 4445 } 4446 } 4447 4448 switch (cpl->type) { 4449 case SPDK_BS_CPL_TYPE_BLOBID: 4450 cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg, cpl->u.blobid.blobid, ctx->bserrno); 4451 break; 4452 case SPDK_BS_CPL_TYPE_BLOB_BASIC: 4453 cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg, ctx->bserrno); 4454 break; 4455 default: 4456 SPDK_UNREACHABLE(); 4457 break; 4458 } 4459 4460 free(ctx); 4461 } 4462 4463 static void 4464 _spdk_bs_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 4465 { 4466 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4467 struct spdk_blob *origblob = ctx->original.blob; 4468 4469 if (bserrno != 0) { 4470 if (ctx->bserrno != 0) { 4471 SPDK_ERRLOG("Unfreeze error %d\n", bserrno); 4472 } else { 4473 ctx->bserrno = bserrno; 4474 } 4475 } 4476 4477 ctx->original.id = origblob->id; 4478 origblob->locked_operation_in_progress = false; 4479 4480 spdk_blob_close(origblob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4481 } 4482 4483 static void 4484 _spdk_bs_clone_snapshot_origblob_cleanup(void *cb_arg, int bserrno) 4485 { 4486 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4487 struct spdk_blob *origblob = ctx->original.blob; 4488 4489 if (bserrno != 0) { 4490 if (ctx->bserrno != 0) { 4491 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4492 } else { 4493 ctx->bserrno = bserrno; 4494 } 4495 } 4496 4497 if (ctx->frozen) { 4498 /* Unfreeze any outstanding I/O */ 4499 _spdk_blob_unfreeze_io(origblob, _spdk_bs_snapshot_unfreeze_cpl, ctx); 4500 } else { 4501 _spdk_bs_snapshot_unfreeze_cpl(ctx, 0); 4502 } 4503 4504 } 4505 4506 static void 4507 _spdk_bs_clone_snapshot_newblob_cleanup(void *cb_arg, int bserrno) 4508 { 4509 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4510 struct spdk_blob *newblob = ctx->new.blob; 4511 4512 if (bserrno != 0) { 4513 if (ctx->bserrno != 0) { 4514 SPDK_ERRLOG("Cleanup error %d\n", bserrno); 4515 } else { 4516 ctx->bserrno = bserrno; 4517 } 4518 } 4519 4520 ctx->new.id = newblob->id; 4521 spdk_blob_close(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4522 } 4523 4524 /* END blob_cleanup */ 4525 4526 /* START spdk_bs_create_snapshot */ 4527 4528 static void 4529 _spdk_bs_snapshot_swap_cluster_maps(struct spdk_blob *blob1, struct spdk_blob *blob2) 4530 { 4531 uint64_t *cluster_temp; 4532 4533 cluster_temp = blob1->active.clusters; 4534 blob1->active.clusters = blob2->active.clusters; 4535 blob2->active.clusters = cluster_temp; 4536 } 4537 4538 static void 4539 _spdk_bs_snapshot_origblob_sync_cpl(void *cb_arg, int bserrno) 4540 { 4541 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4542 struct spdk_blob *origblob = ctx->original.blob; 4543 struct spdk_blob *newblob = ctx->new.blob; 4544 4545 if (bserrno != 0) { 4546 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4547 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4548 return; 4549 } 4550 4551 /* Remove metadata descriptor SNAPSHOT_IN_PROGRESS */ 4552 bserrno = _spdk_blob_remove_xattr(newblob, SNAPSHOT_IN_PROGRESS, true); 4553 if (bserrno != 0) { 4554 
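		/* Failed to strip the in-progress marker from the snapshot;
		 * report the error through the common cleanup path. */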
_spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4555 return; 4556 } 4557 4558 _spdk_bs_blob_list_add(ctx->original.blob); 4559 4560 spdk_blob_set_read_only(newblob); 4561 4562 /* sync snapshot metadata */ 4563 spdk_blob_sync_md(newblob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4564 } 4565 4566 static void 4567 _spdk_bs_snapshot_newblob_sync_cpl(void *cb_arg, int bserrno) 4568 { 4569 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4570 struct spdk_blob *origblob = ctx->original.blob; 4571 struct spdk_blob *newblob = ctx->new.blob; 4572 4573 if (bserrno != 0) { 4574 /* return cluster map back to original */ 4575 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4576 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4577 return; 4578 } 4579 4580 /* Set internal xattr for snapshot id */ 4581 bserrno = _spdk_blob_set_xattr(origblob, BLOB_SNAPSHOT, &newblob->id, sizeof(spdk_blob_id), true); 4582 if (bserrno != 0) { 4583 /* return cluster map back to original */ 4584 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4585 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4586 return; 4587 } 4588 4589 _spdk_bs_blob_list_remove(origblob); 4590 origblob->parent_id = newblob->id; 4591 4592 /* Create new back_bs_dev for snapshot */ 4593 origblob->back_bs_dev = spdk_bs_create_blob_bs_dev(newblob); 4594 if (origblob->back_bs_dev == NULL) { 4595 /* return cluster map back to original */ 4596 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4597 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, -EINVAL); 4598 return; 4599 } 4600 4601 /* set clone blob as thin provisioned */ 4602 _spdk_blob_set_thin_provision(origblob); 4603 4604 _spdk_bs_blob_list_add(newblob); 4605 4606 /* sync clone metadata */ 4607 spdk_blob_sync_md(origblob, _spdk_bs_snapshot_origblob_sync_cpl, ctx); 4608 } 4609 4610 static void 4611 _spdk_bs_snapshot_freeze_cpl(void *cb_arg, int rc) 4612 { 4613 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4614 struct spdk_blob *origblob = ctx->original.blob; 4615 struct spdk_blob *newblob = ctx->new.blob; 4616 int bserrno; 4617 4618 if (rc != 0) { 4619 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, rc); 4620 return; 4621 } 4622 4623 ctx->frozen = true; 4624 4625 /* set new back_bs_dev for snapshot */ 4626 newblob->back_bs_dev = origblob->back_bs_dev; 4627 /* Set invalid flags from origblob */ 4628 newblob->invalid_flags = origblob->invalid_flags; 4629 4630 /* inherit parent from original blob if set */ 4631 newblob->parent_id = origblob->parent_id; 4632 if (origblob->parent_id != SPDK_BLOBID_INVALID) { 4633 /* Set internal xattr for snapshot id */ 4634 bserrno = _spdk_blob_set_xattr(newblob, BLOB_SNAPSHOT, 4635 &origblob->parent_id, sizeof(spdk_blob_id), true); 4636 if (bserrno != 0) { 4637 _spdk_bs_clone_snapshot_newblob_cleanup(ctx, bserrno); 4638 return; 4639 } 4640 } 4641 4642 /* swap cluster maps */ 4643 _spdk_bs_snapshot_swap_cluster_maps(newblob, origblob); 4644 4645 /* Set the clear method on the new blob to match the original. 
*/ 4646 _spdk_blob_set_clear_method(newblob, origblob->clear_method); 4647 4648 /* sync snapshot metadata */ 4649 spdk_blob_sync_md(newblob, _spdk_bs_snapshot_newblob_sync_cpl, ctx); 4650 } 4651 4652 static void 4653 _spdk_bs_snapshot_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4654 { 4655 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4656 struct spdk_blob *origblob = ctx->original.blob; 4657 struct spdk_blob *newblob = _blob; 4658 4659 if (bserrno != 0) { 4660 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4661 return; 4662 } 4663 4664 ctx->new.blob = newblob; 4665 assert(spdk_blob_is_thin_provisioned(newblob)); 4666 assert(spdk_mem_all_zero(newblob->active.clusters, 4667 newblob->active.num_clusters * sizeof(*newblob->active.clusters))); 4668 4669 _spdk_blob_freeze_io(origblob, _spdk_bs_snapshot_freeze_cpl, ctx); 4670 } 4671 4672 static void 4673 _spdk_bs_snapshot_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4674 { 4675 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4676 struct spdk_blob *origblob = ctx->original.blob; 4677 4678 if (bserrno != 0) { 4679 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4680 return; 4681 } 4682 4683 ctx->new.id = blobid; 4684 ctx->cpl.u.blobid.blobid = blobid; 4685 4686 spdk_bs_open_blob(origblob->bs, ctx->new.id, _spdk_bs_snapshot_newblob_open_cpl, ctx); 4687 } 4688 4689 4690 static void 4691 _spdk_bs_xattr_snapshot(void *arg, const char *name, 4692 const void **value, size_t *value_len) 4693 { 4694 assert(strncmp(name, SNAPSHOT_IN_PROGRESS, sizeof(SNAPSHOT_IN_PROGRESS)) == 0); 4695 4696 struct spdk_blob *blob = (struct spdk_blob *)arg; 4697 *value = &blob->id; 4698 *value_len = sizeof(blob->id); 4699 } 4700 4701 static void 4702 _spdk_bs_snapshot_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4703 { 4704 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4705 struct spdk_blob_opts opts; 4706 struct spdk_blob_xattr_opts internal_xattrs; 4707 char *xattrs_names[] = { SNAPSHOT_IN_PROGRESS }; 4708 4709 if (bserrno != 0) { 4710 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4711 return; 4712 } 4713 4714 ctx->original.blob = _blob; 4715 4716 if (_blob->data_ro || _blob->md_ro) { 4717 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot from read only blob with id %lu\n", 4718 _blob->id); 4719 ctx->bserrno = -EINVAL; 4720 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4721 return; 4722 } 4723 4724 if (_blob->locked_operation_in_progress) { 4725 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create snapshot - another operation in progress\n"); 4726 ctx->bserrno = -EBUSY; 4727 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4728 return; 4729 } 4730 4731 _blob->locked_operation_in_progress = true; 4732 4733 spdk_blob_opts_init(&opts); 4734 _spdk_blob_xattrs_init(&internal_xattrs); 4735 4736 /* Change the size of new blob to the same as in original blob, 4737 * but do not allocate clusters */ 4738 opts.thin_provision = true; 4739 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4740 4741 /* If there are any xattrs specified for snapshot, set them now */ 4742 if (ctx->xattrs) { 4743 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4744 } 4745 /* Set internal xattr SNAPSHOT_IN_PROGRESS */ 4746 internal_xattrs.count = 1; 4747 internal_xattrs.ctx = _blob; 4748 internal_xattrs.names = xattrs_names; 4749 internal_xattrs.get_value = 
_spdk_bs_xattr_snapshot; 4750 4751 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4752 _spdk_bs_snapshot_newblob_create_cpl, ctx); 4753 } 4754 4755 void spdk_bs_create_snapshot(struct spdk_blob_store *bs, spdk_blob_id blobid, 4756 const struct spdk_blob_xattr_opts *snapshot_xattrs, 4757 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4758 { 4759 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4760 4761 if (!ctx) { 4762 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4763 return; 4764 } 4765 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4766 ctx->cpl.u.blobid.cb_fn = cb_fn; 4767 ctx->cpl.u.blobid.cb_arg = cb_arg; 4768 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4769 ctx->bserrno = 0; 4770 ctx->frozen = false; 4771 ctx->original.id = blobid; 4772 ctx->xattrs = snapshot_xattrs; 4773 4774 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_snapshot_origblob_open_cpl, ctx); 4775 } 4776 /* END spdk_bs_create_snapshot */ 4777 4778 /* START spdk_bs_create_clone */ 4779 4780 static void 4781 _spdk_bs_xattr_clone(void *arg, const char *name, 4782 const void **value, size_t *value_len) 4783 { 4784 assert(strncmp(name, BLOB_SNAPSHOT, sizeof(BLOB_SNAPSHOT)) == 0); 4785 4786 struct spdk_blob *blob = (struct spdk_blob *)arg; 4787 *value = &blob->id; 4788 *value_len = sizeof(blob->id); 4789 } 4790 4791 static void 4792 _spdk_bs_clone_newblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4793 { 4794 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4795 struct spdk_blob *clone = _blob; 4796 if (bserrno != 0) { /* Opening the new clone failed - clean up through the original blob. */ _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); return; } 4797 ctx->new.blob = clone; 4798 _spdk_bs_blob_list_add(clone); 4799 4800 spdk_blob_close(clone, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4801 } 4802 4803 static void 4804 _spdk_bs_clone_newblob_create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno) 4805 { 4806 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4807 if (bserrno != 0) { /* Creating the new clone failed - clean up through the original blob. */ _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); return; } 4808 ctx->cpl.u.blobid.blobid = blobid; 4809 spdk_bs_open_blob(ctx->original.blob->bs, blobid, _spdk_bs_clone_newblob_open_cpl, ctx); 4810 } 4811 4812 static void 4813 _spdk_bs_clone_origblob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 4814 { 4815 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4816 struct spdk_blob_opts opts; 4817 struct spdk_blob_xattr_opts internal_xattrs; 4818 char *xattr_names[] = { BLOB_SNAPSHOT }; 4819 4820 if (bserrno != 0) { 4821 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 4822 return; 4823 } 4824 4825 ctx->original.blob = _blob; 4826 4827 if (!_blob->data_ro || !_blob->md_ro) { 4828 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Clone is only allowed from a read-only blob\n"); 4829 ctx->bserrno = -EINVAL; 4830 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4831 return; 4832 } 4833 4834 if (_blob->locked_operation_in_progress) { 4835 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot create clone - another operation in progress\n"); 4836 ctx->bserrno = -EBUSY; 4837 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 4838 return; 4839 } 4840 4841 _blob->locked_operation_in_progress = true; 4842 4843 spdk_blob_opts_init(&opts); 4844 _spdk_blob_xattrs_init(&internal_xattrs); 4845 4846 opts.thin_provision = true; 4847 opts.num_clusters = spdk_blob_get_num_clusters(_blob); 4848 if (ctx->xattrs) { 4849 memcpy(&opts.xattrs, ctx->xattrs, sizeof(*ctx->xattrs)); 4850 } 4851 4852 /* Set internal xattr BLOB_SNAPSHOT */ 4853 internal_xattrs.count = 1; 4854 internal_xattrs.ctx = _blob; 4855 internal_xattrs.names = xattr_names; 4856
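	/* The get_value callback below serves the snapshot's blob ID; it is
	 * persisted as the clone's BLOB_SNAPSHOT (parent) xattr. */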
internal_xattrs.get_value = _spdk_bs_xattr_clone; 4857 4858 _spdk_bs_create_blob(_blob->bs, &opts, &internal_xattrs, 4859 _spdk_bs_clone_newblob_create_cpl, ctx); 4860 } 4861 4862 void spdk_bs_create_clone(struct spdk_blob_store *bs, spdk_blob_id blobid, 4863 const struct spdk_blob_xattr_opts *clone_xattrs, 4864 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 4865 { 4866 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 4867 4868 if (!ctx) { 4869 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOMEM); 4870 return; 4871 } 4872 4873 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 4874 ctx->cpl.u.blobid.cb_fn = cb_fn; 4875 ctx->cpl.u.blobid.cb_arg = cb_arg; 4876 ctx->cpl.u.blobid.blobid = SPDK_BLOBID_INVALID; 4877 ctx->bserrno = 0; 4878 ctx->xattrs = clone_xattrs; 4879 ctx->original.id = blobid; 4880 4881 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_clone_origblob_open_cpl, ctx); 4882 } 4883 4884 /* END spdk_bs_create_clone */ 4885 4886 /* START spdk_bs_inflate_blob */ 4887 4888 static void 4889 _spdk_bs_inflate_blob_set_parent_cpl(void *cb_arg, struct spdk_blob *_parent, int bserrno) 4890 { 4891 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4892 struct spdk_blob *_blob = ctx->original.blob; 4893 4894 if (bserrno != 0) { 4895 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4896 return; 4897 } 4898 4899 assert(_parent != NULL); 4900 4901 _spdk_bs_blob_list_remove(_blob); 4902 _blob->parent_id = _parent->id; 4903 _spdk_blob_set_xattr(_blob, BLOB_SNAPSHOT, &_blob->parent_id, 4904 sizeof(spdk_blob_id), true); 4905 4906 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4907 _blob->back_bs_dev = spdk_bs_create_blob_bs_dev(_parent); 4908 _spdk_bs_blob_list_add(_blob); 4909 4910 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4911 } 4912 4913 static void 4914 _spdk_bs_inflate_blob_done(void *cb_arg, int bserrno) 4915 { 4916 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4917 struct spdk_blob *_blob = ctx->original.blob; 4918 struct spdk_blob *_parent; 4919 4920 if (bserrno != 0) { 4921 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4922 return; 4923 } 4924 4925 if (ctx->allocate_all) { 4926 /* remove thin provisioning */ 4927 _spdk_bs_blob_list_remove(_blob); 4928 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4929 _blob->invalid_flags = _blob->invalid_flags & ~SPDK_BLOB_THIN_PROV; 4930 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4931 _blob->back_bs_dev = NULL; 4932 _blob->parent_id = SPDK_BLOBID_INVALID; 4933 } else { 4934 _parent = ((struct spdk_blob_bs_dev *)(_blob->back_bs_dev))->blob; 4935 if (_parent->parent_id != SPDK_BLOBID_INVALID) { 4936 /* We must change the parent of the inflated blob */ 4937 spdk_bs_open_blob(_blob->bs, _parent->parent_id, 4938 _spdk_bs_inflate_blob_set_parent_cpl, ctx); 4939 return; 4940 } 4941 4942 _spdk_bs_blob_list_remove(_blob); 4943 _spdk_blob_remove_xattr(_blob, BLOB_SNAPSHOT, true); 4944 _blob->parent_id = SPDK_BLOBID_INVALID; 4945 _blob->back_bs_dev->destroy(_blob->back_bs_dev); 4946 _blob->back_bs_dev = spdk_bs_create_zeroes_dev(); 4947 } 4948 4949 _blob->state = SPDK_BLOB_STATE_DIRTY; 4950 spdk_blob_sync_md(_blob, _spdk_bs_clone_snapshot_origblob_cleanup, ctx); 4951 } 4952 4953 /* Check if cluster needs allocation */ 4954 static inline bool 4955 _spdk_bs_cluster_needs_allocation(struct spdk_blob *blob, uint64_t cluster, bool allocate_all) 4956 { 4957 struct spdk_blob_bs_dev *b; 4958 4959 assert(blob != NULL); 4960 4961 if 
(blob->active.clusters[cluster] != 0) { 4962 /* Cluster is already allocated */ 4963 return false; 4964 } 4965 4966 if (blob->parent_id == SPDK_BLOBID_INVALID) { 4967 /* Blob has no parent */ 4968 return allocate_all; 4969 } 4970 4971 b = (struct spdk_blob_bs_dev *)blob->back_bs_dev; 4972 return (allocate_all || b->blob->active.clusters[cluster] != 0); 4973 } 4974 4975 static void 4976 _spdk_bs_inflate_blob_touch_next(void *cb_arg, int bserrno) 4977 { 4978 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 4979 struct spdk_blob *_blob = ctx->original.blob; 4980 uint64_t offset; 4981 4982 if (bserrno != 0) { 4983 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, bserrno); 4984 return; 4985 } 4986 4987 for (; ctx->cluster < _blob->active.num_clusters; ctx->cluster++) { 4988 if (_spdk_bs_cluster_needs_allocation(_blob, ctx->cluster, ctx->allocate_all)) { 4989 break; 4990 } 4991 } 4992 4993 if (ctx->cluster < _blob->active.num_clusters) { 4994 offset = _spdk_bs_cluster_to_lba(_blob->bs, ctx->cluster); 4995 4996 /* We may safely increment the cluster index before issuing the write */ 4997 ctx->cluster++; 4998 4999 /* Use a zero-length write to touch the cluster */ 5000 spdk_blob_io_write(_blob, ctx->channel, NULL, offset, 0, 5001 _spdk_bs_inflate_blob_touch_next, ctx); 5002 } else { 5003 _spdk_bs_inflate_blob_done(cb_arg, bserrno); 5004 } 5005 } 5006 5007 static void 5008 _spdk_bs_inflate_blob_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 5009 { 5010 struct spdk_clone_snapshot_ctx *ctx = (struct spdk_clone_snapshot_ctx *)cb_arg; 5011 uint64_t lfc; /* lowest free cluster */ 5012 uint64_t i; 5013 5014 if (bserrno != 0) { 5015 _spdk_bs_clone_snapshot_cleanup_finish(ctx, bserrno); 5016 return; 5017 } 5018 5019 ctx->original.blob = _blob; 5020 5021 if (_blob->locked_operation_in_progress) { 5022 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot inflate blob - another operation in progress\n"); 5023 ctx->bserrno = -EBUSY; 5024 spdk_blob_close(_blob, _spdk_bs_clone_snapshot_cleanup_finish, ctx); 5025 return; 5026 } 5027 5028 _blob->locked_operation_in_progress = true; 5029 5030 if (!ctx->allocate_all && _blob->parent_id == SPDK_BLOBID_INVALID) { 5031 /* This blob has no parent, so we cannot decouple it. */ 5032 SPDK_ERRLOG("Cannot decouple parent of blob with no parent.\n"); 5033 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -EINVAL); 5034 return; 5035 } 5036 5037 if (spdk_blob_is_thin_provisioned(_blob) == false) { 5038 /* This is not a thin-provisioned blob. No need to inflate. */ 5039 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, 0); 5040 return; 5041 } 5042 5043 /* Do two passes - one to verify that we can obtain enough clusters 5044 * and another to actually claim them. 5045 */ 5046 lfc = 0; 5047 for (i = 0; i < _blob->active.num_clusters; i++) { 5048 if (_spdk_bs_cluster_needs_allocation(_blob, i, ctx->allocate_all)) { 5049 lfc = spdk_bit_array_find_first_clear(_blob->bs->used_clusters, lfc); 5050 if (lfc == UINT32_MAX) { 5051 /* No more free clusters.
Cannot satisfy the request */ 5052 _spdk_bs_clone_snapshot_origblob_cleanup(ctx, -ENOSPC); 5053 return; 5054 } 5055 lfc++; 5056 } 5057 } 5058 5059 ctx->cluster = 0; 5060 _spdk_bs_inflate_blob_touch_next(ctx, 0); 5061 } 5062 5063 static void 5064 _spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5065 spdk_blob_id blobid, bool allocate_all, spdk_blob_op_complete cb_fn, void *cb_arg) 5066 { 5067 struct spdk_clone_snapshot_ctx *ctx = calloc(1, sizeof(*ctx)); 5068 5069 if (!ctx) { 5070 cb_fn(cb_arg, -ENOMEM); 5071 return; 5072 } 5073 ctx->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5074 ctx->cpl.u.bs_basic.cb_fn = cb_fn; 5075 ctx->cpl.u.bs_basic.cb_arg = cb_arg; 5076 ctx->bserrno = 0; 5077 ctx->original.id = blobid; 5078 ctx->channel = channel; 5079 ctx->allocate_all = allocate_all; 5080 5081 spdk_bs_open_blob(bs, ctx->original.id, _spdk_bs_inflate_blob_open_cpl, ctx); 5082 } 5083 5084 void 5085 spdk_bs_inflate_blob(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5086 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5087 { 5088 _spdk_bs_inflate_blob(bs, channel, blobid, true, cb_fn, cb_arg); 5089 } 5090 5091 void 5092 spdk_bs_blob_decouple_parent(struct spdk_blob_store *bs, struct spdk_io_channel *channel, 5093 spdk_blob_id blobid, spdk_blob_op_complete cb_fn, void *cb_arg) 5094 { 5095 _spdk_bs_inflate_blob(bs, channel, blobid, false, cb_fn, cb_arg); 5096 } 5097 /* END spdk_bs_inflate_blob */ 5098 5099 /* START spdk_blob_resize */ 5100 struct spdk_bs_resize_ctx { 5101 spdk_blob_op_complete cb_fn; 5102 void *cb_arg; 5103 struct spdk_blob *blob; 5104 uint64_t sz; 5105 int rc; 5106 }; 5107 5108 static void 5109 _spdk_bs_resize_unfreeze_cpl(void *cb_arg, int rc) 5110 { 5111 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5112 5113 if (rc != 0) { 5114 SPDK_ERRLOG("Unfreeze failed, rc=%d\n", rc); 5115 } 5116 5117 if (ctx->rc != 0) { 5118 SPDK_ERRLOG("Resize failed, ctx->rc=%d\n", ctx->rc); 5119 rc = ctx->rc; 5120 } 5121 5122 ctx->blob->locked_operation_in_progress = false; 5123 5124 ctx->cb_fn(ctx->cb_arg, rc); 5125 free(ctx); 5126 } 5127 5128 static void 5129 _spdk_bs_resize_freeze_cpl(void *cb_arg, int rc) 5130 { 5131 struct spdk_bs_resize_ctx *ctx = (struct spdk_bs_resize_ctx *)cb_arg; 5132 5133 if (rc != 0) { 5134 ctx->blob->locked_operation_in_progress = false; 5135 ctx->cb_fn(ctx->cb_arg, rc); 5136 free(ctx); 5137 return; 5138 } 5139 5140 ctx->rc = _spdk_blob_resize(ctx->blob, ctx->sz); 5141 5142 _spdk_blob_unfreeze_io(ctx->blob, _spdk_bs_resize_unfreeze_cpl, ctx); 5143 } 5144 5145 void 5146 spdk_blob_resize(struct spdk_blob *blob, uint64_t sz, spdk_blob_op_complete cb_fn, void *cb_arg) 5147 { 5148 struct spdk_bs_resize_ctx *ctx; 5149 5150 _spdk_blob_verify_md_op(blob); 5151 5152 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 5153 5154 if (blob->md_ro) { 5155 cb_fn(cb_arg, -EPERM); 5156 return; 5157 } 5158 5159 if (sz == blob->active.num_clusters) { 5160 cb_fn(cb_arg, 0); 5161 return; 5162 } 5163 5164 if (blob->locked_operation_in_progress) { 5165 cb_fn(cb_arg, -EBUSY); 5166 return; 5167 } 5168 5169 ctx = calloc(1, sizeof(*ctx)); 5170 if (!ctx) { 5171 cb_fn(cb_arg, -ENOMEM); 5172 return; 5173 } 5174 5175 blob->locked_operation_in_progress = true; 5176 ctx->cb_fn = cb_fn; 5177 ctx->cb_arg = cb_arg; 5178 ctx->blob = blob; 5179 ctx->sz = sz; 5180 _spdk_blob_freeze_io(blob, _spdk_bs_resize_freeze_cpl, ctx); 5181 } 5182 5183 /* END spdk_blob_resize */ 5184 5185 5186 /* START
spdk_bs_delete_blob */ 5187 5188 static void 5189 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno) 5190 { 5191 spdk_bs_sequence_t *seq = cb_arg; 5192 5193 spdk_bs_sequence_finish(seq, bserrno); 5194 } 5195 5196 static void 5197 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5198 { 5199 struct spdk_blob *blob = cb_arg; 5200 5201 if (bserrno != 0) { 5202 /* 5203 * We already removed this blob from the blobstore tailq, so 5204 * we need to free it here since this is the last reference 5205 * to it. 5206 */ 5207 _spdk_blob_free(blob); 5208 _spdk_bs_delete_close_cpl(seq, bserrno); 5209 return; 5210 } 5211 5212 /* 5213 * This will immediately decrement the ref_count and call 5214 * the completion routine since the metadata state is clean. 5215 * By calling spdk_blob_close, we reduce the number of call 5216 * points into code that touches the blob->open_ref count 5217 * and the blobstore's blob list. 5218 */ 5219 spdk_blob_close(blob, _spdk_bs_delete_close_cpl, seq); 5220 } 5221 5222 struct delete_snapshot_ctx { 5223 struct spdk_blob_list *parent_snapshot_entry; 5224 struct spdk_blob *snapshot; 5225 bool snapshot_md_ro; 5226 struct spdk_blob *clone; 5227 bool clone_md_ro; 5228 spdk_blob_op_with_handle_complete cb_fn; 5229 void *cb_arg; 5230 int bserrno; 5231 }; 5232 5233 static void 5234 _spdk_delete_blob_cleanup_finish(void *cb_arg, int bserrno) 5235 { 5236 struct delete_snapshot_ctx *ctx = cb_arg; 5237 5238 if (bserrno != 0) { 5239 SPDK_ERRLOG("Snapshot cleanup error %d\n", bserrno); 5240 } 5241 5242 assert(ctx != NULL); 5243 5244 if (bserrno != 0 && ctx->bserrno == 0) { 5245 ctx->bserrno = bserrno; 5246 } 5247 5248 ctx->cb_fn(ctx->cb_arg, ctx->snapshot, ctx->bserrno); 5249 free(ctx); 5250 } 5251 5252 static void 5253 _spdk_delete_snapshot_cleanup_snapshot(void *cb_arg, int bserrno) 5254 { 5255 struct delete_snapshot_ctx *ctx = cb_arg; 5256 5257 if (bserrno != 0) { 5258 ctx->bserrno = bserrno; 5259 SPDK_ERRLOG("Clone cleanup error %d\n", bserrno); 5260 } 5261 5262 /* open_ref == 1 means that only the deletion context has opened this snapshot; 5263 * open_ref == 2 means that the clone has opened this snapshot as well, 5264 * so we have to add it back to the blobs list */ 5265 if (ctx->snapshot->open_ref == 2) { 5266 TAILQ_INSERT_HEAD(&ctx->snapshot->bs->blobs, ctx->snapshot, link); 5267 } 5268 5269 ctx->snapshot->locked_operation_in_progress = false; 5270 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5271 5272 spdk_blob_close(ctx->snapshot, _spdk_delete_blob_cleanup_finish, ctx); 5273 } 5274 5275 static void 5276 _spdk_delete_snapshot_cleanup_clone(void *cb_arg, int bserrno) 5277 { 5278 struct delete_snapshot_ctx *ctx = cb_arg; 5279 5280 ctx->clone->locked_operation_in_progress = false; 5281 ctx->clone->md_ro = ctx->clone_md_ro; 5282 5283 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5284 } 5285 5286 static void 5287 _spdk_delete_snapshot_unfreeze_cpl(void *cb_arg, int bserrno) 5288 { 5289 struct delete_snapshot_ctx *ctx = cb_arg; 5290 5291 if (bserrno) { 5292 ctx->bserrno = bserrno; 5293 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5294 return; 5295 } 5296 5297 ctx->clone->locked_operation_in_progress = false; 5298 spdk_blob_close(ctx->clone, _spdk_delete_blob_cleanup_finish, ctx); 5299 } 5300 5301 static void 5302 _spdk_delete_snapshot_sync_snapshot_cpl(void *cb_arg, int bserrno) 5303 { 5304 struct delete_snapshot_ctx *ctx = cb_arg; 5305 struct spdk_blob_list *parent_snapshot_entry = NULL; 5306 struct spdk_blob_list *snapshot_entry = NULL;
5307 struct spdk_blob_list *clone_entry = NULL; 5308 struct spdk_blob_list *snapshot_clone_entry = NULL; 5309 5310 if (bserrno) { 5311 SPDK_ERRLOG("Failed to sync MD on blob\n"); 5312 ctx->bserrno = bserrno; 5313 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5314 return; 5315 } 5316 5317 /* Get snapshot entry for the snapshot we want to remove */ 5318 snapshot_entry = _spdk_bs_get_snapshot_entry(ctx->snapshot->bs, ctx->snapshot->id); 5319 5320 assert(snapshot_entry != NULL); 5321 5322 /* Remove clone entry in this snapshot (at this point there can be only one clone) */ 5323 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5324 assert(clone_entry != NULL); 5325 TAILQ_REMOVE(&snapshot_entry->clones, clone_entry, link); 5326 snapshot_entry->clone_count--; 5327 assert(TAILQ_EMPTY(&snapshot_entry->clones)); 5328 5329 if (ctx->snapshot->parent_id != SPDK_BLOBID_INVALID) { 5330 /* This snapshot is at the same time a clone of another snapshot - we need to 5331 * update parent snapshot (remove current clone, add new one inherited from 5332 * the snapshot that is being removed) */ 5333 5334 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5335 * snapshot that we are removing */ 5336 _spdk_blob_get_snapshot_and_clone_entries(ctx->snapshot, &parent_snapshot_entry, 5337 &snapshot_clone_entry); 5338 5339 /* Switch clone entry in parent snapshot */ 5340 TAILQ_INSERT_TAIL(&parent_snapshot_entry->clones, clone_entry, link); 5341 TAILQ_REMOVE(&parent_snapshot_entry->clones, snapshot_clone_entry, link); 5342 free(snapshot_clone_entry); 5343 } else { 5344 /* No parent snapshot - just remove clone entry */ 5345 free(clone_entry); 5346 } 5347 5348 /* Restore md_ro flags */ 5349 ctx->clone->md_ro = ctx->clone_md_ro; 5350 ctx->snapshot->md_ro = ctx->snapshot_md_ro; 5351 5352 _spdk_blob_unfreeze_io(ctx->clone, _spdk_delete_snapshot_unfreeze_cpl, ctx); 5353 } 5354 5355 static void 5356 _spdk_delete_snapshot_sync_clone_cpl(void *cb_arg, int bserrno) 5357 { 5358 struct delete_snapshot_ctx *ctx = cb_arg; 5359 uint64_t i; 5360 5361 ctx->snapshot->md_ro = false; 5362 5363 if (bserrno) { 5364 SPDK_ERRLOG("Failed to sync MD on clone\n"); 5365 ctx->bserrno = bserrno; 5366 5367 /* Restore snapshot to previous state */ 5368 bserrno = _spdk_blob_remove_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, true); 5369 if (bserrno != 0) { 5370 _spdk_delete_snapshot_cleanup_clone(ctx, bserrno); 5371 return; 5372 } 5373 5374 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_cleanup_clone, ctx); 5375 return; 5376 } 5377 5378 /* Clear cluster map entries for snapshot */ 5379 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5380 if (ctx->clone->active.clusters[i] == ctx->snapshot->active.clusters[i]) { 5381 ctx->snapshot->active.clusters[i] = 0; 5382 } 5383 } 5384 5385 ctx->snapshot->state = SPDK_BLOB_STATE_DIRTY; 5386 5387 if (ctx->parent_snapshot_entry != NULL) { 5388 ctx->snapshot->back_bs_dev = NULL; 5389 } 5390 5391 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_cpl, ctx); 5392 } 5393 5394 static void 5395 _spdk_delete_snapshot_sync_snapshot_xattr_cpl(void *cb_arg, int bserrno) 5396 { 5397 struct delete_snapshot_ctx *ctx = cb_arg; 5398 uint64_t i; 5399 5400 /* Temporarily override md_ro flag for clone for MD modification */ 5401 ctx->clone_md_ro = ctx->clone->md_ro; 5402 ctx->clone->md_ro = false; 5403 5404 if (bserrno) { 5405 SPDK_ERRLOG("Failed to sync MD with xattr on blob\n"); 5406 ctx->bserrno = bserrno; 5407 
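		/* Pass 0 here: the real error is already recorded in
		 * ctx->bserrno and will be reported when the cleanup chain
		 * finishes. */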
_spdk_delete_snapshot_cleanup_clone(ctx, 0); 5408 return; 5409 } 5410 5411 /* Copy snapshot map to clone map (only unallocated clusters in clone) */ 5412 for (i = 0; i < ctx->snapshot->active.num_clusters && i < ctx->clone->active.num_clusters; i++) { 5413 if (ctx->clone->active.clusters[i] == 0) { 5414 ctx->clone->active.clusters[i] = ctx->snapshot->active.clusters[i]; 5415 } 5416 } 5417 5418 /* Delete old backing bs_dev from clone (related to snapshot that will be removed) */ 5419 ctx->clone->back_bs_dev->destroy(ctx->clone->back_bs_dev); 5420 5421 /* Set/remove snapshot xattr and switch parent ID and backing bs_dev on clone... */ 5422 if (ctx->parent_snapshot_entry != NULL) { 5423 /* ...to parent snapshot */ 5424 ctx->clone->parent_id = ctx->parent_snapshot_entry->id; 5425 ctx->clone->back_bs_dev = ctx->snapshot->back_bs_dev; 5426 _spdk_blob_set_xattr(ctx->clone, BLOB_SNAPSHOT, &ctx->parent_snapshot_entry->id, 5427 sizeof(spdk_blob_id), 5428 true); 5429 } else { 5430 /* ...to blobid invalid and zeroes dev */ 5431 ctx->clone->parent_id = SPDK_BLOBID_INVALID; 5432 ctx->clone->back_bs_dev = spdk_bs_create_zeroes_dev(); 5433 _spdk_blob_remove_xattr(ctx->clone, BLOB_SNAPSHOT, true); 5434 } 5435 5436 spdk_blob_sync_md(ctx->clone, _spdk_delete_snapshot_sync_clone_cpl, ctx); 5437 } 5438 5439 static void 5440 _spdk_delete_snapshot_freeze_io_cb(void *cb_arg, int bserrno) 5441 { 5442 struct delete_snapshot_ctx *ctx = cb_arg; 5443 5444 if (bserrno) { 5445 SPDK_ERRLOG("Failed to freeze I/O on clone\n"); 5446 ctx->bserrno = bserrno; 5447 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5448 return; 5449 } 5450 5451 /* Temporarily override md_ro flag for snapshot for MD modification */ 5452 ctx->snapshot_md_ro = ctx->snapshot->md_ro; 5453 ctx->snapshot->md_ro = false; 5454 5455 /* Mark blob as pending for removal for power failure safety, use clone id for recovery */ 5456 ctx->bserrno = _spdk_blob_set_xattr(ctx->snapshot, SNAPSHOT_PENDING_REMOVAL, &ctx->clone->id, 5457 sizeof(spdk_blob_id), true); 5458 if (ctx->bserrno != 0) { 5459 _spdk_delete_snapshot_cleanup_clone(ctx, 0); 5460 return; 5461 } 5462 5463 spdk_blob_sync_md(ctx->snapshot, _spdk_delete_snapshot_sync_snapshot_xattr_cpl, ctx); 5464 } 5465 5466 static void 5467 _spdk_delete_snapshot_open_clone_cb(void *cb_arg, struct spdk_blob *clone, int bserrno) 5468 { 5469 struct delete_snapshot_ctx *ctx = cb_arg; 5470 5471 if (bserrno) { 5472 SPDK_ERRLOG("Failed to open clone\n"); 5473 ctx->bserrno = bserrno; 5474 _spdk_delete_snapshot_cleanup_snapshot(ctx, 0); 5475 return; 5476 } 5477 5478 ctx->clone = clone; 5479 5480 if (clone->locked_operation_in_progress) { 5481 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress on its clone\n"); 5482 ctx->bserrno = -EBUSY; 5483 spdk_blob_close(ctx->clone, _spdk_delete_snapshot_cleanup_snapshot, ctx); 5484 return; 5485 } 5486 5487 clone->locked_operation_in_progress = true; 5488 5489 _spdk_blob_freeze_io(clone, _spdk_delete_snapshot_freeze_io_cb, ctx); 5490 } 5491 5492 static void 5493 _spdk_update_clone_on_snapshot_deletion(struct spdk_blob *snapshot, struct delete_snapshot_ctx *ctx) 5494 { 5495 struct spdk_blob_list *snapshot_entry = NULL; 5496 struct spdk_blob_list *clone_entry = NULL; 5497 struct spdk_blob_list *snapshot_clone_entry = NULL; 5498 5499 /* Get snapshot entry for the snapshot we want to remove */ 5500 snapshot_entry = _spdk_bs_get_snapshot_entry(snapshot->bs, snapshot->id); 5501 5502 assert(snapshot_entry != NULL); 5503 5504 /* Get clone of the snapshot (at this point 
there can be only one clone) */ 5505 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5506 assert(snapshot_entry->clone_count == 1); 5507 assert(clone_entry != NULL); 5508 5509 /* Get snapshot entry for parent snapshot and clone entry within that snapshot for 5510 * snapshot that we are removing */ 5511 _spdk_blob_get_snapshot_and_clone_entries(snapshot, &ctx->parent_snapshot_entry, 5512 &snapshot_clone_entry); 5513 5514 spdk_bs_open_blob(snapshot->bs, clone_entry->id, _spdk_delete_snapshot_open_clone_cb, ctx); 5515 } 5516 5517 static void 5518 _spdk_bs_delete_blob_finish(void *cb_arg, struct spdk_blob *blob, int bserrno) 5519 { 5520 spdk_bs_sequence_t *seq = cb_arg; 5521 struct spdk_blob_list *snapshot_entry = NULL; 5522 uint32_t page_num; 5523 5524 if (bserrno) { 5525 SPDK_ERRLOG("Failed to remove blob\n"); 5526 spdk_bs_sequence_finish(seq, bserrno); 5527 return; 5528 } 5529 5530 /* Remove snapshot from the list */ 5531 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5532 if (snapshot_entry != NULL) { 5533 TAILQ_REMOVE(&blob->bs->snapshots, snapshot_entry, link); 5534 free(snapshot_entry); 5535 } 5536 5537 page_num = _spdk_bs_blobid_to_page(blob->id); 5538 spdk_bit_array_clear(blob->bs->used_blobids, page_num); 5539 blob->state = SPDK_BLOB_STATE_DIRTY; 5540 blob->active.num_pages = 0; 5541 _spdk_blob_resize(blob, 0); 5542 5543 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, blob); 5544 } 5545 5546 static int 5547 _spdk_bs_is_blob_deletable(struct spdk_blob *blob, bool *update_clone) 5548 { 5549 struct spdk_blob_list *snapshot_entry = NULL; 5550 struct spdk_blob_list *clone_entry = NULL; 5551 struct spdk_blob *clone = NULL; 5552 bool has_one_clone = false; 5553 5554 /* Check if this is a snapshot with clones */ 5555 snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id); 5556 if (snapshot_entry != NULL) { 5557 if (snapshot_entry->clone_count > 1) { 5558 SPDK_ERRLOG("Cannot remove snapshot with more than one clone\n"); 5559 return -EBUSY; 5560 } else if (snapshot_entry->clone_count == 1) { 5561 has_one_clone = true; 5562 } 5563 } 5564 5565 /* Check if someone has this blob open (besides this delete context): 5566 * - open_ref == 1 - only this context has opened the blob, so it is ok to remove it 5567 * - open_ref <= 2 && has_one_clone == true - the clone is holding the snapshot open, 5568 * and that is ok, because we will update it accordingly */ 5569 if (blob->open_ref <= 2 && has_one_clone) { 5570 clone_entry = TAILQ_FIRST(&snapshot_entry->clones); 5571 assert(clone_entry != NULL); 5572 clone = _spdk_blob_lookup(blob->bs, clone_entry->id); 5573 5574 if (blob->open_ref == 2 && clone == NULL) { 5575 /* Clone is closed and someone else opened this blob */ 5576 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5577 return -EBUSY; 5578 } 5579 5580 *update_clone = true; 5581 return 0; 5582 } 5583 5584 if (blob->open_ref > 1) { 5585 SPDK_ERRLOG("Cannot remove snapshot because it is open\n"); 5586 return -EBUSY; 5587 } 5588 5589 assert(has_one_clone == false); 5590 *update_clone = false; 5591 return 0; 5592 } 5593 5594 static void 5595 _spdk_bs_delete_enomem_close_cpl(void *cb_arg, int bserrno) 5596 { 5597 spdk_bs_sequence_t *seq = cb_arg; 5598 5599 spdk_bs_sequence_finish(seq, -ENOMEM); 5600 } 5601 5602 static void 5603 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 5604 { 5605 spdk_bs_sequence_t *seq = cb_arg; 5606 struct delete_snapshot_ctx *ctx; 5607 bool update_clone = false; 5608 5609 if (bserrno != 0) {
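		/* The open failed, so there is no blob to clean up; just
		 * complete the sequence with the error. */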
spdk_bs_sequence_finish(seq, bserrno); 5611 return; 5612 } 5613 5614 _spdk_blob_verify_md_op(blob); 5615 5616 ctx = calloc(1, sizeof(*ctx)); 5617 if (ctx == NULL) { 5618 spdk_blob_close(blob, _spdk_bs_delete_enomem_close_cpl, seq); 5619 return; 5620 } 5621 5622 ctx->snapshot = blob; 5623 ctx->cb_fn = _spdk_bs_delete_blob_finish; 5624 ctx->cb_arg = seq; 5625 5626 /* Check if blob can be removed and if it is a snapshot with clone on top of it */ 5627 ctx->bserrno = _spdk_bs_is_blob_deletable(blob, &update_clone); 5628 if (ctx->bserrno) { 5629 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5630 return; 5631 } 5632 5633 if (blob->locked_operation_in_progress) { 5634 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Cannot remove blob - another operation in progress\n"); 5635 ctx->bserrno = -EBUSY; 5636 spdk_blob_close(blob, _spdk_delete_blob_cleanup_finish, ctx); 5637 return; 5638 } 5639 5640 blob->locked_operation_in_progress = true; 5641 5642 /* 5643 * Remove the blob from the blob_store list now, to ensure it does not 5644 * get returned after this point by _spdk_blob_lookup(). 5645 */ 5646 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 5647 5648 if (update_clone) { 5649 /* This blob is a snapshot with active clone - update clone first */ 5650 _spdk_update_clone_on_snapshot_deletion(blob, ctx); 5651 } else { 5652 /* This blob does not have any clones - just remove it */ 5653 _spdk_bs_blob_list_remove(blob); 5654 _spdk_bs_delete_blob_finish(seq, blob, 0); 5655 free(ctx); 5656 } 5657 } 5658 5659 void 5660 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5661 spdk_blob_op_complete cb_fn, void *cb_arg) 5662 { 5663 struct spdk_bs_cpl cpl; 5664 spdk_bs_sequence_t *seq; 5665 5666 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 5667 5668 assert(spdk_get_thread() == bs->md_thread); 5669 5670 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 5671 cpl.u.blob_basic.cb_fn = cb_fn; 5672 cpl.u.blob_basic.cb_arg = cb_arg; 5673 5674 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 5675 if (!seq) { 5676 cb_fn(cb_arg, -ENOMEM); 5677 return; 5678 } 5679 5680 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq); 5681 } 5682 5683 /* END spdk_bs_delete_blob */ 5684 5685 /* START spdk_bs_open_blob */ 5686 5687 static void 5688 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 5689 { 5690 struct spdk_blob *blob = cb_arg; 5691 5692 if (bserrno != 0) { 5693 _spdk_blob_free(blob); 5694 seq->cpl.u.blob_handle.blob = NULL; 5695 spdk_bs_sequence_finish(seq, bserrno); 5696 return; 5697 } 5698 5699 blob->open_ref++; 5700 5701 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 5702 5703 spdk_bs_sequence_finish(seq, bserrno); 5704 } 5705 5706 static void _spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 5707 struct spdk_blob_open_opts *opts, spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 5708 { 5709 struct spdk_blob *blob; 5710 struct spdk_bs_cpl cpl; 5711 struct spdk_blob_open_opts opts_default; 5712 spdk_bs_sequence_t *seq; 5713 uint32_t page_num; 5714 5715 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 5716 assert(spdk_get_thread() == bs->md_thread); 5717 5718 page_num = _spdk_bs_blobid_to_page(blobid); 5719 if (spdk_bit_array_get(bs->used_blobids, page_num) == false) { 5720 /* Invalid blobid */ 5721 cb_fn(cb_arg, NULL, -ENOENT); 5722 return; 5723 } 5724 5725 blob = _spdk_blob_lookup(bs, blobid); 5726 if (blob) { 5727 blob->open_ref++; 5728 cb_fn(cb_arg, blob, 0); 5729 return; 5730 } 5731 5732 blob = _spdk_blob_alloc(bs, blobid); 5733 
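	/* The blob was not already open - load the newly allocated in-memory
	 * blob from disk below. */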
/* START spdk_bs_open_blob */

static void
_spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno != 0) {
		_spdk_blob_free(blob);
		seq->cpl.u.blob_handle.blob = NULL;
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		   struct spdk_blob_open_opts *opts,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob		*blob;
	struct spdk_bs_cpl		cpl;
	struct spdk_blob_open_opts	opts_default;
	spdk_bs_sequence_t		*seq;
	uint32_t			page_num;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);
	assert(spdk_get_thread() == bs->md_thread);

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, blob, 0);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (!opts) {
		spdk_blob_open_opts_init(&opts_default);
		opts = &opts_default;
	}

	blob->clear_method = opts->clear_method;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
}

void
spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, NULL, cb_fn, cb_arg);
}

void
spdk_bs_open_blob_ext(struct spdk_blob_store *bs, spdk_blob_id blobid,
		      struct spdk_blob_open_opts *opts,
		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	_spdk_bs_open_blob(bs, blobid, opts, cb_fn, cb_arg);
}

/* END spdk_bs_open_blob */
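/*
 * Usage sketch (illustrative only, not part of the library): opening a blob
 * with explicit open options.  "g_bs" and "g_blobid" are hypothetical
 * application state, and the example assumes the BLOB_CLEAR_WITH_UNMAP value
 * of enum blob_clear_method.  Passing NULL opts to spdk_bs_open_blob() picks
 * the defaults via spdk_blob_open_opts_init().
 *
 *	static void
 *	open_complete(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno == 0) {
 *			assert(blob != NULL);
 *		}
 *	}
 *
 *	struct spdk_blob_open_opts opts;
 *
 *	spdk_blob_open_opts_init(&opts);
 *	opts.clear_method = BLOB_CLEAR_WITH_UNMAP;
 *	spdk_bs_open_blob_ext(g_bs, g_blobid, &opts, open_complete, NULL);
 */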
/* START spdk_blob_set_read_only */
int
spdk_blob_set_read_only(struct spdk_blob *blob)
{
	_spdk_blob_verify_md_op(blob);

	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	return 0;
}
/* END spdk_blob_set_read_only */

/* START spdk_blob_sync_md */

static void
_spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0 && (blob->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
		blob->data_ro = true;
		blob->md_ro = true;
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
}

void
spdk_blob_sync_md(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);

	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		cb_fn(cb_arg, 0);
		return;
	}

	_spdk_blob_sync_md(blob, cb_fn, cb_arg);
}

/* END spdk_blob_sync_md */
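/*
 * Usage sketch (illustrative only, not part of the library): marking a blob
 * read-only.  Setting the flag only dirties the in-memory metadata, so the
 * two calls are paired; data_ro/md_ro take effect in _spdk_blob_sync_md_cpl()
 * once the sync succeeds.  "sync_complete" is hypothetical application code.
 *
 *	static void
 *	sync_complete(void *cb_arg, int bserrno)
 *	{
 *		struct spdk_blob *blob = cb_arg;
 *
 *		assert(bserrno != 0 || spdk_blob_is_read_only(blob));
 *	}
 *
 *	spdk_blob_set_read_only(blob);
 *	spdk_blob_sync_md(blob, sync_complete, blob);
 */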
/*
 * A blob's cluster map may only be modified on the blobstore's metadata
 * thread.  The helpers below marshal a cluster insertion to that thread via
 * spdk_thread_send_msg() and message the result back to the requesting thread.
 */
struct spdk_blob_insert_cluster_ctx {
	struct spdk_thread	*thread;
	struct spdk_blob	*blob;
	uint32_t		cluster_num;	/* cluster index in blob */
	uint32_t		cluster;	/* cluster on disk */
	int			rc;
	spdk_blob_op_complete	cb_fn;
	void			*cb_arg;
};

static void
_spdk_blob_insert_cluster_msg_cpl(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->cb_fn(ctx->cb_arg, ctx->rc);
	free(ctx);
}

static void
_spdk_blob_insert_cluster_msg_cb(void *arg, int bserrno)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = bserrno;
	spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
}

static void
_spdk_blob_insert_cluster_msg(void *arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx = arg;

	ctx->rc = _spdk_blob_insert_cluster(ctx->blob, ctx->cluster_num, ctx->cluster);
	if (ctx->rc != 0) {
		spdk_thread_send_msg(ctx->thread, _spdk_blob_insert_cluster_msg_cpl, ctx);
		return;
	}

	ctx->blob->state = SPDK_BLOB_STATE_DIRTY;
	_spdk_blob_sync_md(ctx->blob, _spdk_blob_insert_cluster_msg_cb, ctx);
}

static void
_spdk_blob_insert_cluster_on_md_thread(struct spdk_blob *blob, uint32_t cluster_num,
				       uint64_t cluster, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_insert_cluster_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->thread = spdk_get_thread();
	ctx->blob = blob;
	ctx->cluster_num = cluster_num;
	ctx->cluster = cluster;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_thread_send_msg(blob->bs->md_thread, _spdk_blob_insert_cluster_msg, ctx);
}

/* START spdk_blob_close */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	if (bserrno == 0) {
		blob->open_ref--;
		if (blob->open_ref == 0) {
			/*
			 * Blobs with active.num_pages == 0 are deleted blobs.
			 * These blobs are removed from the blob_store list
			 * when the deletion process starts - so don't try to
			 * remove them again.
			 */
			if (blob->active.num_pages > 0) {
				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
			}
			_spdk_blob_free(blob);
		}
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_blob_close(struct spdk_blob *blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	_spdk_blob_verify_md_op(blob);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
}

/* END spdk_blob_close */
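/*
 * Usage sketch (illustrative only, not part of the library): closing an open
 * blob from the metadata thread.  Close persists any dirty metadata first;
 * the in-memory blob is freed only when the last open_ref is dropped, so a
 * blob opened twice must be closed twice.  "close_complete" is hypothetical.
 *
 *	static void
 *	close_complete(void *cb_arg, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			SPDK_ERRLOG("Blob close failed: %d\n", bserrno);
 *		}
 *	}
 *
 *	spdk_blob_close(blob, close_complete, NULL);
 */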
struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}

void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void
spdk_blob_io_unmap(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void
spdk_blob_io_write_zeroes(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}

void
spdk_blob_io_write(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   void *payload, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void
spdk_blob_io_read(struct spdk_blob *blob, struct spdk_io_channel *channel,
		  void *payload, uint64_t offset, uint64_t length,
		  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void
spdk_blob_io_writev(struct spdk_blob *blob, struct spdk_io_channel *channel,
		    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void
spdk_blob_io_readv(struct spdk_blob *blob, struct spdk_io_channel *channel,
		   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}
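/*
 * Usage sketch (illustrative only, not part of the library): a small write
 * through a per-thread I/O channel.  "g_bs", "blob" and "write_complete" are
 * hypothetical application code; offset and length are expressed in io units,
 * and the payload must be DMA-safe memory from spdk_malloc().  The io-unit
 * accessor below assumes the public spdk_bs_get_io_unit_size() API.
 *
 *	struct spdk_io_channel *channel = spdk_bs_alloc_io_channel(g_bs);
 *	uint64_t io_unit = spdk_bs_get_io_unit_size(g_bs);
 *	void *payload = spdk_malloc(io_unit, 0x1000, NULL,
 *				    SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
 *
 *	memset(payload, 0x5a, io_unit);
 *	spdk_blob_io_write(blob, channel, payload, 0, 1, write_complete, NULL);
 */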
struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_iter_first(struct spdk_blob_store *bs,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *blob,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_blob_close(blob, _spdk_bs_iter_close_cpl, ctx);
}
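/*
 * Usage sketch (illustrative only, not part of the library): walking every
 * blob in the store.  Each blob handed to the callback is open;
 * spdk_bs_iter_next() closes it before opening the next one, and the walk
 * finishes with -ENOENT.  "g_bs" and "process" are hypothetical.
 *
 *	static void
 *	iter_complete(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			return;
 *		}
 *		process(blob);
 *		spdk_bs_iter_next(g_bs, blob, iter_complete, cb_arg);
 *	}
 *
 *	spdk_bs_iter_first(g_bs, iter_complete, NULL);
 */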
static int
_spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		     uint16_t value_len, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;
	size_t			desc_size;
	void			*tmp;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}

	desc_size = sizeof(struct spdk_blob_md_descriptor_xattr) + strlen(name) + value_len;
	if (desc_size > SPDK_BS_MAX_DESC_SIZE) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Xattr '%s' of size %zu does not fit into single page %zu\n",
			      name, desc_size, (size_t)SPDK_BS_MAX_DESC_SIZE);
		return -ENOMEM;
	}

	if (internal) {
		xattrs = &blob->xattrs_internal;
		blob->invalid_flags |= SPDK_BLOB_INTERNAL_XATTR;
	} else {
		xattrs = &blob->xattrs;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			/* Allocate the new value before freeing the old one,
			 * so a failed allocation leaves the xattr intact.
			 */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);

			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}

	xattr->name = strdup(name);
	if (!xattr->name) {
		free(xattr);
		return -ENOMEM;
	}

	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->value) {
		free(xattr->name);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		    uint16_t value_len)
{
	return _spdk_blob_set_xattr(blob, name, value, value_len, false);
}

static int
_spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name, bool internal)
{
	struct spdk_xattr_tailq *xattrs;
	struct spdk_xattr	*xattr;

	_spdk_blob_verify_md_op(blob);

	if (blob->md_ro) {
		return -EPERM;
	}
	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			if (internal && TAILQ_EMPTY(&blob->xattrs_internal)) {
				blob->invalid_flags &= ~SPDK_BLOB_INTERNAL_XATTR;
			}
			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_blob_remove_xattr(struct spdk_blob *blob, const char *name)
{
	return _spdk_blob_remove_xattr(blob, name, false);
}

static int
_spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len, bool internal)
{
	struct spdk_xattr	*xattr;
	struct spdk_xattr_tailq *xattrs;

	xattrs = internal ? &blob->xattrs_internal : &blob->xattrs;

	TAILQ_FOREACH(xattr, xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}
	return -ENOENT;
}

int
spdk_blob_get_xattr_value(struct spdk_blob *blob, const char *name,
			  const void **value, size_t *value_len)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_value(blob, name, value, value_len, false);
}

struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[0];
};

static int
_spdk_blob_get_xattr_names(struct spdk_xattr_tailq *xattrs, struct spdk_xattr_names **names)
{
	struct spdk_xattr	*xattr;
	int			count = 0;

	TAILQ_FOREACH(xattr, xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

int
spdk_blob_get_xattr_names(struct spdk_blob *blob, struct spdk_xattr_names **names)
{
	_spdk_blob_verify_md_op(blob);

	return _spdk_blob_get_xattr_names(&blob->xattrs, names);
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}

void
spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
{
	memcpy(&bs->bstype, &bstype, sizeof(bstype));
}

bool
spdk_blob_is_read_only(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return (blob->data_ro || blob->md_ro);
}

bool
spdk_blob_is_snapshot(struct spdk_blob *blob)
{
	struct spdk_blob_list *snapshot_entry;

	assert(blob != NULL);

	snapshot_entry = _spdk_bs_get_snapshot_entry(blob->bs, blob->id);
	if (snapshot_entry == NULL) {
		return false;
	}

	return true;
}

bool
spdk_blob_is_clone(struct spdk_blob *blob)
{
	assert(blob != NULL);

	if (blob->parent_id != SPDK_BLOBID_INVALID) {
		assert(spdk_blob_is_thin_provisioned(blob));
		return true;
	}

	return false;
}

bool
spdk_blob_is_thin_provisioned(struct spdk_blob *blob)
{
	assert(blob != NULL);
	return !!(blob->invalid_flags & SPDK_BLOB_THIN_PROV);
}

static void
_spdk_blob_update_clear_method(struct spdk_blob *blob)
{
	enum blob_clear_method stored_cm;

	assert(blob != NULL);

	/* If BLOB_CLEAR_WITH_DEFAULT was passed in, use the setting stored
	 * in metadata previously.  If something other than the default was
	 * specified, ignore the stored value and use what was passed in.
	 */
	stored_cm = ((blob->md_ro_flags & SPDK_BLOB_CLEAR_METHOD) >> SPDK_BLOB_CLEAR_METHOD_SHIFT);

	if (blob->clear_method == BLOB_CLEAR_WITH_DEFAULT) {
		blob->clear_method = stored_cm;
	} else if (blob->clear_method != stored_cm) {
		SPDK_WARNLOG("Using passed in clear method 0x%x instead of stored value of 0x%x\n",
			     blob->clear_method, stored_cm);
	}
}

spdk_blob_id
spdk_blob_get_parent_snapshot(struct spdk_blob_store *bs, spdk_blob_id blob_id)
{
	struct spdk_blob_list *snapshot_entry = NULL;
	struct spdk_blob_list *clone_entry = NULL;

	TAILQ_FOREACH(snapshot_entry, &bs->snapshots, link) {
		TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
			if (clone_entry->id == blob_id) {
				return snapshot_entry->id;
			}
		}
	}

	return SPDK_BLOBID_INVALID;
}

int
spdk_blob_get_clones(struct spdk_blob_store *bs, spdk_blob_id blobid, spdk_blob_id *ids,
		     size_t *count)
{
	struct spdk_blob_list *snapshot_entry, *clone_entry;
	size_t n;

	snapshot_entry = _spdk_bs_get_snapshot_entry(bs, blobid);
	if (snapshot_entry == NULL) {
		*count = 0;
		return 0;
	}

	if (ids == NULL || *count < snapshot_entry->clone_count) {
		*count = snapshot_entry->clone_count;
		return -ENOMEM;
	}
	*count = snapshot_entry->clone_count;

	n = 0;
	TAILQ_FOREACH(clone_entry, &snapshot_entry->clones, link) {
		ids[n++] = clone_entry->id;
	}

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
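/*
 * Usage sketch (illustrative only, not part of the library): round-tripping
 * an xattr on an open, writable blob from the metadata thread, using the
 * xattr APIs defined above.  The value is copied on set, so the caller keeps
 * ownership of "version"; the xattr becomes persistent only after a
 * subsequent spdk_blob_sync_md().
 *
 *	uint64_t version = 1;
 *	const void *value;
 *	size_t value_len;
 *
 *	spdk_blob_set_xattr(blob, "version", &version, sizeof(version));
 *	spdk_blob_get_xattr_value(blob, "version", &value, &value_len);
 *	assert(value_len == sizeof(version));
 */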