/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL	0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);
static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno);

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}
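/*
 * Illustrative usage sketch (not part of the blobstore itself): callers are
 * expected to initialize an options structure with spdk_blob_opts_init()
 * before overriding individual fields.  spdk_bs_create_blob_ext() is the
 * public creation API assumed here; the callback name is hypothetical.
 *
 *	struct spdk_blob_opts opts;
 *
 *	spdk_blob_opts_init(&opts);
 *	opts.num_clusters = 8;
 *	spdk_bs_create_blob_ext(bs, &opts, blob_create_done, cb_arg);
 */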
void
spdk_blob_opts_init(struct spdk_blob_opts *opts)
{
	opts->num_clusters = 0;
	opts->xattr_count = 0;
	opts->xattr_names = NULL;
	opts->xattr_ctx = NULL;
	opts->get_xattr_value = NULL;
}

static struct spdk_blob_data *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob_data *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob_data *blob)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}
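/*
 * Snapshot the active (in-memory) cluster and page arrays into the clean
 * (on-disk) view.  The current active arrays become the new clean arrays,
 * and freshly allocated copies take their place as the active arrays, so
 * that subsequent modifications do not disturb the record of what is
 * persisted on disk.
 */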
static int
_spdk_blob_mark_clean(struct spdk_blob_data *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags *desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			if ((desc_flags->data_ro_flags & SPDK_BLOB_READ_ONLY)) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (!spdk_bit_array_get(blob->bs->used_clusters,
								desc_extent->extents[i].cluster_idx + j)) {
						return -EINVAL;
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr *desc_xattr;
			struct spdk_xattr *xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
			    sizeof(desc_xattr->value_length) +
			    desc_xattr->name_length + desc_xattr->value_length) {
				return -EINVAL;
			}

			xattr = calloc(1, sizeof(*xattr));
			if (xattr == NULL) {
				return -ENOMEM;
			}

			xattr->name = malloc(desc_xattr->name_length + 1);
			if (xattr->name == NULL) {
				free(xattr);
				return -ENOMEM;
			}
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			if (xattr->value == NULL) {
				free(xattr->name);
				free(xattr);
				return -ENOMEM;
			}
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Unrecognized descriptor type.  Do not fail - just continue to the
			 * next descriptor.  If this descriptor is associated with some feature
			 * defined in a newer version of blobstore, that version of blobstore
			 * should create and set an associated feature flag to specify if this
			 * blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}
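/*
 * Parse a complete metadata page chain into an in-memory blob.  pages[0]
 * must be the root page (sequence number 0) located at the page index
 * derived from the blobid; the remaining pages must appear in chain order.
 */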
static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob_data *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD, this can
	 * happen for example if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}
/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}
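/*
 * Serialize the cluster list starting at start_cluster into an extent
 * descriptor, run-length encoding physically contiguous clusters.  For
 * example (illustrative values), with one cluster per 64 LBAs, clusters at
 * LBAs 64, 128 and 192 collapse into the single extent
 * { cluster_idx = 1, length = 3 }.  If the descriptor buffer fills up,
 * *next_cluster reports where serialization must resume on a fresh page.
 */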
static void
_spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint32_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}

static void
_spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 * descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	const struct spdk_xattr *xattr;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;
	uint64_t last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob_data		*blob;

	struct spdk_blob_md_page	*pages;
	uint32_t			num_pages;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};
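/*
 * The CRC of a metadata page covers everything except the crc field itself,
 * which occupies the final 4 bytes of the page.  Readers recompute the CRC
 * over the first SPDK_BS_PAGE_SIZE - 4 bytes and compare it against the
 * stored value.
 */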
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_md_page	*page;
	int				rc;
	uint32_t			crc;

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					      sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}
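/*
 * The load path below is a small state machine driven by I/O completions:
 * read the root metadata page at the location derived from the blobid, then
 * follow each page's 'next' index until SPDK_INVALID_MD_PAGE, accumulating
 * pages into ctx->pages.  Once the chain is complete, the pages are parsed
 * and the blob is marked clean.
 */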
/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
				      SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob_data		*blob;

	struct spdk_blob_md_page	*pages;

	uint64_t			idx;

	spdk_bs_sequence_cpl		cb_fn;
	void				*cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	void				*tmp;
	size_t				i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}
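/*
 * Truncated clusters are unmapped in coalesced runs: physically contiguous
 * LBA ranges are merged into a single unmap command.  For example
 * (illustrative), truncated clusters at LBAs 128, 192 and 512 with 64 LBAs
 * per cluster produce two unmaps: one for LBAs 128-255 and one for LBAs
 * 512-575.
 */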
static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	spdk_bs_batch_t			*batch;
	size_t				i;
	uint64_t			lba;
	uint32_t			lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it
		 * as an unmap.
		 */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	size_t				i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	spdk_bs_batch_t			*batch;
	size_t				i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_zero_pages, ctx);
}
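/*
 * Metadata pages are written back to front: the chained pages (indices 1 and
 * up) are flushed first, and only in their completion path is the root page
 * written.  If a crash occurs before the root page lands, the previous
 * metadata remains reachable and the blob stays consistent.
 */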
static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx	*ctx = cb_arg;
	struct spdk_blob_data		*blob = ctx->blob;
	struct spdk_blob_store		*bs = blob->bs;
	uint64_t			lba;
	uint32_t			lba_count;
	struct spdk_blob_md_page	*page;
	spdk_bs_batch_t			*batch;
	size_t				i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}
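/*
 * Resizing claims clusters in two passes over the used_clusters bit array:
 * the first pass only verifies that enough free clusters exist, the second
 * actually claims them.  This avoids claiming some clusters and then
 * failing partway through the allocation.
 */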
static int
_spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
{
	uint64_t	i;
	uint64_t	*tmp;
	uint64_t	lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	spdk_bs_batch_t		*batch;
	struct spdk_bs_cpl	cpl;
	uint64_t		lba;
	uint32_t		lba_count;
	uint8_t			*buf;
	uint64_t		page;

	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		switch (op_type) {
		case SPDK_BLOB_READ:
			spdk_bs_batch_read(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE:
			spdk_bs_batch_write(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap(batch, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
			break;
		}

		length -= lba_count;
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		}
	}

	spdk_bs_batch_close(batch);
}

struct rw_iov_ctx {
	struct spdk_blob_data *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};
static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}
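/*
 * Split a readv/writev that spans cluster boundaries.  Each pass covers the
 * pages up to the next cluster boundary, re-slicing the caller's iov array
 * into ctx->iov for just that range; the completion callback re-enters this
 * function until pages_remaining reaches zero.
 */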
static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 *  byte_count will keep track of how many bytes remain until orig_iov and orig_iovoff point to
	 *  the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence.  byte_count will keep track of how many
	 *  bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	spdk_bs_sequence_t	*seq;
	struct spdk_bs_cpl	cpl;

	assert(blob != NULL);

	if (!read && blob->data_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 *  to split a request that spans a cluster boundary.  For I/O that do not span a cluster boundary,
	 *  there will be no noticeable difference compared to using a batch.  For I/O that do span a
	 *  cluster boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous,
	 *  so we need to allocate a separate iov array and split the I/O such that none of the resulting
	 *  smaller I/O cross a cluster boundary.  These smaller I/O will be issued in sequence (not in
	 *  parallel) but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of
	 *  the iovs for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then
	 *  issue them in a batch.  That would also require creating an intermediate spdk_bs_cpl that would
	 *  get called when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob_data *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_data *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store	*bs = io_device;
	struct spdk_bs_channel	*channel = ctx_buf;
	struct spdk_bs_dev	*dev;
	uint32_t		max_ops = bs->max_channel_ops;
	uint32_t		i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	if (!channel->dev_channel) {
		SPDK_ERRLOG("Failed to create device channel.\n");
		free(channel->req_mem);
		return -1;
	}

	return 0;
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}
static void
_spdk_bs_dev_destroy(void *io_device)
{
	struct spdk_blob_store *bs = io_device;
	struct spdk_blob_data	*blob, *blob_tmp;

	bs->dev->destroy(bs->dev);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_blobids);
	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);
	/*
	 * If this function is called for any reason except a successful unload,
	 * the unload_cpl type will be NONE and this will be a nop.
	 */
	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);

	free(bs);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_DEFAULT_CHANNEL_OPS;
	memset(&opts->bstype, 0, sizeof(opts->bstype));
}

static int
_spdk_bs_opts_verify(struct spdk_bs_opts *opts)
{
	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
	    opts->max_channel_ops == 0) {
		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
		return -1;
	}

	return 0;
}
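/*
 * Illustrative initialization sketch (not part of this file): the public
 * entry points are assumed to initialize and validate options along these
 * lines before allocating the blobstore.  spdk_bs_init() is the assumed
 * public API; bs_init_done is hypothetical.
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 *	spdk_bs_init(dev, &opts, bs_init_done, cb_arg);
 */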
static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store	*bs;
	uint64_t dev_size;
	int rc;

	dev_size = dev->blocklen * dev->blockcnt;
	if (dev_size < opts->cluster_sz) {
		/* Device size cannot be smaller than cluster size of blobstore */
		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size,
			    opts->cluster_sz);
		return NULL;
	}
	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
		/* Cluster size cannot be smaller than page size */
		SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n",
			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
		return NULL;
	}
	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 *  even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		free(bs);
		return NULL;
	}

	bs->max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;
	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);
	bs->used_blobids = spdk_bit_array_create(0);

	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	rc = spdk_bs_register_md_thread(bs);
	if (rc == -1) {
		spdk_io_device_unregister(bs, NULL);
		spdk_bit_array_free(&bs->used_blobids);
		spdk_bit_array_free(&bs->used_md_pages);
		spdk_bit_array_free(&bs->used_clusters);
		free(bs);
		return NULL;
	}

	return bs;
}
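/*
 * Worked example of the cluster math above (illustrative numbers): with
 * dev->blocklen = 512 and cluster_sz = 1 MiB, each cluster spans
 * 1048576 / 512 = 2048 blocks, so total_clusters = blockcnt / 2048, and
 * pages_per_cluster = 1048576 / SPDK_BS_PAGE_SIZE (4096) = 256.  Any tail
 * of the device smaller than one cluster is simply not counted.
 */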
/* START spdk_bs_load, spdk_bs_load_ctx is used for both load and unload. */

struct spdk_bs_load_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;

	struct spdk_bs_md_mask		*mask;
	bool				in_page_chain;
	uint32_t			page_index;
	uint32_t			cur_page;
	struct spdk_blob_md_page	*page;
	bool				is_load;
};

static void
_spdk_bs_load_ctx_fail(spdk_bs_sequence_t *seq, struct spdk_bs_load_ctx *ctx, int bserrno)
{
	assert(bserrno != 0);

	spdk_dma_free(ctx->super);
	/*
	 * Only free the blobstore when a load fails.  If an unload fails (for some reason)
	 *  we want to keep the blobstore in case the caller wants to try again.
	 */
	if (ctx->is_load) {
		_spdk_bs_free(ctx->bs);
	}
	free(ctx);
	spdk_bs_sequence_finish(seq, bserrno);
}
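/*
 * On-disk masks are plain bit arrays: bit i of the in-memory bit array is
 * stored at byte i / 8, bit position i % 8.  For example (illustrative),
 * set bits {0, 3, 9} serialize to mask[0] = 0x09 and mask[1] = 0x02.
 */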
static void
_spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
{
	uint32_t i = 0;

	while (true) {
		i = spdk_bit_array_find_first_set(array, i);
		if (i >= mask->length) {
			break;
		}
		mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	/* Update the values in the super block */
	super->super_blob = bs->super_blob;
	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
	super->crc = _spdk_blob_md_page_calc_crc(super);
	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
			       cb_fn, cb_arg);
}

static void
_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t mask_size, lba, lba_count;

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t mask_size, lba, lba_count;

	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_blobids(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx	*ctx = arg;
	uint64_t mask_size, lba, lba_count;

	if (ctx->super->used_blobid_mask_len == 0) {
		/*
		 * This is a pre-v3 on-disk format where the blobid mask does not get
		 *  written to disk.
		 */
		cb_fn(seq, arg, 0);
		return;
	}

	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_BLOBIDS;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_blobids));

	_spdk_bs_set_mask(ctx->bs->used_blobids, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}
static void
_spdk_bs_load_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_BLOBIDS);

	/* The length of the mask (in bits) must not be greater than
	 * the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE * 8));

	/* The length of the mask must be exactly equal to the size
	 * (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment; j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_blobids, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->mask);

	/* Read the used blobids mask */
	mask_size = ctx->super->used_blobid_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_blobid_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_blobids_cpl, ctx);
}

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->mask);
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx	*ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}
cluster_count = 0; 1888 1889 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 1890 1891 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 1892 for (j = 0; j < desc_extent->extents[i].length; j++) { 1893 spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j); 1894 if (bs->num_free_clusters == 0) { 1895 return -1; 1896 } 1897 bs->num_free_clusters--; 1898 cluster_count++; 1899 } 1900 } 1901 if (cluster_count == 0) { 1902 return -1; 1903 } 1904 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 1905 /* Skip this item */ 1906 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 1907 /* Skip this item */ 1908 } else { 1909 /* Error */ 1910 return -1; 1911 } 1912 /* Advance to the next descriptor */ 1913 cur_desc += sizeof(*desc) + desc->length; 1914 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 1915 break; 1916 } 1917 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 1918 } 1919 return 0; 1920 } 1921 1922 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 1923 { 1924 uint32_t crc; 1925 1926 crc = _spdk_blob_md_page_calc_crc(ctx->page); 1927 if (crc != ctx->page->crc) { 1928 return false; 1929 } 1930 1931 if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 1932 return false; 1933 } 1934 return true; 1935 } 1936 1937 static void 1938 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 1939 1940 static void 1941 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1942 { 1943 struct spdk_bs_load_ctx *ctx = cb_arg; 1944 1945 spdk_dma_free(ctx->mask); 1946 spdk_dma_free(ctx->super); 1947 spdk_bs_sequence_finish(seq, bserrno); 1948 free(ctx); 1949 } 1950 1951 static void 1952 _spdk_bs_load_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1953 { 1954 struct spdk_bs_load_ctx *ctx = cb_arg; 1955 1956 spdk_dma_free(ctx->mask); 1957 ctx->mask = NULL; 1958 1959 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl); 1960 } 1961 1962 static void 1963 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1964 { 1965 struct spdk_bs_load_ctx *ctx = cb_arg; 1966 1967 spdk_dma_free(ctx->mask); 1968 ctx->mask = NULL; 1969 1970 _spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_load_write_used_blobids_cpl); 1971 } 1972 1973 static void 1974 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1975 { 1976 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl); 1977 } 1978 1979 static void 1980 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1981 { 1982 struct spdk_bs_load_ctx *ctx = cb_arg; 1983 uint32_t page_num; 1984 1985 if (bserrno != 0) { 1986 _spdk_bs_load_ctx_fail(seq, ctx, bserrno); 1987 return; 1988 } 1989 1990 page_num = ctx->cur_page; 1991 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 1992 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 1993 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 1994 if (ctx->page->sequence_num == 0) { 1995 spdk_bit_array_set(ctx->bs->used_blobids, page_num); 1996 } 1997 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 1998 _spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ); 1999 return; 2000 } 2001 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 2002 ctx->in_page_chain = true; 2003 ctx->cur_page = ctx->page->next; 2004 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 2005 
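				/* The chained page is read asynchronously;
				 * parsing resumes in _spdk_bs_load_replay_md_cpl()
				 * once that read completes.
				 */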
				return;
			}
		}
	}

	ctx->in_page_chain = false;

	do {
		ctx->page_index++;
	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);

	if (ctx->page_index < ctx->super->md_len) {
		ctx->cur_page = ctx->page_index;
		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
	} else {
		spdk_dma_free(ctx->page);
		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
	}
}

static void
_spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba;

	assert(ctx->cur_page < ctx->super->md_len);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
	spdk_bs_sequence_read(seq, ctx->page, lba,
			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
			      _spdk_bs_load_replay_md_cpl, ctx);
}

static void
_spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	ctx->page_index = 0;
	ctx->cur_page = 0;
	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE,
				     NULL);
	if (!ctx->page) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}
	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
}

static void
_spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	int rc;

	if (bserrno != 0) {
		_spdk_bs_load_ctx_fail(seq, ctx, -EIO);
		return;
	}

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
	if (rc < 0) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	rc = spdk_bit_array_resize(&ctx->bs->used_blobids, ctx->super->md_len);
	if (rc < 0) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		_spdk_bs_load_ctx_fail(seq, ctx, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	_spdk_bs_load_replay_md(seq, cb_arg);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t crc;
	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];

	if (ctx->super->version > SPDK_BS_VERSION ||
	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
		return;
	}

	crc = _spdk_blob_md_page_calc_crc(ctx->super);
	if (crc != ctx->super->crc) {
		_spdk_bs_load_ctx_fail(seq, ctx, -EILSEQ);
		return;
	}

	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		_spdk_bs_load_ctx_fail(seq, ctx, -ENXIO);
		return;
	}

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
	ctx->bs->super_blob = ctx->super->super_blob;
	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));

	if (ctx->super->clean == 0) {
		_spdk_bs_recover(seq, ctx, 0);
	} else if (ctx->super->used_blobid_mask_len == 0) {
		/*
		 * Metadata is clean, but this is an old metadata format without
		 * a blobid mask. Clear the clean bit and then build the masks
		 * using _spdk_bs_recover.
		 */
		ctx->super->clean = 0;
		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_recover, ctx);
	} else {
		ctx->super->clean = 0;
		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
	}
}

void
spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts opts = {};

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;
	ctx->is_load = true;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	spdk_bs_batch_t *batch;
	uint64_t num_md_lba;
	uint64_t num_md_pages;
	uint64_t num_md_clusters;
	uint32_t i;
	struct spdk_bs_opts opts = {};
	int rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);

	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %d\n",
			    dev->blocklen);
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	if (_spdk_bs_opts_verify(&opts) != 0) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	rc = spdk_bit_array_resize(&bs->used_blobids, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;
	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
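	 * As an illustration (example numbers, not defaults enforced here):
	 * with 4 KiB metadata pages, a 10 GiB device, and 1 MiB clusters,
	 * total_clusters is 10240, the bitmap takes 10240 / 8 = 1280 bytes,
	 * and the header plus bitmap round up to a single 4 KiB page.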
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The used_blobids mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_blobid_mask_start = num_md_pages;
	ctx->super->used_blobid_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					   divide_round_up(bs->md_len, 8),
					   SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_blobid_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;

	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);

	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);

	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
	if (num_md_clusters > bs->total_clusters) {
		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than are available, "
			    "please decrease number of pages reserved for metadata "
			    "or increase cluster size.\n");
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < num_md_clusters; i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	bs->total_data_clusters = bs->num_free_clusters;

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);

	/* Clear metadata space */
	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
	/* Trim data clusters */
	spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);

	spdk_bs_batch_close(batch);
}

/* END spdk_bs_init */

/* START spdk_bs_destroy */

static void
_spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
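	 * Clearing seq->cpl.type below keeps spdk_bs_sequence_finish() from
	 * invoking the user completion before the blobstore and its dev have
	 * actually been torn down.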
	 */
	bs->unload_err = bserrno;
	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(bs);
	free(ctx);
}

void
spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
		void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_init_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");

	if (!TAILQ_EMPTY(&bs->blobs)) {
		SPDK_ERRLOG("Blobstore still has open blobs\n");
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Write zeroes to the super block */
	spdk_bs_sequence_write_zeroes(seq,
				      _spdk_bs_page_to_lba(bs, 0),
				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
				      _spdk_bs_destroy_trim_cpl, ctx);
}

/* END spdk_bs_destroy */

/* START spdk_bs_unload */

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
	 */
	ctx->bs->unload_err = bserrno;
	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);
	ctx->super->clean = 1;

	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_blobids_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);
	ctx->mask = NULL;

	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);
	ctx->mask = NULL;

	_spdk_bs_write_used_blobids(seq, cb_arg, _spdk_bs_unload_write_used_blobids_cpl);
}

static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");

	if (!TAILQ_EMPTY(&bs->blobs)) {
		SPDK_ERRLOG("Blobstore still has open blobs\n");
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;
	ctx->is_load = false;

	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return SPDK_BS_PAGE_SIZE;
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

uint64_t
spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
{
	return bs->total_data_clusters;
}

static int
spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_channel = spdk_get_io_channel(bs);
	if (!bs->md_channel) {
		SPDK_ERRLOG("Failed to get IO channel.\n");
		return -1;
	}

	return 0;
}

static int
spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_channel);

	return 0;
}

spdk_blob_id
spdk_blob_get_id(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return blob->id;
}

uint64_t
spdk_blob_get_num_pages(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t
spdk_blob_get_num_clusters(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return blob->active.num_clusters;
}

/* START spdk_bs_create_blob */

static void
_spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

static int
_spdk_blob_set_xattrs(struct spdk_blob *blob, const struct spdk_blob_opts *opts)
{
	uint64_t i;
	size_t value_len = 0;
	int rc;
	const void *value = NULL;

	if (opts->xattr_count > 0 && opts->get_xattr_value == NULL) {
		return -EINVAL;
	}

	for (i = 0; i < opts->xattr_count; i++) {
		opts->get_xattr_value(opts->xattr_ctx, opts->xattr_names[i], &value, &value_len);
		if (value == NULL || value_len == 0) {
			return -EINVAL;
		}
		rc = spdk_blob_set_xattr(blob, opts->xattr_names[i], value, value_len);
		if (rc < 0) {
			return rc;
		}
	}

	return 0;
}

void
spdk_bs_create_blob_ext(struct spdk_blob_store *bs, const struct spdk_blob_opts *opts,
			spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data *blob;
	uint32_t page_idx;
	struct spdk_bs_cpl cpl;
	struct spdk_blob_opts opts_default;
	spdk_bs_sequence_t *seq;
	spdk_blob_id id;
	int rc;

	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_blobids, page_idx);
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	id = _spdk_bs_page_to_blobid(page_idx);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);

	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		/* Release the claimed page so it can be reused */
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	if (!opts) {
		spdk_blob_opts_init(&opts_default);
		opts = &opts_default;
	}
	rc = _spdk_blob_set_xattrs(__data_to_blob(blob), opts);
	if (rc < 0) {
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}
	rc = spdk_blob_resize(__data_to_blob(blob), opts->num_clusters);
	if (rc < 0) {
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, rc);
		return;
	}
	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_bit_array_clear(bs->used_blobids, page_idx);
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
}

void
spdk_bs_create_blob(struct spdk_blob_store *bs,
		    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	spdk_bs_create_blob_ext(bs, NULL, cb_fn, cb_arg);
}

/* END spdk_bs_create_blob */

/* START spdk_blob_resize */
int
spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	int rc;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (blob->md_ro) {
		return -EPERM;
	}

	if (sz == blob->active.num_clusters) {
		return 0;
	}

	rc = _spdk_resize_blob(blob, sz);
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* END spdk_blob_resize */


/* START spdk_bs_delete_blob */

static void
_spdk_bs_delete_close_cpl(void *cb_arg, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *_blob = cb_arg;
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	if (bserrno != 0) {
		/*
		 * We already removed this blob from the blobstore tailq, so
		 * we need to free it here since this is the last reference
		 * to it.
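		 * (The tailq removal happens in _spdk_bs_delete_open_cpl()
		 * before the persist is issued, so nothing else can look
		 * this blob up again.)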
		 */
		_spdk_blob_free(blob);
		_spdk_bs_delete_close_cpl(seq, bserrno);
		return;
	}

	/*
	 * This will immediately decrement the ref_count and call
	 * the completion routine since the metadata state is clean.
	 * By calling spdk_blob_close, we reduce the number of call
	 * points into code that touches the blob->open_ref count
	 * and the blobstore's blob list.
	 */
	spdk_blob_close(_blob, _spdk_bs_delete_close_cpl, seq);
}

static void
_spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	uint32_t page_num;

	if (bserrno != 0) {
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	if (blob->open_ref > 1) {
		/*
		 * Someone has this blob open (besides this delete context).
		 * Decrement the ref count directly and return -EBUSY.
		 */
		blob->open_ref--;
		spdk_bs_sequence_finish(seq, -EBUSY);
		return;
	}

	/*
	 * Remove the blob from the blob_store list now, to ensure it does not
	 * get returned after this point by _spdk_blob_lookup().
	 */
	TAILQ_REMOVE(&blob->bs->blobs, blob, link);
	page_num = _spdk_bs_blobid_to_page(blob->id);
	spdk_bit_array_clear(blob->bs->used_blobids, page_num);
	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, _blob);
}

void
spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq);
}

/* END spdk_bs_delete_blob */

/* START spdk_bs_open_blob */

static void
_spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	/* If the blob has a CRC error, it is NULL here and we just return NULL.
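	 * The sequence is still finished with bserrno, so the caller's
	 * completion sees both the NULL handle and the error code.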
	 */
	if (blob == NULL) {
		seq->cpl.u.blob_handle.blob = NULL;
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data *blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint32_t page_num;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_blobids, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, __data_to_blob(blob), 0);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = __data_to_blob(blob);

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
}
/* END spdk_bs_open_blob */

/* START spdk_blob_set_read_only */
void
spdk_blob_set_read_only(struct spdk_blob *b)
{
	struct spdk_blob_data *blob = __blob_to_data(b);

	blob->data_ro = true;
	blob->md_ro = true;
	blob->data_ro_flags |= SPDK_BLOB_READ_ONLY;

	blob->state = SPDK_BLOB_STATE_DIRTY;
}
/* END spdk_blob_set_read_only */

/* START spdk_blob_sync_md */

static void
_spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		/* Read-only metadata is clean by definition; complete the
		 * callback instead of silently dropping it. */
		cb_fn(cb_arg, 0);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(cb_arg, 0);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
}

/* END spdk_blob_sync_md */

/* START spdk_blob_close */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	if (bserrno == 0) {
		blob->open_ref--;
		if (blob->open_ref == 0) {
			/*
			 * Blobs with active.num_pages == 0 are deleted blobs.
			 * These blobs are removed from the blob_store list
			 * when the deletion process starts - so don't try to
			 * remove them again.
			 */
			if (blob->active.num_pages > 0) {
				TAILQ_REMOVE(&blob->bs->blobs, blob, link);
			}
			_spdk_blob_free(blob);
		}
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	struct spdk_blob_data *blob;
	spdk_bs_sequence_t *seq;

	assert(b != NULL);
	blob = __blob_to_data(b);
	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		_spdk_blob_close_cpl(seq, blob, 0);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
}

/* END spdk_blob_close */

struct spdk_io_channel *
spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}

void
spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void
spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void
spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			     uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}

void
spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void
spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void
spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void
spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}

struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_blobids, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_blobids)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_iter_first(struct spdk_blob_store *bs,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *b,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;
	struct spdk_blob_data *blob;

	assert(b != NULL);
	blob = __blob_to_data(b);
	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx);
}

int
spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
		    uint16_t value_len)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		return -EPERM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp;

			/* Allocate the new value first so the old one is not
			 * lost if we run out of memory. */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);
			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		return -EPERM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(&blob->xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
			  const void **value, size_t *value_len)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}

	return -ENOENT;
}

struct spdk_xattr_names {
	uint32_t count;
	const char *names[0];
};

int
spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;
	int count = 0;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}

void
spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
{
	memcpy(&bs->bstype, &bstype, sizeof(bstype));
}

SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
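
/*
 * Illustrative usage sketch (not compiled into the library): the public
 * entry points above are all asynchronous, so a typical caller chains
 * callbacks, e.g. to create and then open a blob:
 *
 *	static void open_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno);
 *
 *	static void
 *	create_cpl(void *cb_arg, spdk_blob_id blobid, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *
 *		if (bserrno == 0) {
 *			spdk_bs_open_blob(bs, blobid, open_cpl, bs);
 *		}
 *	}
 *
 *	spdk_bs_create_blob(bs, create_cpl, bs);
 *
 * The names create_cpl and open_cpl are hypothetical; only the spdk_bs_*
 * calls are part of this file's API.
 */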