1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 44 #include "spdk_internal/log.h" 45 46 #include "blobstore.h" 47 48 #define BLOB_CRC32C_INITIAL 0xffffffffUL 49 50 static inline size_t 51 divide_round_up(size_t num, size_t divisor) 52 { 53 return (num + divisor - 1) / divisor; 54 } 55 56 static void 57 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 58 { 59 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 60 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 61 assert(bs->num_free_clusters > 0); 62 63 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num); 64 65 spdk_bit_array_set(bs->used_clusters, cluster_num); 66 bs->num_free_clusters--; 67 } 68 69 static void 70 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 71 { 72 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 73 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 74 assert(bs->num_free_clusters < bs->total_clusters); 75 76 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num); 77 78 spdk_bit_array_clear(bs->used_clusters, cluster_num); 79 bs->num_free_clusters++; 80 } 81 82 static struct spdk_blob * 83 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 84 { 85 struct spdk_blob *blob; 86 87 blob = calloc(1, sizeof(*blob)); 88 if (!blob) { 89 return NULL; 90 } 91 92 blob->id = id; 93 blob->bs = bs; 94 95 blob->state = SPDK_BLOB_STATE_DIRTY; 96 blob->active.num_pages = 1; 97 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 98 if (!blob->active.pages) { 99 free(blob); 100 return NULL; 101 } 102 103 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 104 105 TAILQ_INIT(&blob->xattrs); 106 107 return blob; 108 } 109 110 static void 111 _spdk_blob_free(struct spdk_blob *blob) 112 { 113 struct spdk_xattr *xattr, *xattr_tmp; 114 115 assert(blob != NULL); 116 117 free(blob->active.clusters); 118 free(blob->clean.clusters); 119 free(blob->active.pages); 120 free(blob->clean.pages); 121 122 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 123 TAILQ_REMOVE(&blob->xattrs, xattr, link); 124 free(xattr->name); 125 free(xattr->value); 126 free(xattr); 127 } 128 129 free(blob); 130 } 131 132 static int 133 _spdk_blob_mark_clean(struct spdk_blob *blob) 134 { 135 uint64_t *clusters = NULL; 136 uint32_t *pages = NULL; 137 138 assert(blob != NULL); 139 assert(blob->state == SPDK_BLOB_STATE_LOADING || 140 blob->state == SPDK_BLOB_STATE_SYNCING); 141 142 if (blob->active.num_clusters) { 143 assert(blob->active.clusters); 144 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 145 if (!clusters) { 146 return -1; 147 } 148 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 149 } 150 151 if (blob->active.num_pages) { 152 assert(blob->active.pages); 153 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 154 if (!pages) { 155 free(clusters); 156 return -1; 157 } 158 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 159 } 160 161 free(blob->clean.clusters); 162 free(blob->clean.pages); 163 164 blob->clean.num_clusters = blob->active.num_clusters; 165 blob->clean.clusters = blob->active.clusters; 166 blob->clean.num_pages = blob->active.num_pages; 167 blob->clean.pages = blob->active.pages; 168 169 blob->active.clusters = clusters; 170 blob->active.pages = pages; 171 172 blob->state = SPDK_BLOB_STATE_CLEAN; 173 174 return 0; 175 } 176 177 static int 178 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 179 { 180 struct spdk_blob_md_descriptor *desc; 181 size_t cur_desc = 0; 182 void *tmp; 183 184 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 185 while (cur_desc < sizeof(page->descriptors)) { 186 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 187 if (desc->length == 0) { 188 /* If padding and length are 0, this terminates the page */ 189 break; 190 } 191 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 192 struct spdk_blob_md_descriptor_flags *desc_flags; 193 194 desc_flags = (struct spdk_blob_md_descriptor_flags *)desc; 195 196 if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) { 197 return -EINVAL; 198 } 199 200 if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) != 201 SPDK_BLOB_INVALID_FLAGS_MASK) { 202 return -EINVAL; 203 } 204 205 if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) != 206 SPDK_BLOB_DATA_RO_FLAGS_MASK) { 207 blob->data_ro = true; 208 blob->md_ro = true; 209 } 210 211 if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) != 212 SPDK_BLOB_MD_RO_FLAGS_MASK) { 213 blob->md_ro = true; 214 } 215 216 blob->invalid_flags = desc_flags->invalid_flags; 217 blob->data_ro_flags = desc_flags->data_ro_flags; 218 blob->md_ro_flags = desc_flags->md_ro_flags; 219 220 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 221 struct spdk_blob_md_descriptor_extent *desc_extent; 222 unsigned int i, j; 223 unsigned int cluster_count = blob->active.num_clusters; 224 225 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 226 227 if (desc_extent->length == 0 || 228 (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) { 229 return -EINVAL; 230 } 231 232 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 233 for (j = 0; j < desc_extent->extents[i].length; j++) { 234 if (!spdk_bit_array_get(blob->bs->used_clusters, 235 desc_extent->extents[i].cluster_idx + j)) { 236 return -EINVAL; 237 } 238 cluster_count++; 239 } 240 } 241 242 if (cluster_count == 0) { 243 return -EINVAL; 244 } 245 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 246 if (tmp == NULL) { 247 return -ENOMEM; 248 } 249 blob->active.clusters = tmp; 250 blob->active.cluster_array_size = cluster_count; 251 252 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 253 for (j = 0; j < desc_extent->extents[i].length; j++) { 254 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 255 desc_extent->extents[i].cluster_idx + j); 256 } 257 } 258 259 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 260 struct spdk_blob_md_descriptor_xattr *desc_xattr; 261 struct spdk_xattr *xattr; 262 263 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 264 265 if (desc_xattr->length != sizeof(desc_xattr->name_length) + 266 sizeof(desc_xattr->value_length) + 267 desc_xattr->name_length + desc_xattr->value_length) { 268 return -EINVAL; 269 } 270 271 xattr = calloc(1, sizeof(*xattr)); 272 if (xattr == NULL) { 273 return -ENOMEM; 274 } 275 276 xattr->name = malloc(desc_xattr->name_length + 1); 277 if (xattr->name == NULL) { 278 free(xattr); 279 return -ENOMEM; 280 } 281 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 282 xattr->name[desc_xattr->name_length] = '\0'; 283 284 xattr->value = malloc(desc_xattr->value_length); 285 if (xattr->value == NULL) { 286 free(xattr->name); 287 free(xattr); 288 return -ENOMEM; 289 } 290 xattr->value_len = desc_xattr->value_length; 291 memcpy(xattr->value, 292 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 293 desc_xattr->value_length); 294 295 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 296 } else { 297 /* Unrecognized descriptor type. Do not fail - just continue to the 298 * next descriptor. If this descriptor is associated with some feature 299 * defined in a newer version of blobstore, that version of blobstore 300 * should create and set an associated feature flag to specify if this 301 * blob can be loaded or not. 302 */ 303 } 304 305 /* Advance to the next descriptor */ 306 cur_desc += sizeof(*desc) + desc->length; 307 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 308 break; 309 } 310 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 311 } 312 313 return 0; 314 } 315 316 static int 317 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 318 struct spdk_blob *blob) 319 { 320 const struct spdk_blob_md_page *page; 321 uint32_t i; 322 int rc; 323 324 assert(page_count > 0); 325 assert(pages[0].sequence_num == 0); 326 assert(blob != NULL); 327 assert(blob->state == SPDK_BLOB_STATE_LOADING); 328 assert(blob->active.clusters == NULL); 329 assert(blob->state == SPDK_BLOB_STATE_LOADING); 330 331 /* The blobid provided doesn't match what's in the MD, this can 332 * happen for example if a bogus blobid is passed in through open. 333 */ 334 if (blob->id != pages[0].id) { 335 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n", 336 blob->id, pages[0].id); 337 return -ENOENT; 338 } 339 340 for (i = 0; i < page_count; i++) { 341 page = &pages[i]; 342 343 assert(page->id == blob->id); 344 assert(page->sequence_num == i); 345 346 rc = _spdk_blob_parse_page(page, blob); 347 if (rc != 0) { 348 return rc; 349 } 350 } 351 352 return 0; 353 } 354 355 static int 356 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 357 struct spdk_blob_md_page **pages, 358 uint32_t *page_count, 359 struct spdk_blob_md_page **last_page) 360 { 361 struct spdk_blob_md_page *page; 362 363 assert(pages != NULL); 364 assert(page_count != NULL); 365 366 if (*page_count == 0) { 367 assert(*pages == NULL); 368 *page_count = 1; 369 *pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE, 370 SPDK_BS_PAGE_SIZE, 371 NULL); 372 } else { 373 assert(*pages != NULL); 374 (*page_count)++; 375 *pages = spdk_dma_realloc(*pages, 376 SPDK_BS_PAGE_SIZE * (*page_count), 377 SPDK_BS_PAGE_SIZE, 378 NULL); 379 } 380 381 if (*pages == NULL) { 382 *page_count = 0; 383 *last_page = NULL; 384 return -ENOMEM; 385 } 386 387 page = &(*pages)[*page_count - 1]; 388 memset(page, 0, sizeof(*page)); 389 page->id = blob->id; 390 page->sequence_num = *page_count - 1; 391 page->next = SPDK_INVALID_MD_PAGE; 392 *last_page = page; 393 394 return 0; 395 } 396 397 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 398 * Update required_sz on both success and failure. 399 * 400 */ 401 static int 402 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 403 uint8_t *buf, size_t buf_sz, 404 size_t *required_sz) 405 { 406 struct spdk_blob_md_descriptor_xattr *desc; 407 408 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 409 strlen(xattr->name) + 410 xattr->value_len; 411 412 if (buf_sz < *required_sz) { 413 return -1; 414 } 415 416 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 417 418 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 419 desc->length = sizeof(desc->name_length) + 420 sizeof(desc->value_length) + 421 strlen(xattr->name) + 422 xattr->value_len; 423 desc->name_length = strlen(xattr->name); 424 desc->value_length = xattr->value_len; 425 426 memcpy(desc->name, xattr->name, desc->name_length); 427 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 428 xattr->value, 429 desc->value_length); 430 431 return 0; 432 } 433 434 static void 435 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 436 uint64_t start_cluster, uint64_t *next_cluster, 437 uint8_t *buf, size_t buf_sz) 438 { 439 struct spdk_blob_md_descriptor_extent *desc; 440 size_t cur_sz; 441 uint64_t i, extent_idx; 442 uint32_t lba, lba_per_cluster, lba_count; 443 444 /* The buffer must have room for at least one extent */ 445 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 446 if (buf_sz < cur_sz) { 447 *next_cluster = start_cluster; 448 return; 449 } 450 451 desc = (struct spdk_blob_md_descriptor_extent *)buf; 452 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 453 454 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 455 456 lba = blob->active.clusters[start_cluster]; 457 lba_count = lba_per_cluster; 458 extent_idx = 0; 459 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 460 if ((lba + lba_count) == blob->active.clusters[i]) { 461 lba_count += lba_per_cluster; 462 continue; 463 } 464 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 465 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 466 extent_idx++; 467 468 cur_sz += sizeof(desc->extents[extent_idx]); 469 470 if (buf_sz < cur_sz) { 471 /* If we ran out of buffer space, return */ 472 desc->length = sizeof(desc->extents[0]) * extent_idx; 473 *next_cluster = i; 474 return; 475 } 476 477 lba = blob->active.clusters[i]; 478 lba_count = lba_per_cluster; 479 } 480 481 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 482 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 483 extent_idx++; 484 485 desc->length = sizeof(desc->extents[0]) * extent_idx; 486 *next_cluster = blob->active.num_clusters; 487 488 return; 489 } 490 491 static void 492 _spdk_blob_serialize_flags(const struct spdk_blob *blob, 493 uint8_t *buf, size_t *buf_sz) 494 { 495 struct spdk_blob_md_descriptor_flags *desc; 496 497 /* 498 * Flags get serialized first, so we should always have room for the flags 499 * descriptor. 500 */ 501 assert(*buf_sz >= sizeof(*desc)); 502 503 desc = (struct spdk_blob_md_descriptor_flags *)buf; 504 desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS; 505 desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor); 506 desc->invalid_flags = blob->invalid_flags; 507 desc->data_ro_flags = blob->data_ro_flags; 508 desc->md_ro_flags = blob->md_ro_flags; 509 510 *buf_sz -= sizeof(*desc); 511 } 512 513 static int 514 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 515 uint32_t *page_count) 516 { 517 struct spdk_blob_md_page *cur_page; 518 const struct spdk_xattr *xattr; 519 int rc; 520 uint8_t *buf; 521 size_t remaining_sz; 522 uint64_t last_cluster; 523 524 assert(pages != NULL); 525 assert(page_count != NULL); 526 assert(blob != NULL); 527 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 528 529 *pages = NULL; 530 *page_count = 0; 531 532 /* A blob always has at least 1 page, even if it has no descriptors */ 533 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 534 if (rc < 0) { 535 return rc; 536 } 537 538 buf = (uint8_t *)cur_page->descriptors; 539 remaining_sz = sizeof(cur_page->descriptors); 540 541 /* Serialize flags */ 542 _spdk_blob_serialize_flags(blob, buf, &remaining_sz); 543 544 /* Serialize xattrs */ 545 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 546 size_t required_sz = 0; 547 rc = _spdk_blob_serialize_xattr(xattr, 548 buf, remaining_sz, 549 &required_sz); 550 if (rc < 0) { 551 /* Need to add a new page to the chain */ 552 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 553 &cur_page); 554 if (rc < 0) { 555 spdk_dma_free(*pages); 556 *pages = NULL; 557 *page_count = 0; 558 return rc; 559 } 560 561 buf = (uint8_t *)cur_page->descriptors; 562 remaining_sz = sizeof(cur_page->descriptors); 563 564 /* Try again */ 565 required_sz = 0; 566 rc = _spdk_blob_serialize_xattr(xattr, 567 buf, remaining_sz, 568 &required_sz); 569 570 if (rc < 0) { 571 spdk_dma_free(*pages); 572 *pages = NULL; 573 *page_count = 0; 574 return -1; 575 } 576 } 577 578 remaining_sz -= required_sz; 579 buf += required_sz; 580 } 581 582 /* Serialize extents */ 583 last_cluster = 0; 584 while (last_cluster < blob->active.num_clusters) { 585 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 586 buf, remaining_sz); 587 588 if (last_cluster == blob->active.num_clusters) { 589 break; 590 } 591 592 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 593 &cur_page); 594 if (rc < 0) { 595 return rc; 596 } 597 598 buf = (uint8_t *)cur_page->descriptors; 599 remaining_sz = sizeof(cur_page->descriptors); 600 } 601 602 return 0; 603 } 604 605 struct spdk_blob_load_ctx { 606 struct spdk_blob *blob; 607 608 struct spdk_blob_md_page *pages; 609 uint32_t num_pages; 610 611 spdk_bs_sequence_cpl cb_fn; 612 void *cb_arg; 613 }; 614 615 static uint32_t 616 _spdk_blob_md_page_calc_crc(void *page) 617 { 618 uint32_t crc; 619 620 crc = BLOB_CRC32C_INITIAL; 621 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 622 crc ^= BLOB_CRC32C_INITIAL; 623 624 return crc; 625 626 } 627 628 static void 629 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 630 { 631 struct spdk_blob_load_ctx *ctx = cb_arg; 632 struct spdk_blob *blob = ctx->blob; 633 struct spdk_blob_md_page *page; 634 int rc; 635 uint32_t crc; 636 637 page = &ctx->pages[ctx->num_pages - 1]; 638 crc = _spdk_blob_md_page_calc_crc(page); 639 if (crc != page->crc) { 640 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 641 _spdk_blob_free(blob); 642 ctx->cb_fn(seq, NULL, -EINVAL); 643 spdk_dma_free(ctx->pages); 644 free(ctx); 645 return; 646 } 647 648 if (page->next != SPDK_INVALID_MD_PAGE) { 649 uint32_t next_page = page->next; 650 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 651 652 653 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 654 655 /* Read the next page */ 656 ctx->num_pages++; 657 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 658 sizeof(*page), NULL); 659 if (ctx->pages == NULL) { 660 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 661 free(ctx); 662 return; 663 } 664 665 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 666 next_lba, 667 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 668 _spdk_blob_load_cpl, ctx); 669 return; 670 } 671 672 /* Parse the pages */ 673 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 674 if (rc) { 675 _spdk_blob_free(blob); 676 ctx->cb_fn(seq, NULL, rc); 677 spdk_dma_free(ctx->pages); 678 free(ctx); 679 return; 680 } 681 682 _spdk_blob_mark_clean(blob); 683 684 ctx->cb_fn(seq, ctx->cb_arg, rc); 685 686 /* Free the memory */ 687 spdk_dma_free(ctx->pages); 688 free(ctx); 689 } 690 691 /* Load a blob from disk given a blobid */ 692 static void 693 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 694 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 695 { 696 struct spdk_blob_load_ctx *ctx; 697 struct spdk_blob_store *bs; 698 uint32_t page_num; 699 uint64_t lba; 700 701 assert(blob != NULL); 702 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 703 blob->state == SPDK_BLOB_STATE_DIRTY); 704 705 bs = blob->bs; 706 707 ctx = calloc(1, sizeof(*ctx)); 708 if (!ctx) { 709 cb_fn(seq, cb_arg, -ENOMEM); 710 return; 711 } 712 713 ctx->blob = blob; 714 ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, 715 SPDK_BS_PAGE_SIZE, NULL); 716 if (!ctx->pages) { 717 free(ctx); 718 cb_fn(seq, cb_arg, -ENOMEM); 719 return; 720 } 721 ctx->num_pages = 1; 722 ctx->cb_fn = cb_fn; 723 ctx->cb_arg = cb_arg; 724 725 page_num = _spdk_bs_blobid_to_page(blob->id); 726 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 727 728 blob->state = SPDK_BLOB_STATE_LOADING; 729 730 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 731 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 732 _spdk_blob_load_cpl, ctx); 733 } 734 735 struct spdk_blob_persist_ctx { 736 struct spdk_blob *blob; 737 738 struct spdk_blob_md_page *pages; 739 740 uint64_t idx; 741 742 spdk_bs_sequence_cpl cb_fn; 743 void *cb_arg; 744 }; 745 746 static void 747 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 748 { 749 struct spdk_blob_persist_ctx *ctx = cb_arg; 750 struct spdk_blob *blob = ctx->blob; 751 752 if (bserrno == 0) { 753 _spdk_blob_mark_clean(blob); 754 } 755 756 /* Call user callback */ 757 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 758 759 /* Free the memory */ 760 spdk_dma_free(ctx->pages); 761 free(ctx); 762 } 763 764 static void 765 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 766 { 767 struct spdk_blob_persist_ctx *ctx = cb_arg; 768 struct spdk_blob *blob = ctx->blob; 769 struct spdk_blob_store *bs = blob->bs; 770 void *tmp; 771 size_t i; 772 773 /* Release all clusters that were truncated */ 774 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 775 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 776 777 _spdk_bs_release_cluster(bs, cluster_num); 778 } 779 780 if (blob->active.num_clusters == 0) { 781 free(blob->active.clusters); 782 blob->active.clusters = NULL; 783 blob->active.cluster_array_size = 0; 784 } else { 785 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 786 assert(tmp != NULL); 787 blob->active.clusters = tmp; 788 blob->active.cluster_array_size = blob->active.num_clusters; 789 } 790 791 _spdk_blob_persist_complete(seq, ctx, bserrno); 792 } 793 794 static void 795 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 796 { 797 struct spdk_blob_persist_ctx *ctx = cb_arg; 798 struct spdk_blob *blob = ctx->blob; 799 struct spdk_blob_store *bs = blob->bs; 800 spdk_bs_batch_t *batch; 801 size_t i; 802 uint64_t lba; 803 uint32_t lba_count; 804 805 /* Clusters don't move around in blobs. The list shrinks or grows 806 * at the end, but no changes ever occur in the middle of the list. 807 */ 808 809 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 810 811 /* Unmap all clusters that were truncated */ 812 lba = 0; 813 lba_count = 0; 814 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 815 uint64_t next_lba = blob->active.clusters[i]; 816 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 817 818 if ((lba + lba_count) == next_lba) { 819 /* This cluster is contiguous with the previous one. */ 820 lba_count += next_lba_count; 821 continue; 822 } 823 824 /* This cluster is not contiguous with the previous one. */ 825 826 /* If a run of LBAs previously existing, send them 827 * as an unmap. 828 */ 829 if (lba_count > 0) { 830 spdk_bs_batch_unmap(batch, lba, lba_count); 831 } 832 833 /* Start building the next batch */ 834 lba = next_lba; 835 lba_count = next_lba_count; 836 } 837 838 /* If we ended with a contiguous set of LBAs, send the unmap now */ 839 if (lba_count > 0) { 840 spdk_bs_batch_unmap(batch, lba, lba_count); 841 } 842 843 spdk_bs_batch_close(batch); 844 } 845 846 static void 847 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 848 { 849 struct spdk_blob_persist_ctx *ctx = cb_arg; 850 struct spdk_blob *blob = ctx->blob; 851 struct spdk_blob_store *bs = blob->bs; 852 size_t i; 853 854 /* This loop starts at 1 because the first page is special and handled 855 * below. The pages (except the first) are never written in place, 856 * so any pages in the clean list must be zeroed. 857 */ 858 for (i = 1; i < blob->clean.num_pages; i++) { 859 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 860 } 861 862 if (blob->active.num_pages == 0) { 863 uint32_t page_num; 864 865 page_num = _spdk_bs_blobid_to_page(blob->id); 866 spdk_bit_array_clear(bs->used_md_pages, page_num); 867 } 868 869 /* Move on to unmapping clusters */ 870 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 871 } 872 873 static void 874 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 875 { 876 struct spdk_blob_persist_ctx *ctx = cb_arg; 877 struct spdk_blob *blob = ctx->blob; 878 struct spdk_blob_store *bs = blob->bs; 879 uint64_t lba; 880 uint32_t lba_count; 881 spdk_bs_batch_t *batch; 882 size_t i; 883 884 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx); 885 886 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 887 888 /* This loop starts at 1 because the first page is special and handled 889 * below. The pages (except the first) are never written in place, 890 * so any pages in the clean list must be zeroed. 891 */ 892 for (i = 1; i < blob->clean.num_pages; i++) { 893 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 894 895 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 896 } 897 898 /* The first page will only be zeroed if this is a delete. */ 899 if (blob->active.num_pages == 0) { 900 uint32_t page_num; 901 902 /* The first page in the metadata goes where the blobid indicates */ 903 page_num = _spdk_bs_blobid_to_page(blob->id); 904 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 905 906 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 907 } 908 909 spdk_bs_batch_close(batch); 910 } 911 912 static void 913 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 914 { 915 struct spdk_blob_persist_ctx *ctx = cb_arg; 916 struct spdk_blob *blob = ctx->blob; 917 struct spdk_blob_store *bs = blob->bs; 918 uint64_t lba; 919 uint32_t lba_count; 920 struct spdk_blob_md_page *page; 921 922 if (blob->active.num_pages == 0) { 923 /* Move on to the next step */ 924 _spdk_blob_persist_zero_pages(seq, ctx, 0); 925 return; 926 } 927 928 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 929 930 page = &ctx->pages[0]; 931 /* The first page in the metadata goes where the blobid indicates */ 932 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 933 934 spdk_bs_sequence_write(seq, page, lba, lba_count, 935 _spdk_blob_persist_zero_pages, ctx); 936 } 937 938 static void 939 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 940 { 941 struct spdk_blob_persist_ctx *ctx = cb_arg; 942 struct spdk_blob *blob = ctx->blob; 943 struct spdk_blob_store *bs = blob->bs; 944 uint64_t lba; 945 uint32_t lba_count; 946 struct spdk_blob_md_page *page; 947 spdk_bs_batch_t *batch; 948 size_t i; 949 950 /* Clusters don't move around in blobs. The list shrinks or grows 951 * at the end, but no changes ever occur in the middle of the list. 952 */ 953 954 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 955 956 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 957 958 /* This starts at 1. The root page is not written until 959 * all of the others are finished 960 */ 961 for (i = 1; i < blob->active.num_pages; i++) { 962 page = &ctx->pages[i]; 963 assert(page->sequence_num == i); 964 965 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 966 967 spdk_bs_batch_write(batch, page, lba, lba_count); 968 } 969 970 spdk_bs_batch_close(batch); 971 } 972 973 static int 974 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 975 { 976 uint64_t i; 977 uint64_t *tmp; 978 uint64_t lfc; /* lowest free cluster */ 979 struct spdk_blob_store *bs; 980 981 bs = blob->bs; 982 983 assert(blob->state != SPDK_BLOB_STATE_LOADING && 984 blob->state != SPDK_BLOB_STATE_SYNCING); 985 986 if (blob->active.num_clusters == sz) { 987 return 0; 988 } 989 990 if (blob->active.num_clusters < blob->active.cluster_array_size) { 991 /* If this blob was resized to be larger, then smaller, then 992 * larger without syncing, then the cluster array already 993 * contains spare assigned clusters we can use. 994 */ 995 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 996 sz); 997 } 998 999 blob->state = SPDK_BLOB_STATE_DIRTY; 1000 1001 /* Do two passes - one to verify that we can obtain enough clusters 1002 * and another to actually claim them. 1003 */ 1004 1005 lfc = 0; 1006 for (i = blob->active.num_clusters; i < sz; i++) { 1007 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1008 if (lfc >= bs->total_clusters) { 1009 /* No more free clusters. Cannot satisfy the request */ 1010 assert(false); 1011 return -1; 1012 } 1013 lfc++; 1014 } 1015 1016 if (sz > blob->active.num_clusters) { 1017 /* Expand the cluster array if necessary. 1018 * We only shrink the array when persisting. 1019 */ 1020 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 1021 if (sz > 0 && tmp == NULL) { 1022 assert(false); 1023 return -1; 1024 } 1025 blob->active.clusters = tmp; 1026 blob->active.cluster_array_size = sz; 1027 } 1028 1029 lfc = 0; 1030 for (i = blob->active.num_clusters; i < sz; i++) { 1031 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1032 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 1033 _spdk_bs_claim_cluster(bs, lfc); 1034 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 1035 lfc++; 1036 } 1037 1038 blob->active.num_clusters = sz; 1039 1040 return 0; 1041 } 1042 1043 /* Write a blob to disk */ 1044 static void 1045 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 1046 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1047 { 1048 struct spdk_blob_persist_ctx *ctx; 1049 int rc; 1050 uint64_t i; 1051 uint32_t page_num; 1052 struct spdk_blob_store *bs; 1053 1054 assert(blob != NULL); 1055 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 1056 blob->state == SPDK_BLOB_STATE_DIRTY); 1057 1058 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 1059 cb_fn(seq, cb_arg, 0); 1060 return; 1061 } 1062 1063 bs = blob->bs; 1064 1065 ctx = calloc(1, sizeof(*ctx)); 1066 if (!ctx) { 1067 cb_fn(seq, cb_arg, -ENOMEM); 1068 return; 1069 } 1070 ctx->blob = blob; 1071 ctx->cb_fn = cb_fn; 1072 ctx->cb_arg = cb_arg; 1073 1074 blob->state = SPDK_BLOB_STATE_SYNCING; 1075 1076 if (blob->active.num_pages == 0) { 1077 /* This is the signal that the blob should be deleted. 1078 * Immediately jump to the clean up routine. */ 1079 assert(blob->clean.num_pages > 0); 1080 ctx->idx = blob->clean.num_pages - 1; 1081 _spdk_blob_persist_zero_pages(seq, ctx, 0); 1082 return; 1083 1084 } 1085 1086 /* Generate the new metadata */ 1087 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 1088 if (rc < 0) { 1089 free(ctx); 1090 cb_fn(seq, cb_arg, rc); 1091 return; 1092 } 1093 1094 assert(blob->active.num_pages >= 1); 1095 1096 /* Resize the cache of page indices */ 1097 blob->active.pages = realloc(blob->active.pages, 1098 blob->active.num_pages * sizeof(*blob->active.pages)); 1099 if (!blob->active.pages) { 1100 free(ctx); 1101 cb_fn(seq, cb_arg, -ENOMEM); 1102 return; 1103 } 1104 1105 /* Assign this metadata to pages. This requires two passes - 1106 * one to verify that there are enough pages and a second 1107 * to actually claim them. */ 1108 page_num = 0; 1109 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1110 for (i = 1; i < blob->active.num_pages; i++) { 1111 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1112 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 1113 spdk_dma_free(ctx->pages); 1114 free(ctx); 1115 blob->state = SPDK_BLOB_STATE_DIRTY; 1116 cb_fn(seq, cb_arg, -ENOMEM); 1117 return; 1118 } 1119 page_num++; 1120 } 1121 1122 page_num = 0; 1123 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1124 for (i = 1; i < blob->active.num_pages; i++) { 1125 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1126 ctx->pages[i - 1].next = page_num; 1127 /* Now that previous metadata page is complete, calculate the crc for it. */ 1128 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1129 blob->active.pages[i] = page_num; 1130 spdk_bit_array_set(bs->used_md_pages, page_num); 1131 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1132 page_num++; 1133 } 1134 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1135 /* Start writing the metadata from last page to first */ 1136 ctx->idx = blob->active.num_pages - 1; 1137 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1138 } 1139 1140 static void 1141 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1142 void *payload, uint64_t offset, uint64_t length, 1143 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1144 { 1145 spdk_bs_batch_t *batch; 1146 struct spdk_bs_cpl cpl; 1147 uint64_t lba; 1148 uint32_t lba_count; 1149 uint8_t *buf; 1150 uint64_t page; 1151 1152 assert(blob != NULL); 1153 1154 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 1155 cb_fn(cb_arg, -EPERM); 1156 return; 1157 } 1158 1159 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1160 cb_fn(cb_arg, -EINVAL); 1161 return; 1162 } 1163 1164 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1165 cpl.u.blob_basic.cb_fn = cb_fn; 1166 cpl.u.blob_basic.cb_arg = cb_arg; 1167 1168 batch = spdk_bs_batch_open(_channel, &cpl); 1169 if (!batch) { 1170 cb_fn(cb_arg, -ENOMEM); 1171 return; 1172 } 1173 1174 length = _spdk_bs_page_to_lba(blob->bs, length); 1175 page = offset; 1176 buf = payload; 1177 while (length > 0) { 1178 lba = _spdk_bs_blob_page_to_lba(blob, page); 1179 lba_count = spdk_min(length, 1180 _spdk_bs_page_to_lba(blob->bs, 1181 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1182 1183 switch (op_type) { 1184 case SPDK_BLOB_READ: 1185 spdk_bs_batch_read(batch, buf, lba, lba_count); 1186 break; 1187 case SPDK_BLOB_WRITE: 1188 spdk_bs_batch_write(batch, buf, lba, lba_count); 1189 break; 1190 case SPDK_BLOB_UNMAP: 1191 spdk_bs_batch_unmap(batch, lba, lba_count); 1192 break; 1193 case SPDK_BLOB_WRITE_ZEROES: 1194 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 1195 break; 1196 } 1197 1198 length -= lba_count; 1199 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1200 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { 1201 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1202 } 1203 } 1204 1205 spdk_bs_batch_close(batch); 1206 } 1207 1208 struct rw_iov_ctx { 1209 struct spdk_blob *blob; 1210 bool read; 1211 int iovcnt; 1212 struct iovec *orig_iov; 1213 uint64_t page_offset; 1214 uint64_t pages_remaining; 1215 uint64_t pages_done; 1216 struct iovec iov[0]; 1217 }; 1218 1219 static void 1220 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1221 { 1222 assert(cb_arg == NULL); 1223 spdk_bs_sequence_finish(seq, bserrno); 1224 } 1225 1226 static void 1227 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1228 { 1229 struct rw_iov_ctx *ctx = cb_arg; 1230 struct iovec *iov, *orig_iov; 1231 int iovcnt; 1232 size_t orig_iovoff; 1233 uint64_t lba; 1234 uint64_t page_count, pages_to_boundary; 1235 uint32_t lba_count; 1236 uint64_t byte_count; 1237 1238 if (bserrno != 0 || ctx->pages_remaining == 0) { 1239 free(ctx); 1240 spdk_bs_sequence_finish(seq, bserrno); 1241 return; 1242 } 1243 1244 pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset); 1245 page_count = spdk_min(ctx->pages_remaining, pages_to_boundary); 1246 lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset); 1247 lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count); 1248 1249 /* 1250 * Get index and offset into the original iov array for our current position in the I/O sequence. 1251 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 1252 * point to the current position in the I/O sequence. 1253 */ 1254 byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page); 1255 orig_iov = &ctx->orig_iov[0]; 1256 orig_iovoff = 0; 1257 while (byte_count > 0) { 1258 if (byte_count >= orig_iov->iov_len) { 1259 byte_count -= orig_iov->iov_len; 1260 orig_iov++; 1261 } else { 1262 orig_iovoff = byte_count; 1263 byte_count = 0; 1264 } 1265 } 1266 1267 /* 1268 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 1269 * bytes of this next I/O remain to be accounted for in the new iov array. 1270 */ 1271 byte_count = page_count * sizeof(struct spdk_blob_md_page); 1272 iov = &ctx->iov[0]; 1273 iovcnt = 0; 1274 while (byte_count > 0) { 1275 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 1276 iov->iov_base = orig_iov->iov_base + orig_iovoff; 1277 byte_count -= iov->iov_len; 1278 orig_iovoff = 0; 1279 orig_iov++; 1280 iov++; 1281 iovcnt++; 1282 } 1283 1284 ctx->page_offset += page_count; 1285 ctx->pages_done += page_count; 1286 ctx->pages_remaining -= page_count; 1287 iov = &ctx->iov[0]; 1288 1289 if (ctx->read) { 1290 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1291 } else { 1292 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1293 } 1294 } 1295 1296 static void 1297 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1298 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1299 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1300 { 1301 spdk_bs_sequence_t *seq; 1302 struct spdk_bs_cpl cpl; 1303 1304 assert(blob != NULL); 1305 1306 if (length == 0) { 1307 cb_fn(cb_arg, 0); 1308 return; 1309 } 1310 1311 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1312 cb_fn(cb_arg, -EINVAL); 1313 return; 1314 } 1315 1316 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1317 cpl.u.blob_basic.cb_fn = cb_fn; 1318 cpl.u.blob_basic.cb_arg = cb_arg; 1319 1320 /* 1321 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 1322 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 1323 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 1324 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 1325 * to allocate a separate iov array and split the I/O such that none of the resulting 1326 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 1327 * but since this case happens very infrequently, any performance impact will be negligible. 1328 * 1329 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 1330 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 1331 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 1332 * when the batch was completed, to allow for freeing the memory for the iov arrays. 1333 */ 1334 seq = spdk_bs_sequence_start(_channel, &cpl); 1335 if (!seq) { 1336 cb_fn(cb_arg, -ENOMEM); 1337 return; 1338 } 1339 1340 if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) { 1341 uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset); 1342 uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length); 1343 1344 if (read) { 1345 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1346 } else { 1347 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1348 } 1349 } else { 1350 struct rw_iov_ctx *ctx; 1351 1352 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 1353 if (ctx == NULL) { 1354 spdk_bs_sequence_finish(seq, -ENOMEM); 1355 return; 1356 } 1357 1358 ctx->blob = blob; 1359 ctx->read = read; 1360 ctx->orig_iov = iov; 1361 ctx->iovcnt = iovcnt; 1362 ctx->page_offset = offset; 1363 ctx->pages_remaining = length; 1364 ctx->pages_done = 0; 1365 1366 _spdk_rw_iov_split_next(seq, ctx, 0); 1367 } 1368 } 1369 1370 static struct spdk_blob * 1371 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1372 { 1373 struct spdk_blob *blob; 1374 1375 TAILQ_FOREACH(blob, &bs->blobs, link) { 1376 if (blob->id == blobid) { 1377 return blob; 1378 } 1379 } 1380 1381 return NULL; 1382 } 1383 1384 static int 1385 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1386 uint32_t max_ops) 1387 { 1388 struct spdk_bs_dev *dev; 1389 uint32_t i; 1390 1391 dev = bs->dev; 1392 1393 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1394 if (!channel->req_mem) { 1395 return -1; 1396 } 1397 1398 TAILQ_INIT(&channel->reqs); 1399 1400 for (i = 0; i < max_ops; i++) { 1401 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1402 } 1403 1404 channel->bs = bs; 1405 channel->dev = dev; 1406 channel->dev_channel = dev->create_channel(dev); 1407 1408 if (!channel->dev_channel) { 1409 SPDK_ERRLOG("Failed to create device channel.\n"); 1410 free(channel->req_mem); 1411 return -1; 1412 } 1413 1414 return 0; 1415 } 1416 1417 static int 1418 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1419 { 1420 struct spdk_blob_store *bs; 1421 struct spdk_bs_channel *channel = ctx_buf; 1422 1423 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1424 1425 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1426 } 1427 1428 static int 1429 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1430 { 1431 struct spdk_blob_store *bs; 1432 struct spdk_bs_channel *channel = ctx_buf; 1433 1434 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1435 1436 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1437 } 1438 1439 1440 static void 1441 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1442 { 1443 struct spdk_bs_channel *channel = ctx_buf; 1444 1445 free(channel->req_mem); 1446 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1447 } 1448 1449 static void 1450 _spdk_bs_dev_destroy(void *io_device) 1451 { 1452 struct spdk_blob_store *bs; 1453 struct spdk_blob *blob, *blob_tmp; 1454 1455 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1456 bs->dev->destroy(bs->dev); 1457 1458 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1459 TAILQ_REMOVE(&bs->blobs, blob, link); 1460 _spdk_blob_free(blob); 1461 } 1462 1463 spdk_bit_array_free(&bs->used_md_pages); 1464 spdk_bit_array_free(&bs->used_clusters); 1465 /* 1466 * If this function is called for any reason except a successful unload, 1467 * the unload_cpl type will be NONE and this will be a nop. 1468 */ 1469 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 1470 1471 free(bs); 1472 } 1473 1474 static void 1475 _spdk_bs_free(struct spdk_blob_store *bs) 1476 { 1477 spdk_bs_unregister_md_thread(bs); 1478 spdk_io_device_unregister(&bs->io_target, NULL); 1479 spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy); 1480 } 1481 1482 void 1483 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1484 { 1485 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1486 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1487 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1488 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1489 memset(&opts->bstype, 0, sizeof(opts->bstype)); 1490 } 1491 1492 static int 1493 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 1494 { 1495 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 1496 opts->max_channel_ops == 0) { 1497 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 1498 return -1; 1499 } 1500 1501 return 0; 1502 } 1503 1504 static struct spdk_blob_store * 1505 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1506 { 1507 struct spdk_blob_store *bs; 1508 uint64_t dev_size; 1509 int rc; 1510 1511 dev_size = dev->blocklen * dev->blockcnt; 1512 if (dev_size < opts->cluster_sz) { 1513 /* Device size cannot be smaller than cluster size of blobstore */ 1514 SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size, 1515 opts->cluster_sz); 1516 return NULL; 1517 } 1518 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 1519 /* Cluster size cannot be smaller than page size */ 1520 SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n", 1521 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 1522 return NULL; 1523 } 1524 bs = calloc(1, sizeof(struct spdk_blob_store)); 1525 if (!bs) { 1526 return NULL; 1527 } 1528 1529 TAILQ_INIT(&bs->blobs); 1530 bs->dev = dev; 1531 1532 /* 1533 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1534 * even multiple of the cluster size. 1535 */ 1536 bs->cluster_sz = opts->cluster_sz; 1537 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1538 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1539 bs->num_free_clusters = bs->total_clusters; 1540 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1541 if (bs->used_clusters == NULL) { 1542 free(bs); 1543 return NULL; 1544 } 1545 1546 bs->md_target.max_md_ops = opts->max_md_ops; 1547 bs->io_target.max_channel_ops = opts->max_channel_ops; 1548 bs->super_blob = SPDK_BLOBID_INVALID; 1549 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 1550 1551 /* The metadata is assumed to be at least 1 page */ 1552 bs->used_md_pages = spdk_bit_array_create(1); 1553 1554 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1555 sizeof(struct spdk_bs_channel)); 1556 rc = spdk_bs_register_md_thread(bs); 1557 if (rc == -1) { 1558 spdk_io_device_unregister(&bs->md_target, NULL); 1559 spdk_bit_array_free(&bs->used_md_pages); 1560 spdk_bit_array_free(&bs->used_clusters); 1561 free(bs); 1562 return NULL; 1563 } 1564 1565 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1566 sizeof(struct spdk_bs_channel)); 1567 1568 return bs; 1569 } 1570 1571 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */ 1572 1573 struct spdk_bs_load_ctx { 1574 struct spdk_blob_store *bs; 1575 struct spdk_bs_super_block *super; 1576 1577 struct spdk_bs_md_mask *mask; 1578 bool in_page_chain; 1579 uint32_t page_index; 1580 uint32_t cur_page; 1581 struct spdk_blob_md_page *page; 1582 }; 1583 1584 static void 1585 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 1586 { 1587 uint32_t i = 0; 1588 1589 while (true) { 1590 i = spdk_bit_array_find_first_set(array, i); 1591 if (i >= mask->length) { 1592 break; 1593 } 1594 mask->mask[i / 8] |= 1U << (i % 8); 1595 i++; 1596 } 1597 } 1598 1599 static void 1600 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 1601 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1602 { 1603 /* Update the values in the super block */ 1604 super->super_blob = bs->super_blob; 1605 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 1606 super->crc = _spdk_blob_md_page_calc_crc(super); 1607 spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0), 1608 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 1609 cb_fn, cb_arg); 1610 } 1611 1612 static void 1613 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1614 { 1615 struct spdk_bs_load_ctx *ctx = arg; 1616 uint64_t mask_size, lba, lba_count; 1617 1618 /* Write out the used clusters mask */ 1619 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1620 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1621 if (!ctx->mask) { 1622 spdk_dma_free(ctx->super); 1623 free(ctx); 1624 spdk_bs_sequence_finish(seq, -ENOMEM); 1625 return; 1626 } 1627 1628 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1629 ctx->mask->length = ctx->bs->total_clusters; 1630 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1631 1632 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 1633 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1634 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1635 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1636 } 1637 1638 static void 1639 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1640 { 1641 struct spdk_bs_load_ctx *ctx = arg; 1642 uint64_t mask_size, lba, lba_count; 1643 1644 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1645 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1646 if (!ctx->mask) { 1647 spdk_dma_free(ctx->super); 1648 free(ctx); 1649 spdk_bs_sequence_finish(seq, -ENOMEM); 1650 return; 1651 } 1652 1653 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1654 ctx->mask->length = ctx->super->md_len; 1655 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1656 1657 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 1658 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1659 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1660 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1661 } 1662 1663 static void 1664 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1665 { 1666 struct spdk_bs_load_ctx *ctx = cb_arg; 1667 uint32_t i, j; 1668 int rc; 1669 1670 /* The type must be correct */ 1671 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1672 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1673 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1674 struct spdk_blob_md_page) * 8)); 1675 /* The length of the mask must be exactly equal to the total number of clusters */ 1676 assert(ctx->mask->length == ctx->bs->total_clusters); 1677 1678 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1679 if (rc < 0) { 1680 spdk_dma_free(ctx->super); 1681 spdk_dma_free(ctx->mask); 1682 _spdk_bs_free(ctx->bs); 1683 free(ctx); 1684 spdk_bs_sequence_finish(seq, -ENOMEM); 1685 return; 1686 } 1687 1688 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1689 for (i = 0; i < ctx->mask->length / 8; i++) { 1690 uint8_t segment = ctx->mask->mask[i]; 1691 for (j = 0; segment && (j < 8); j++) { 1692 if (segment & 1U) { 1693 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1694 assert(ctx->bs->num_free_clusters > 0); 1695 ctx->bs->num_free_clusters--; 1696 } 1697 segment >>= 1U; 1698 } 1699 } 1700 1701 spdk_dma_free(ctx->super); 1702 spdk_dma_free(ctx->mask); 1703 free(ctx); 1704 1705 spdk_bs_sequence_finish(seq, bserrno); 1706 } 1707 1708 static void 1709 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1710 { 1711 struct spdk_bs_load_ctx *ctx = cb_arg; 1712 uint64_t lba, lba_count, mask_size; 1713 uint32_t i, j; 1714 int rc; 1715 1716 /* The type must be correct */ 1717 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1718 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1719 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 1720 8)); 1721 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1722 assert(ctx->mask->length == ctx->super->md_len); 1723 1724 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1725 if (rc < 0) { 1726 spdk_dma_free(ctx->super); 1727 spdk_dma_free(ctx->mask); 1728 _spdk_bs_free(ctx->bs); 1729 free(ctx); 1730 spdk_bs_sequence_finish(seq, -ENOMEM); 1731 return; 1732 } 1733 1734 for (i = 0; i < ctx->mask->length / 8; i++) { 1735 uint8_t segment = ctx->mask->mask[i]; 1736 for (j = 0; segment && (j < 8); j++) { 1737 if (segment & 1U) { 1738 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1739 } 1740 segment >>= 1U; 1741 } 1742 } 1743 spdk_dma_free(ctx->mask); 1744 1745 /* Read the used clusters mask */ 1746 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1747 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1748 if (!ctx->mask) { 1749 spdk_dma_free(ctx->super); 1750 _spdk_bs_free(ctx->bs); 1751 free(ctx); 1752 spdk_bs_sequence_finish(seq, -ENOMEM); 1753 return; 1754 } 1755 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1756 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1757 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1758 _spdk_bs_load_used_clusters_cpl, ctx); 1759 } 1760 1761 static void 1762 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1763 { 1764 struct spdk_bs_load_ctx *ctx = cb_arg; 1765 uint64_t lba, lba_count, mask_size; 1766 1767 /* Read the used pages mask */ 1768 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1769 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1770 if (!ctx->mask) { 1771 spdk_dma_free(ctx->super); 1772 _spdk_bs_free(ctx->bs); 1773 free(ctx); 1774 spdk_bs_sequence_finish(seq, -ENOMEM); 1775 return; 1776 } 1777 1778 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1779 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1780 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1781 _spdk_bs_load_used_pages_cpl, ctx); 1782 } 1783 1784 static int 1785 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 1786 { 1787 struct spdk_blob_md_descriptor *desc; 1788 size_t cur_desc = 0; 1789 1790 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 1791 while (cur_desc < sizeof(page->descriptors)) { 1792 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 1793 if (desc->length == 0) { 1794 /* If padding and length are 0, this terminates the page */ 1795 break; 1796 } 1797 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 1798 struct spdk_blob_md_descriptor_extent *desc_extent; 1799 unsigned int i, j; 1800 unsigned int cluster_count = 0; 1801 1802 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 1803 1804 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 1805 for (j = 0; j < desc_extent->extents[i].length; j++) { 1806 spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j); 1807 if (bs->num_free_clusters == 0) { 1808 return -1; 1809 } 1810 bs->num_free_clusters--; 1811 cluster_count++; 1812 } 1813 } 1814 if (cluster_count == 0) { 1815 return -1; 1816 } 1817 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 1818 /* Skip this item */ 1819 } else { 1820 /* Error */ 1821 return -1; 1822 } 1823 /* Advance to the next descriptor */ 1824 cur_desc += sizeof(*desc) + desc->length; 1825 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 1826 break; 1827 } 1828 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 1829 } 1830 return 0; 1831 } 1832 1833 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 1834 { 1835 uint32_t crc; 1836 1837 crc = _spdk_blob_md_page_calc_crc(ctx->page); 1838 if (crc != ctx->page->crc) { 1839 return false; 1840 } 1841 1842 if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 1843 return false; 1844 } 1845 return true; 1846 } 1847 1848 static void 1849 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 1850 1851 static void 1852 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1853 { 1854 struct spdk_bs_load_ctx *ctx = cb_arg; 1855 1856 spdk_dma_free(ctx->mask); 1857 spdk_dma_free(ctx->super); 1858 spdk_bs_sequence_finish(seq, bserrno); 1859 free(ctx); 1860 } 1861 1862 static void 1863 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1864 { 1865 struct spdk_bs_load_ctx *ctx = cb_arg; 1866 1867 spdk_dma_free(ctx->mask); 1868 1869 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl); 1870 } 1871 1872 static void 1873 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1874 { 1875 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl); 1876 } 1877 1878 static void 1879 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1880 { 1881 struct spdk_bs_load_ctx *ctx = cb_arg; 1882 uint32_t page_num; 1883 1884 if (bserrno != 0) { 1885 spdk_dma_free(ctx->super); 1886 _spdk_bs_free(ctx->bs); 1887 free(ctx); 1888 spdk_bs_sequence_finish(seq, bserrno); 1889 return; 1890 } 1891 1892 page_num = ctx->cur_page; 1893 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 1894 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 1895 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 1896 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 1897 spdk_dma_free(ctx->super); 1898 _spdk_bs_free(ctx->bs); 1899 free(ctx); 1900 spdk_bs_sequence_finish(seq, -EILSEQ); 1901 return; 1902 } 1903 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 1904 ctx->in_page_chain = true; 1905 ctx->cur_page = ctx->page->next; 1906 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1907 return; 1908 } 1909 } 1910 } 1911 1912 ctx->in_page_chain = false; 1913 1914 do { 1915 ctx->page_index++; 1916 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 1917 1918 if (ctx->page_index < ctx->super->md_len) { 1919 ctx->cur_page = ctx->page_index; 1920 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1921 } else { 1922 spdk_dma_free(ctx->page); 1923 _spdk_bs_load_write_used_md(seq, ctx, bserrno); 1924 } 1925 } 1926 1927 static void 1928 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 1929 { 1930 struct spdk_bs_load_ctx *ctx = cb_arg; 1931 uint64_t lba; 1932 1933 assert(ctx->cur_page < ctx->super->md_len); 1934 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 1935 spdk_bs_sequence_read(seq, ctx->page, lba, 1936 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 1937 _spdk_bs_load_replay_md_cpl, ctx); 1938 } 1939 1940 static void 1941 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg) 1942 { 1943 struct spdk_bs_load_ctx *ctx = cb_arg; 1944 1945 ctx->page_index = 0; 1946 ctx->cur_page = 0; 1947 ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE, 1948 SPDK_BS_PAGE_SIZE, 1949 NULL); 1950 if (!ctx->page) { 1951 spdk_dma_free(ctx->super); 1952 _spdk_bs_free(ctx->bs); 1953 free(ctx); 1954 spdk_bs_sequence_finish(seq, -ENOMEM); 1955 return; 1956 } 1957 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1958 } 1959 1960 static void 1961 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg) 1962 { 1963 struct spdk_bs_load_ctx *ctx = cb_arg; 1964 int rc; 1965 1966 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 1967 if (rc < 0) { 1968 spdk_dma_free(ctx->super); 1969 _spdk_bs_free(ctx->bs); 1970 free(ctx); 1971 spdk_bs_sequence_finish(seq, -ENOMEM); 1972 return; 1973 } 1974 1975 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1976 if (rc < 0) { 1977 spdk_dma_free(ctx->super); 1978 _spdk_bs_free(ctx->bs); 1979 free(ctx); 1980 spdk_bs_sequence_finish(seq, -ENOMEM); 1981 return; 1982 } 1983 1984 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1985 _spdk_bs_load_replay_md(seq, cb_arg); 1986 } 1987 1988 static void 1989 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1990 { 1991 struct spdk_bs_load_ctx *ctx = cb_arg; 1992 uint32_t crc; 1993 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 1994 1995 if (ctx->super->version > SPDK_BS_VERSION || 1996 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 1997 spdk_dma_free(ctx->super); 1998 _spdk_bs_free(ctx->bs); 1999 free(ctx); 2000 spdk_bs_sequence_finish(seq, -EILSEQ); 2001 return; 2002 } 2003 2004 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 2005 sizeof(ctx->super->signature)) != 0) { 2006 spdk_dma_free(ctx->super); 2007 _spdk_bs_free(ctx->bs); 2008 free(ctx); 2009 spdk_bs_sequence_finish(seq, -EILSEQ); 2010 return; 2011 } 2012 2013 crc = _spdk_blob_md_page_calc_crc(ctx->super); 2014 if (crc != ctx->super->crc) { 2015 spdk_dma_free(ctx->super); 2016 _spdk_bs_free(ctx->bs); 2017 free(ctx); 2018 spdk_bs_sequence_finish(seq, -EILSEQ); 2019 return; 2020 } 2021 2022 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 2023 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 2024 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 2025 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 2026 } else { 2027 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 2028 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 2029 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 2030 spdk_dma_free(ctx->super); 2031 _spdk_bs_free(ctx->bs); 2032 free(ctx); 2033 spdk_bs_sequence_finish(seq, -ENXIO); 2034 return; 2035 } 2036 2037 /* Parse the super block */ 2038 ctx->bs->cluster_sz = ctx->super->cluster_size; 2039 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 2040 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 2041 ctx->bs->md_start = ctx->super->md_start; 2042 ctx->bs->md_len = ctx->super->md_len; 2043 ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up( 2044 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 2045 ctx->bs->super_blob = ctx->super->super_blob; 2046 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 2047 2048 if (ctx->super->clean == 1) { 2049 ctx->super->clean = 0; 2050 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx); 2051 } else { 2052 _spdk_bs_recover(seq, ctx); 2053 } 2054 } 2055 2056 void 2057 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 2058 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 2059 { 2060 struct spdk_blob_store *bs; 2061 struct spdk_bs_cpl cpl; 2062 spdk_bs_sequence_t *seq; 2063 struct spdk_bs_load_ctx *ctx; 2064 struct spdk_bs_opts opts = {}; 2065 2066 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 2067 2068 if (o) { 2069 opts = *o; 2070 } else { 2071 spdk_bs_opts_init(&opts); 2072 } 2073 2074 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 2075 cb_fn(cb_arg, NULL, -EINVAL); 2076 return; 2077 } 2078 2079 bs = _spdk_bs_alloc(dev, &opts); 2080 if (!bs) { 2081 cb_fn(cb_arg, NULL, -ENOMEM); 2082 return; 2083 } 2084 2085 ctx = calloc(1, sizeof(*ctx)); 2086 if (!ctx) { 2087 _spdk_bs_free(bs); 2088 cb_fn(cb_arg, NULL, -ENOMEM); 2089 return; 2090 } 2091 2092 ctx->bs = bs; 2093 2094 /* Allocate memory for the super block */ 2095 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2096 if (!ctx->super) { 2097 free(ctx); 2098 _spdk_bs_free(bs); 2099 return; 2100 } 2101 2102 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2103 cpl.u.bs_handle.cb_fn = cb_fn; 2104 cpl.u.bs_handle.cb_arg = cb_arg; 2105 cpl.u.bs_handle.bs = bs; 2106 2107 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2108 if (!seq) { 2109 spdk_dma_free(ctx->super); 2110 free(ctx); 2111 _spdk_bs_free(bs); 2112 cb_fn(cb_arg, NULL, -ENOMEM); 2113 return; 2114 } 2115 2116 /* Read the super block */ 2117 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2118 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2119 _spdk_bs_load_super_cpl, ctx); 2120 } 2121 2122 /* END spdk_bs_load */ 2123 2124 /* START spdk_bs_init */ 2125 2126 struct spdk_bs_init_ctx { 2127 struct spdk_blob_store *bs; 2128 struct spdk_bs_super_block *super; 2129 }; 2130 2131 static void 2132 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2133 { 2134 struct spdk_bs_init_ctx *ctx = cb_arg; 2135 2136 spdk_dma_free(ctx->super); 2137 free(ctx); 2138 2139 spdk_bs_sequence_finish(seq, bserrno); 2140 } 2141 2142 static void 2143 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2144 { 2145 struct spdk_bs_init_ctx *ctx = cb_arg; 2146 2147 /* Write super block */ 2148 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 2149 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 2150 _spdk_bs_init_persist_super_cpl, ctx); 2151 } 2152 2153 void 2154 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 2155 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 2156 { 2157 struct spdk_bs_init_ctx *ctx; 2158 struct spdk_blob_store *bs; 2159 struct spdk_bs_cpl cpl; 2160 spdk_bs_sequence_t *seq; 2161 spdk_bs_batch_t *batch; 2162 uint64_t num_md_lba; 2163 uint64_t num_md_pages; 2164 uint64_t num_md_clusters; 2165 uint32_t i; 2166 struct spdk_bs_opts opts = {}; 2167 int rc; 2168 2169 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 2170 2171 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 2172 SPDK_ERRLOG("unsupported dev block length of %d\n", 2173 dev->blocklen); 2174 dev->destroy(dev); 2175 cb_fn(cb_arg, NULL, -EINVAL); 2176 return; 2177 } 2178 2179 if (o) { 2180 opts = *o; 2181 } else { 2182 spdk_bs_opts_init(&opts); 2183 } 2184 2185 if (_spdk_bs_opts_verify(&opts) != 0) { 2186 dev->destroy(dev); 2187 cb_fn(cb_arg, NULL, -EINVAL); 2188 return; 2189 } 2190 2191 bs = _spdk_bs_alloc(dev, &opts); 2192 if (!bs) { 2193 dev->destroy(dev); 2194 cb_fn(cb_arg, NULL, -ENOMEM); 2195 return; 2196 } 2197 2198 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 2199 /* By default, allocate 1 page per cluster. 2200 * Technically, this over-allocates metadata 2201 * because more metadata will reduce the number 2202 * of usable clusters. This can be addressed with 2203 * more complex math in the future. 2204 */ 2205 bs->md_len = bs->total_clusters; 2206 } else { 2207 bs->md_len = opts.num_md_pages; 2208 } 2209 2210 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 2211 if (rc < 0) { 2212 _spdk_bs_free(bs); 2213 cb_fn(cb_arg, NULL, -ENOMEM); 2214 return; 2215 } 2216 2217 ctx = calloc(1, sizeof(*ctx)); 2218 if (!ctx) { 2219 _spdk_bs_free(bs); 2220 cb_fn(cb_arg, NULL, -ENOMEM); 2221 return; 2222 } 2223 2224 ctx->bs = bs; 2225 2226 /* Allocate memory for the super block */ 2227 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2228 if (!ctx->super) { 2229 free(ctx); 2230 _spdk_bs_free(bs); 2231 return; 2232 } 2233 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 2234 sizeof(ctx->super->signature)); 2235 ctx->super->version = SPDK_BS_VERSION; 2236 ctx->super->length = sizeof(*ctx->super); 2237 ctx->super->super_blob = bs->super_blob; 2238 ctx->super->clean = 0; 2239 ctx->super->cluster_size = bs->cluster_sz; 2240 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 2241 2242 /* Calculate how many pages the metadata consumes at the front 2243 * of the disk. 2244 */ 2245 2246 /* The super block uses 1 page */ 2247 num_md_pages = 1; 2248 2249 /* The used_md_pages mask requires 1 bit per metadata page, rounded 2250 * up to the nearest page, plus a header. 2251 */ 2252 ctx->super->used_page_mask_start = num_md_pages; 2253 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2254 divide_round_up(bs->md_len, 8), 2255 SPDK_BS_PAGE_SIZE); 2256 num_md_pages += ctx->super->used_page_mask_len; 2257 2258 /* The used_clusters mask requires 1 bit per cluster, rounded 2259 * up to the nearest page, plus a header. 2260 */ 2261 ctx->super->used_cluster_mask_start = num_md_pages; 2262 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2263 divide_round_up(bs->total_clusters, 8), 2264 SPDK_BS_PAGE_SIZE); 2265 num_md_pages += ctx->super->used_cluster_mask_len; 2266 2267 /* The metadata region size was chosen above */ 2268 ctx->super->md_start = bs->md_start = num_md_pages; 2269 ctx->super->md_len = bs->md_len; 2270 num_md_pages += bs->md_len; 2271 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 2272 2273 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 2274 2275 num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster); 2276 if (num_md_clusters > bs->total_clusters) { 2277 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 2278 "please decrease number of pages reserved for metadata " 2279 "or increase cluster size.\n"); 2280 spdk_dma_free(ctx->super); 2281 free(ctx); 2282 _spdk_bs_free(bs); 2283 cb_fn(cb_arg, NULL, -ENOMEM); 2284 return; 2285 } 2286 /* Claim all of the clusters used by the metadata */ 2287 for (i = 0; i < num_md_clusters; i++) { 2288 _spdk_bs_claim_cluster(bs, i); 2289 } 2290 2291 bs->total_data_clusters = bs->num_free_clusters; 2292 2293 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2294 cpl.u.bs_handle.cb_fn = cb_fn; 2295 cpl.u.bs_handle.cb_arg = cb_arg; 2296 cpl.u.bs_handle.bs = bs; 2297 2298 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2299 if (!seq) { 2300 spdk_dma_free(ctx->super); 2301 free(ctx); 2302 _spdk_bs_free(bs); 2303 cb_fn(cb_arg, NULL, -ENOMEM); 2304 return; 2305 } 2306 2307 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 2308 2309 /* Clear metadata space */ 2310 spdk_bs_batch_write_zeroes(batch, 0, num_md_lba); 2311 /* Trim data clusters */ 2312 spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 2313 2314 spdk_bs_batch_close(batch); 2315 } 2316 2317 /* END spdk_bs_init */ 2318 2319 /* START spdk_bs_destroy */ 2320 2321 static void 2322 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2323 { 2324 struct spdk_bs_init_ctx *ctx = cb_arg; 2325 struct spdk_blob_store *bs = ctx->bs; 2326 2327 /* 2328 * We need to defer calling spdk_bs_call_cpl() until after 2329 * dev destruction, so tuck these away for later use. 2330 */ 2331 bs->unload_err = bserrno; 2332 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2333 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2334 2335 spdk_bs_sequence_finish(seq, bserrno); 2336 2337 _spdk_bs_free(bs); 2338 free(ctx); 2339 } 2340 2341 void 2342 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 2343 void *cb_arg) 2344 { 2345 struct spdk_bs_cpl cpl; 2346 spdk_bs_sequence_t *seq; 2347 struct spdk_bs_init_ctx *ctx; 2348 2349 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 2350 2351 if (!TAILQ_EMPTY(&bs->blobs)) { 2352 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2353 cb_fn(cb_arg, -EBUSY); 2354 return; 2355 } 2356 2357 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2358 cpl.u.bs_basic.cb_fn = cb_fn; 2359 cpl.u.bs_basic.cb_arg = cb_arg; 2360 2361 ctx = calloc(1, sizeof(*ctx)); 2362 if (!ctx) { 2363 cb_fn(cb_arg, -ENOMEM); 2364 return; 2365 } 2366 2367 ctx->bs = bs; 2368 2369 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2370 if (!seq) { 2371 free(ctx); 2372 cb_fn(cb_arg, -ENOMEM); 2373 return; 2374 } 2375 2376 /* Write zeroes to the super block */ 2377 spdk_bs_sequence_write_zeroes(seq, 2378 _spdk_bs_page_to_lba(bs, 0), 2379 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 2380 _spdk_bs_destroy_trim_cpl, ctx); 2381 } 2382 2383 /* END spdk_bs_destroy */ 2384 2385 /* START spdk_bs_unload */ 2386 2387 static void 2388 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2389 { 2390 struct spdk_bs_load_ctx *ctx = cb_arg; 2391 2392 spdk_dma_free(ctx->super); 2393 2394 /* 2395 * We need to defer calling spdk_bs_call_cpl() until after 2396 * dev destuction, so tuck these away for later use. 2397 */ 2398 ctx->bs->unload_err = bserrno; 2399 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2400 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2401 2402 spdk_bs_sequence_finish(seq, bserrno); 2403 2404 _spdk_bs_free(ctx->bs); 2405 free(ctx); 2406 } 2407 2408 static void 2409 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2410 { 2411 struct spdk_bs_load_ctx *ctx = cb_arg; 2412 2413 spdk_dma_free(ctx->mask); 2414 ctx->super->clean = 1; 2415 2416 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 2417 } 2418 2419 static void 2420 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2421 { 2422 struct spdk_bs_load_ctx *ctx = cb_arg; 2423 2424 spdk_dma_free(ctx->mask); 2425 2426 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl); 2427 } 2428 2429 static void 2430 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2431 { 2432 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 2433 } 2434 2435 void 2436 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 2437 { 2438 struct spdk_bs_cpl cpl; 2439 spdk_bs_sequence_t *seq; 2440 struct spdk_bs_load_ctx *ctx; 2441 2442 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 2443 2444 if (!TAILQ_EMPTY(&bs->blobs)) { 2445 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2446 cb_fn(cb_arg, -EBUSY); 2447 return; 2448 } 2449 2450 ctx = calloc(1, sizeof(*ctx)); 2451 if (!ctx) { 2452 cb_fn(cb_arg, -ENOMEM); 2453 return; 2454 } 2455 2456 ctx->bs = bs; 2457 2458 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2459 if (!ctx->super) { 2460 free(ctx); 2461 cb_fn(cb_arg, -ENOMEM); 2462 return; 2463 } 2464 2465 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2466 cpl.u.bs_basic.cb_fn = cb_fn; 2467 cpl.u.bs_basic.cb_arg = cb_arg; 2468 2469 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2470 if (!seq) { 2471 spdk_dma_free(ctx->super); 2472 free(ctx); 2473 cb_fn(cb_arg, -ENOMEM); 2474 return; 2475 } 2476 2477 /* Read super block */ 2478 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2479 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2480 _spdk_bs_unload_read_super_cpl, ctx); 2481 } 2482 2483 /* END spdk_bs_unload */ 2484 2485 void 2486 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 2487 spdk_bs_op_complete cb_fn, void *cb_arg) 2488 { 2489 bs->super_blob = blobid; 2490 cb_fn(cb_arg, 0); 2491 } 2492 2493 void 2494 spdk_bs_get_super(struct spdk_blob_store *bs, 2495 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2496 { 2497 if (bs->super_blob == SPDK_BLOBID_INVALID) { 2498 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 2499 } else { 2500 cb_fn(cb_arg, bs->super_blob, 0); 2501 } 2502 } 2503 2504 uint64_t 2505 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 2506 { 2507 return bs->cluster_sz; 2508 } 2509 2510 uint64_t 2511 spdk_bs_get_page_size(struct spdk_blob_store *bs) 2512 { 2513 return SPDK_BS_PAGE_SIZE; 2514 } 2515 2516 uint64_t 2517 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 2518 { 2519 return bs->num_free_clusters; 2520 } 2521 2522 uint64_t 2523 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 2524 { 2525 return bs->total_data_clusters; 2526 } 2527 2528 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 2529 { 2530 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target); 2531 if (!bs->md_target.md_channel) { 2532 SPDK_ERRLOG("Failed to get IO channel.\n"); 2533 return -1; 2534 } 2535 2536 return 0; 2537 } 2538 2539 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 2540 { 2541 spdk_put_io_channel(bs->md_target.md_channel); 2542 2543 return 0; 2544 } 2545 2546 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 2547 { 2548 assert(blob != NULL); 2549 2550 return blob->id; 2551 } 2552 2553 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 2554 { 2555 assert(blob != NULL); 2556 2557 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 2558 } 2559 2560 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 2561 { 2562 assert(blob != NULL); 2563 2564 return blob->active.num_clusters; 2565 } 2566 2567 /* START spdk_bs_md_create_blob */ 2568 2569 static void 2570 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2571 { 2572 struct spdk_blob *blob = cb_arg; 2573 2574 _spdk_blob_free(blob); 2575 2576 spdk_bs_sequence_finish(seq, bserrno); 2577 } 2578 2579 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 2580 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2581 { 2582 struct spdk_blob *blob; 2583 uint32_t page_idx; 2584 struct spdk_bs_cpl cpl; 2585 spdk_bs_sequence_t *seq; 2586 spdk_blob_id id; 2587 2588 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 2589 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 2590 cb_fn(cb_arg, 0, -ENOMEM); 2591 return; 2592 } 2593 spdk_bit_array_set(bs->used_md_pages, page_idx); 2594 2595 id = _spdk_bs_page_to_blobid(page_idx); 2596 2597 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 2598 2599 blob = _spdk_blob_alloc(bs, id); 2600 if (!blob) { 2601 cb_fn(cb_arg, 0, -ENOMEM); 2602 return; 2603 } 2604 2605 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 2606 cpl.u.blobid.cb_fn = cb_fn; 2607 cpl.u.blobid.cb_arg = cb_arg; 2608 cpl.u.blobid.blobid = blob->id; 2609 2610 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2611 if (!seq) { 2612 _spdk_blob_free(blob); 2613 cb_fn(cb_arg, 0, -ENOMEM); 2614 return; 2615 } 2616 2617 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 2618 } 2619 2620 /* END spdk_bs_md_create_blob */ 2621 2622 /* START spdk_bs_md_resize_blob */ 2623 int 2624 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 2625 { 2626 int rc; 2627 2628 assert(blob != NULL); 2629 2630 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 2631 2632 if (blob->md_ro) { 2633 return -EPERM; 2634 } 2635 2636 if (sz == blob->active.num_clusters) { 2637 return 0; 2638 } 2639 2640 rc = _spdk_resize_blob(blob, sz); 2641 if (rc < 0) { 2642 return rc; 2643 } 2644 2645 return 0; 2646 } 2647 2648 /* END spdk_bs_md_resize_blob */ 2649 2650 2651 /* START spdk_bs_md_delete_blob */ 2652 2653 static void 2654 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2655 { 2656 struct spdk_blob *blob = cb_arg; 2657 2658 _spdk_blob_free(blob); 2659 2660 spdk_bs_sequence_finish(seq, bserrno); 2661 } 2662 2663 static void 2664 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2665 { 2666 struct spdk_blob *blob = cb_arg; 2667 2668 /* If the blob have crc error, we just return NULL. */ 2669 if (blob == NULL) { 2670 spdk_bs_sequence_finish(seq, bserrno); 2671 return; 2672 } 2673 blob->state = SPDK_BLOB_STATE_DIRTY; 2674 blob->active.num_pages = 0; 2675 _spdk_resize_blob(blob, 0); 2676 2677 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 2678 } 2679 2680 void 2681 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2682 spdk_blob_op_complete cb_fn, void *cb_arg) 2683 { 2684 struct spdk_blob *blob; 2685 struct spdk_bs_cpl cpl; 2686 spdk_bs_sequence_t *seq; 2687 2688 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 2689 2690 blob = _spdk_blob_lookup(bs, blobid); 2691 if (blob) { 2692 assert(blob->open_ref > 0); 2693 cb_fn(cb_arg, -EINVAL); 2694 return; 2695 } 2696 2697 blob = _spdk_blob_alloc(bs, blobid); 2698 if (!blob) { 2699 cb_fn(cb_arg, -ENOMEM); 2700 return; 2701 } 2702 2703 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2704 cpl.u.blob_basic.cb_fn = cb_fn; 2705 cpl.u.blob_basic.cb_arg = cb_arg; 2706 2707 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2708 if (!seq) { 2709 _spdk_blob_free(blob); 2710 cb_fn(cb_arg, -ENOMEM); 2711 return; 2712 } 2713 2714 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 2715 } 2716 2717 /* END spdk_bs_md_delete_blob */ 2718 2719 /* START spdk_bs_md_open_blob */ 2720 2721 static void 2722 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2723 { 2724 struct spdk_blob *blob = cb_arg; 2725 2726 /* If the blob have crc error, we just return NULL. */ 2727 if (blob == NULL) { 2728 seq->cpl.u.blob_handle.blob = NULL; 2729 spdk_bs_sequence_finish(seq, bserrno); 2730 return; 2731 } 2732 2733 blob->open_ref++; 2734 2735 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 2736 2737 spdk_bs_sequence_finish(seq, bserrno); 2738 } 2739 2740 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2741 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2742 { 2743 struct spdk_blob *blob; 2744 struct spdk_bs_cpl cpl; 2745 spdk_bs_sequence_t *seq; 2746 uint32_t page_num; 2747 2748 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 2749 2750 blob = _spdk_blob_lookup(bs, blobid); 2751 if (blob) { 2752 blob->open_ref++; 2753 cb_fn(cb_arg, blob, 0); 2754 return; 2755 } 2756 2757 page_num = _spdk_bs_blobid_to_page(blobid); 2758 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2759 /* Invalid blobid */ 2760 cb_fn(cb_arg, NULL, -ENOENT); 2761 return; 2762 } 2763 2764 blob = _spdk_blob_alloc(bs, blobid); 2765 if (!blob) { 2766 cb_fn(cb_arg, NULL, -ENOMEM); 2767 return; 2768 } 2769 2770 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2771 cpl.u.blob_handle.cb_fn = cb_fn; 2772 cpl.u.blob_handle.cb_arg = cb_arg; 2773 cpl.u.blob_handle.blob = blob; 2774 2775 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2776 if (!seq) { 2777 _spdk_blob_free(blob); 2778 cb_fn(cb_arg, NULL, -ENOMEM); 2779 return; 2780 } 2781 2782 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 2783 } 2784 2785 /* START spdk_bs_md_sync_blob */ 2786 static void 2787 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2788 { 2789 spdk_bs_sequence_finish(seq, bserrno); 2790 } 2791 2792 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 2793 spdk_blob_op_complete cb_fn, void *cb_arg) 2794 { 2795 struct spdk_bs_cpl cpl; 2796 spdk_bs_sequence_t *seq; 2797 2798 assert(blob != NULL); 2799 2800 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id); 2801 2802 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2803 blob->state != SPDK_BLOB_STATE_SYNCING); 2804 2805 if (blob->md_ro) { 2806 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 2807 return; 2808 } 2809 2810 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2811 cb_fn(cb_arg, 0); 2812 return; 2813 } 2814 2815 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2816 cpl.u.blob_basic.cb_fn = cb_fn; 2817 cpl.u.blob_basic.cb_arg = cb_arg; 2818 2819 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2820 if (!seq) { 2821 cb_fn(cb_arg, -ENOMEM); 2822 return; 2823 } 2824 2825 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2826 } 2827 2828 /* END spdk_bs_md_sync_blob */ 2829 2830 /* START spdk_bs_md_close_blob */ 2831 2832 static void 2833 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2834 { 2835 struct spdk_blob **blob = cb_arg; 2836 2837 if ((*blob)->open_ref == 0) { 2838 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2839 _spdk_blob_free((*blob)); 2840 } 2841 2842 *blob = NULL; 2843 2844 spdk_bs_sequence_finish(seq, bserrno); 2845 } 2846 2847 void spdk_bs_md_close_blob(struct spdk_blob **b, 2848 spdk_blob_op_complete cb_fn, void *cb_arg) 2849 { 2850 struct spdk_bs_cpl cpl; 2851 struct spdk_blob *blob; 2852 spdk_bs_sequence_t *seq; 2853 2854 assert(b != NULL); 2855 blob = *b; 2856 assert(blob != NULL); 2857 2858 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id); 2859 2860 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2861 blob->state != SPDK_BLOB_STATE_SYNCING); 2862 2863 if (blob->open_ref == 0) { 2864 cb_fn(cb_arg, -EBADF); 2865 return; 2866 } 2867 2868 blob->open_ref--; 2869 2870 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2871 cpl.u.blob_basic.cb_fn = cb_fn; 2872 cpl.u.blob_basic.cb_arg = cb_arg; 2873 2874 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2875 if (!seq) { 2876 cb_fn(cb_arg, -ENOMEM); 2877 return; 2878 } 2879 2880 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2881 _spdk_blob_close_cpl(seq, b, 0); 2882 return; 2883 } 2884 2885 /* Sync metadata */ 2886 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2887 } 2888 2889 /* END spdk_bs_md_close_blob */ 2890 2891 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2892 { 2893 return spdk_get_io_channel(&bs->io_target); 2894 } 2895 2896 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2897 { 2898 spdk_put_io_channel(channel); 2899 } 2900 2901 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2902 spdk_blob_op_complete cb_fn, void *cb_arg) 2903 { 2904 /* Flush is synchronous right now */ 2905 cb_fn(cb_arg, 0); 2906 } 2907 2908 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2909 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2910 { 2911 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2912 SPDK_BLOB_UNMAP); 2913 } 2914 2915 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2916 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2917 { 2918 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2919 SPDK_BLOB_WRITE_ZEROES); 2920 } 2921 2922 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2923 void *payload, uint64_t offset, uint64_t length, 2924 spdk_blob_op_complete cb_fn, void *cb_arg) 2925 { 2926 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2927 SPDK_BLOB_WRITE); 2928 } 2929 2930 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2931 void *payload, uint64_t offset, uint64_t length, 2932 spdk_blob_op_complete cb_fn, void *cb_arg) 2933 { 2934 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2935 SPDK_BLOB_READ); 2936 } 2937 2938 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2939 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2940 spdk_blob_op_complete cb_fn, void *cb_arg) 2941 { 2942 if (blob->data_ro) { 2943 cb_fn(cb_arg, -EPERM); 2944 return; 2945 } 2946 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 2947 } 2948 2949 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2950 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2951 spdk_blob_op_complete cb_fn, void *cb_arg) 2952 { 2953 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 2954 } 2955 2956 struct spdk_bs_iter_ctx { 2957 int64_t page_num; 2958 struct spdk_blob_store *bs; 2959 2960 spdk_blob_op_with_handle_complete cb_fn; 2961 void *cb_arg; 2962 }; 2963 2964 static void 2965 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2966 { 2967 struct spdk_bs_iter_ctx *ctx = cb_arg; 2968 struct spdk_blob_store *bs = ctx->bs; 2969 spdk_blob_id id; 2970 2971 if (bserrno == 0) { 2972 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2973 free(ctx); 2974 return; 2975 } 2976 2977 ctx->page_num++; 2978 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2979 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2980 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2981 free(ctx); 2982 return; 2983 } 2984 2985 id = _spdk_bs_page_to_blobid(ctx->page_num); 2986 2987 blob = _spdk_blob_lookup(bs, id); 2988 if (blob) { 2989 blob->open_ref++; 2990 ctx->cb_fn(ctx->cb_arg, blob, 0); 2991 free(ctx); 2992 return; 2993 } 2994 2995 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2996 } 2997 2998 void 2999 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 3000 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 3001 { 3002 struct spdk_bs_iter_ctx *ctx; 3003 3004 ctx = calloc(1, sizeof(*ctx)); 3005 if (!ctx) { 3006 cb_fn(cb_arg, NULL, -ENOMEM); 3007 return; 3008 } 3009 3010 ctx->page_num = -1; 3011 ctx->bs = bs; 3012 ctx->cb_fn = cb_fn; 3013 ctx->cb_arg = cb_arg; 3014 3015 _spdk_bs_iter_cpl(ctx, NULL, -1); 3016 } 3017 3018 static void 3019 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 3020 { 3021 struct spdk_bs_iter_ctx *ctx = cb_arg; 3022 3023 _spdk_bs_iter_cpl(ctx, NULL, -1); 3024 } 3025 3026 void 3027 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 3028 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 3029 { 3030 struct spdk_bs_iter_ctx *ctx; 3031 struct spdk_blob *blob; 3032 3033 assert(b != NULL); 3034 blob = *b; 3035 assert(blob != NULL); 3036 3037 ctx = calloc(1, sizeof(*ctx)); 3038 if (!ctx) { 3039 cb_fn(cb_arg, NULL, -ENOMEM); 3040 return; 3041 } 3042 3043 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 3044 ctx->bs = bs; 3045 ctx->cb_fn = cb_fn; 3046 ctx->cb_arg = cb_arg; 3047 3048 /* Close the existing blob */ 3049 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 3050 } 3051 3052 int 3053 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 3054 uint16_t value_len) 3055 { 3056 struct spdk_xattr *xattr; 3057 3058 assert(blob != NULL); 3059 3060 assert(blob->state != SPDK_BLOB_STATE_LOADING && 3061 blob->state != SPDK_BLOB_STATE_SYNCING); 3062 3063 if (blob->md_ro) { 3064 return -EPERM; 3065 } 3066 3067 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3068 if (!strcmp(name, xattr->name)) { 3069 free(xattr->value); 3070 xattr->value_len = value_len; 3071 xattr->value = malloc(value_len); 3072 memcpy(xattr->value, value, value_len); 3073 3074 blob->state = SPDK_BLOB_STATE_DIRTY; 3075 3076 return 0; 3077 } 3078 } 3079 3080 xattr = calloc(1, sizeof(*xattr)); 3081 if (!xattr) { 3082 return -1; 3083 } 3084 xattr->name = strdup(name); 3085 xattr->value_len = value_len; 3086 xattr->value = malloc(value_len); 3087 memcpy(xattr->value, value, value_len); 3088 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 3089 3090 blob->state = SPDK_BLOB_STATE_DIRTY; 3091 3092 return 0; 3093 } 3094 3095 int 3096 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 3097 { 3098 struct spdk_xattr *xattr; 3099 3100 assert(blob != NULL); 3101 3102 assert(blob->state != SPDK_BLOB_STATE_LOADING && 3103 blob->state != SPDK_BLOB_STATE_SYNCING); 3104 3105 if (blob->md_ro) { 3106 return -EPERM; 3107 } 3108 3109 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3110 if (!strcmp(name, xattr->name)) { 3111 TAILQ_REMOVE(&blob->xattrs, xattr, link); 3112 free(xattr->value); 3113 free(xattr->name); 3114 free(xattr); 3115 3116 blob->state = SPDK_BLOB_STATE_DIRTY; 3117 3118 return 0; 3119 } 3120 } 3121 3122 return -ENOENT; 3123 } 3124 3125 int 3126 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 3127 const void **value, size_t *value_len) 3128 { 3129 struct spdk_xattr *xattr; 3130 3131 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3132 if (!strcmp(name, xattr->name)) { 3133 *value = xattr->value; 3134 *value_len = xattr->value_len; 3135 return 0; 3136 } 3137 } 3138 3139 return -ENOENT; 3140 } 3141 3142 struct spdk_xattr_names { 3143 uint32_t count; 3144 const char *names[0]; 3145 }; 3146 3147 int 3148 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 3149 struct spdk_xattr_names **names) 3150 { 3151 struct spdk_xattr *xattr; 3152 int count = 0; 3153 3154 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3155 count++; 3156 } 3157 3158 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 3159 if (*names == NULL) { 3160 return -ENOMEM; 3161 } 3162 3163 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3164 (*names)->names[(*names)->count++] = xattr->name; 3165 } 3166 3167 return 0; 3168 } 3169 3170 uint32_t 3171 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 3172 { 3173 assert(names != NULL); 3174 3175 return names->count; 3176 } 3177 3178 const char * 3179 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 3180 { 3181 if (index >= names->count) { 3182 return NULL; 3183 } 3184 3185 return names->names[index]; 3186 } 3187 3188 void 3189 spdk_xattr_names_free(struct spdk_xattr_names *names) 3190 { 3191 free(names); 3192 } 3193 3194 struct spdk_bs_type 3195 spdk_bs_get_bstype(struct spdk_blob_store *bs) 3196 { 3197 return bs->bstype; 3198 } 3199 3200 void 3201 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype) 3202 { 3203 memcpy(&bs->bstype, &bstype, sizeof(bstype)); 3204 } 3205 3206 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB) 3207