1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 44 #include "spdk_internal/log.h" 45 46 #include "blobstore.h" 47 48 #define BLOB_CRC32C_INITIAL 0xffffffffUL 49 50 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs); 51 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs); 52 53 static inline size_t 54 divide_round_up(size_t num, size_t divisor) 55 { 56 return (num + divisor - 1) / divisor; 57 } 58 59 static void 60 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 61 { 62 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 63 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 64 assert(bs->num_free_clusters > 0); 65 66 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num); 67 68 spdk_bit_array_set(bs->used_clusters, cluster_num); 69 bs->num_free_clusters--; 70 } 71 72 static void 73 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 74 { 75 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 76 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 77 assert(bs->num_free_clusters < bs->total_clusters); 78 79 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num); 80 81 spdk_bit_array_clear(bs->used_clusters, cluster_num); 82 bs->num_free_clusters++; 83 } 84 85 static struct spdk_blob_data * 86 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 87 { 88 struct spdk_blob_data *blob; 89 90 blob = calloc(1, sizeof(*blob)); 91 if (!blob) { 92 return NULL; 93 } 94 95 blob->id = id; 96 blob->bs = bs; 97 98 blob->state = SPDK_BLOB_STATE_DIRTY; 99 blob->active.num_pages = 1; 100 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 101 if (!blob->active.pages) { 102 free(blob); 103 return NULL; 104 } 105 106 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 107 108 TAILQ_INIT(&blob->xattrs); 109 110 return blob; 111 } 112 113 static void 114 _spdk_blob_free(struct spdk_blob_data *blob) 115 { 116 struct spdk_xattr *xattr, *xattr_tmp; 117 118 assert(blob != NULL); 119 120 free(blob->active.clusters); 121 free(blob->clean.clusters); 122 free(blob->active.pages); 123 free(blob->clean.pages); 124 125 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 126 TAILQ_REMOVE(&blob->xattrs, xattr, link); 127 free(xattr->name); 128 free(xattr->value); 129 free(xattr); 130 } 131 132 free(blob); 133 } 134 135 static int 136 _spdk_blob_mark_clean(struct spdk_blob_data *blob) 137 { 138 uint64_t *clusters = NULL; 139 uint32_t *pages = NULL; 140 141 assert(blob != NULL); 142 assert(blob->state == SPDK_BLOB_STATE_LOADING || 143 blob->state == SPDK_BLOB_STATE_SYNCING); 144 145 if (blob->active.num_clusters) { 146 assert(blob->active.clusters); 147 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 148 if (!clusters) { 149 return -1; 150 } 151 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 152 } 153 154 if (blob->active.num_pages) { 155 assert(blob->active.pages); 156 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 157 if (!pages) { 158 free(clusters); 159 return -1; 160 } 161 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 162 } 163 164 free(blob->clean.clusters); 165 free(blob->clean.pages); 166 167 blob->clean.num_clusters = blob->active.num_clusters; 168 blob->clean.clusters = blob->active.clusters; 169 blob->clean.num_pages = blob->active.num_pages; 170 blob->clean.pages = blob->active.pages; 171 172 blob->active.clusters = clusters; 173 blob->active.pages = pages; 174 175 blob->state = SPDK_BLOB_STATE_CLEAN; 176 177 return 0; 178 } 179 180 static int 181 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob) 182 { 183 struct spdk_blob_md_descriptor *desc; 184 size_t cur_desc = 0; 185 void *tmp; 186 187 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 188 while (cur_desc < sizeof(page->descriptors)) { 189 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 190 if (desc->length == 0) { 191 /* If padding and length are 0, this terminates the page */ 192 break; 193 } 194 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 195 struct spdk_blob_md_descriptor_flags *desc_flags; 196 197 desc_flags = (struct spdk_blob_md_descriptor_flags *)desc; 198 199 if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) { 200 return -EINVAL; 201 } 202 203 if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) != 204 SPDK_BLOB_INVALID_FLAGS_MASK) { 205 return -EINVAL; 206 } 207 208 if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) != 209 SPDK_BLOB_DATA_RO_FLAGS_MASK) { 210 blob->data_ro = true; 211 blob->md_ro = true; 212 } 213 214 if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) != 215 SPDK_BLOB_MD_RO_FLAGS_MASK) { 216 blob->md_ro = true; 217 } 218 219 blob->invalid_flags = desc_flags->invalid_flags; 220 blob->data_ro_flags = desc_flags->data_ro_flags; 221 blob->md_ro_flags = desc_flags->md_ro_flags; 222 223 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 224 struct spdk_blob_md_descriptor_extent *desc_extent; 225 unsigned int i, j; 226 unsigned int cluster_count = blob->active.num_clusters; 227 228 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 229 230 if (desc_extent->length == 0 || 231 (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) { 232 return -EINVAL; 233 } 234 235 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 236 for (j = 0; j < desc_extent->extents[i].length; j++) { 237 if (!spdk_bit_array_get(blob->bs->used_clusters, 238 desc_extent->extents[i].cluster_idx + j)) { 239 return -EINVAL; 240 } 241 cluster_count++; 242 } 243 } 244 245 if (cluster_count == 0) { 246 return -EINVAL; 247 } 248 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 249 if (tmp == NULL) { 250 return -ENOMEM; 251 } 252 blob->active.clusters = tmp; 253 blob->active.cluster_array_size = cluster_count; 254 255 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 256 for (j = 0; j < desc_extent->extents[i].length; j++) { 257 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 258 desc_extent->extents[i].cluster_idx + j); 259 } 260 } 261 262 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 263 struct spdk_blob_md_descriptor_xattr *desc_xattr; 264 struct spdk_xattr *xattr; 265 266 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 267 268 if (desc_xattr->length != sizeof(desc_xattr->name_length) + 269 sizeof(desc_xattr->value_length) + 270 desc_xattr->name_length + desc_xattr->value_length) { 271 return -EINVAL; 272 } 273 274 xattr = calloc(1, sizeof(*xattr)); 275 if (xattr == NULL) { 276 return -ENOMEM; 277 } 278 279 xattr->name = malloc(desc_xattr->name_length + 1); 280 if (xattr->name == NULL) { 281 free(xattr); 282 return -ENOMEM; 283 } 284 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 285 xattr->name[desc_xattr->name_length] = '\0'; 286 287 xattr->value = malloc(desc_xattr->value_length); 288 if (xattr->value == NULL) { 289 free(xattr->name); 290 free(xattr); 291 return -ENOMEM; 292 } 293 xattr->value_len = desc_xattr->value_length; 294 memcpy(xattr->value, 295 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 296 desc_xattr->value_length); 297 298 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 299 } else { 300 /* Unrecognized descriptor type. Do not fail - just continue to the 301 * next descriptor. If this descriptor is associated with some feature 302 * defined in a newer version of blobstore, that version of blobstore 303 * should create and set an associated feature flag to specify if this 304 * blob can be loaded or not. 305 */ 306 } 307 308 /* Advance to the next descriptor */ 309 cur_desc += sizeof(*desc) + desc->length; 310 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 311 break; 312 } 313 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 314 } 315 316 return 0; 317 } 318 319 static int 320 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 321 struct spdk_blob_data *blob) 322 { 323 const struct spdk_blob_md_page *page; 324 uint32_t i; 325 int rc; 326 327 assert(page_count > 0); 328 assert(pages[0].sequence_num == 0); 329 assert(blob != NULL); 330 assert(blob->state == SPDK_BLOB_STATE_LOADING); 331 assert(blob->active.clusters == NULL); 332 assert(blob->state == SPDK_BLOB_STATE_LOADING); 333 334 /* The blobid provided doesn't match what's in the MD, this can 335 * happen for example if a bogus blobid is passed in through open. 336 */ 337 if (blob->id != pages[0].id) { 338 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n", 339 blob->id, pages[0].id); 340 return -ENOENT; 341 } 342 343 for (i = 0; i < page_count; i++) { 344 page = &pages[i]; 345 346 assert(page->id == blob->id); 347 assert(page->sequence_num == i); 348 349 rc = _spdk_blob_parse_page(page, blob); 350 if (rc != 0) { 351 return rc; 352 } 353 } 354 355 return 0; 356 } 357 358 static int 359 _spdk_blob_serialize_add_page(const struct spdk_blob_data *blob, 360 struct spdk_blob_md_page **pages, 361 uint32_t *page_count, 362 struct spdk_blob_md_page **last_page) 363 { 364 struct spdk_blob_md_page *page; 365 366 assert(pages != NULL); 367 assert(page_count != NULL); 368 369 if (*page_count == 0) { 370 assert(*pages == NULL); 371 *page_count = 1; 372 *pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE, 373 SPDK_BS_PAGE_SIZE, 374 NULL); 375 } else { 376 assert(*pages != NULL); 377 (*page_count)++; 378 *pages = spdk_dma_realloc(*pages, 379 SPDK_BS_PAGE_SIZE * (*page_count), 380 SPDK_BS_PAGE_SIZE, 381 NULL); 382 } 383 384 if (*pages == NULL) { 385 *page_count = 0; 386 *last_page = NULL; 387 return -ENOMEM; 388 } 389 390 page = &(*pages)[*page_count - 1]; 391 memset(page, 0, sizeof(*page)); 392 page->id = blob->id; 393 page->sequence_num = *page_count - 1; 394 page->next = SPDK_INVALID_MD_PAGE; 395 *last_page = page; 396 397 return 0; 398 } 399 400 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 401 * Update required_sz on both success and failure. 402 * 403 */ 404 static int 405 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 406 uint8_t *buf, size_t buf_sz, 407 size_t *required_sz) 408 { 409 struct spdk_blob_md_descriptor_xattr *desc; 410 411 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 412 strlen(xattr->name) + 413 xattr->value_len; 414 415 if (buf_sz < *required_sz) { 416 return -1; 417 } 418 419 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 420 421 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 422 desc->length = sizeof(desc->name_length) + 423 sizeof(desc->value_length) + 424 strlen(xattr->name) + 425 xattr->value_len; 426 desc->name_length = strlen(xattr->name); 427 desc->value_length = xattr->value_len; 428 429 memcpy(desc->name, xattr->name, desc->name_length); 430 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 431 xattr->value, 432 desc->value_length); 433 434 return 0; 435 } 436 437 static void 438 _spdk_blob_serialize_extent(const struct spdk_blob_data *blob, 439 uint64_t start_cluster, uint64_t *next_cluster, 440 uint8_t *buf, size_t buf_sz) 441 { 442 struct spdk_blob_md_descriptor_extent *desc; 443 size_t cur_sz; 444 uint64_t i, extent_idx; 445 uint32_t lba, lba_per_cluster, lba_count; 446 447 /* The buffer must have room for at least one extent */ 448 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 449 if (buf_sz < cur_sz) { 450 *next_cluster = start_cluster; 451 return; 452 } 453 454 desc = (struct spdk_blob_md_descriptor_extent *)buf; 455 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 456 457 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 458 459 lba = blob->active.clusters[start_cluster]; 460 lba_count = lba_per_cluster; 461 extent_idx = 0; 462 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 463 if ((lba + lba_count) == blob->active.clusters[i]) { 464 lba_count += lba_per_cluster; 465 continue; 466 } 467 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 468 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 469 extent_idx++; 470 471 cur_sz += sizeof(desc->extents[extent_idx]); 472 473 if (buf_sz < cur_sz) { 474 /* If we ran out of buffer space, return */ 475 desc->length = sizeof(desc->extents[0]) * extent_idx; 476 *next_cluster = i; 477 return; 478 } 479 480 lba = blob->active.clusters[i]; 481 lba_count = lba_per_cluster; 482 } 483 484 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 485 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 486 extent_idx++; 487 488 desc->length = sizeof(desc->extents[0]) * extent_idx; 489 *next_cluster = blob->active.num_clusters; 490 491 return; 492 } 493 494 static void 495 _spdk_blob_serialize_flags(const struct spdk_blob_data *blob, 496 uint8_t *buf, size_t *buf_sz) 497 { 498 struct spdk_blob_md_descriptor_flags *desc; 499 500 /* 501 * Flags get serialized first, so we should always have room for the flags 502 * descriptor. 503 */ 504 assert(*buf_sz >= sizeof(*desc)); 505 506 desc = (struct spdk_blob_md_descriptor_flags *)buf; 507 desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS; 508 desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor); 509 desc->invalid_flags = blob->invalid_flags; 510 desc->data_ro_flags = blob->data_ro_flags; 511 desc->md_ro_flags = blob->md_ro_flags; 512 513 *buf_sz -= sizeof(*desc); 514 } 515 516 static int 517 _spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages, 518 uint32_t *page_count) 519 { 520 struct spdk_blob_md_page *cur_page; 521 const struct spdk_xattr *xattr; 522 int rc; 523 uint8_t *buf; 524 size_t remaining_sz; 525 uint64_t last_cluster; 526 527 assert(pages != NULL); 528 assert(page_count != NULL); 529 assert(blob != NULL); 530 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 531 532 *pages = NULL; 533 *page_count = 0; 534 535 /* A blob always has at least 1 page, even if it has no descriptors */ 536 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 537 if (rc < 0) { 538 return rc; 539 } 540 541 buf = (uint8_t *)cur_page->descriptors; 542 remaining_sz = sizeof(cur_page->descriptors); 543 544 /* Serialize flags */ 545 _spdk_blob_serialize_flags(blob, buf, &remaining_sz); 546 547 /* Serialize xattrs */ 548 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 549 size_t required_sz = 0; 550 rc = _spdk_blob_serialize_xattr(xattr, 551 buf, remaining_sz, 552 &required_sz); 553 if (rc < 0) { 554 /* Need to add a new page to the chain */ 555 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 556 &cur_page); 557 if (rc < 0) { 558 spdk_dma_free(*pages); 559 *pages = NULL; 560 *page_count = 0; 561 return rc; 562 } 563 564 buf = (uint8_t *)cur_page->descriptors; 565 remaining_sz = sizeof(cur_page->descriptors); 566 567 /* Try again */ 568 required_sz = 0; 569 rc = _spdk_blob_serialize_xattr(xattr, 570 buf, remaining_sz, 571 &required_sz); 572 573 if (rc < 0) { 574 spdk_dma_free(*pages); 575 *pages = NULL; 576 *page_count = 0; 577 return -1; 578 } 579 } 580 581 remaining_sz -= required_sz; 582 buf += required_sz; 583 } 584 585 /* Serialize extents */ 586 last_cluster = 0; 587 while (last_cluster < blob->active.num_clusters) { 588 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 589 buf, remaining_sz); 590 591 if (last_cluster == blob->active.num_clusters) { 592 break; 593 } 594 595 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 596 &cur_page); 597 if (rc < 0) { 598 return rc; 599 } 600 601 buf = (uint8_t *)cur_page->descriptors; 602 remaining_sz = sizeof(cur_page->descriptors); 603 } 604 605 return 0; 606 } 607 608 struct spdk_blob_load_ctx { 609 struct spdk_blob_data *blob; 610 611 struct spdk_blob_md_page *pages; 612 uint32_t num_pages; 613 614 spdk_bs_sequence_cpl cb_fn; 615 void *cb_arg; 616 }; 617 618 static uint32_t 619 _spdk_blob_md_page_calc_crc(void *page) 620 { 621 uint32_t crc; 622 623 crc = BLOB_CRC32C_INITIAL; 624 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 625 crc ^= BLOB_CRC32C_INITIAL; 626 627 return crc; 628 629 } 630 631 static void 632 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 633 { 634 struct spdk_blob_load_ctx *ctx = cb_arg; 635 struct spdk_blob_data *blob = ctx->blob; 636 struct spdk_blob_md_page *page; 637 int rc; 638 uint32_t crc; 639 640 page = &ctx->pages[ctx->num_pages - 1]; 641 crc = _spdk_blob_md_page_calc_crc(page); 642 if (crc != page->crc) { 643 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 644 _spdk_blob_free(blob); 645 ctx->cb_fn(seq, NULL, -EINVAL); 646 spdk_dma_free(ctx->pages); 647 free(ctx); 648 return; 649 } 650 651 if (page->next != SPDK_INVALID_MD_PAGE) { 652 uint32_t next_page = page->next; 653 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 654 655 656 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 657 658 /* Read the next page */ 659 ctx->num_pages++; 660 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 661 sizeof(*page), NULL); 662 if (ctx->pages == NULL) { 663 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 664 free(ctx); 665 return; 666 } 667 668 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 669 next_lba, 670 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 671 _spdk_blob_load_cpl, ctx); 672 return; 673 } 674 675 /* Parse the pages */ 676 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 677 if (rc) { 678 _spdk_blob_free(blob); 679 ctx->cb_fn(seq, NULL, rc); 680 spdk_dma_free(ctx->pages); 681 free(ctx); 682 return; 683 } 684 685 _spdk_blob_mark_clean(blob); 686 687 ctx->cb_fn(seq, ctx->cb_arg, rc); 688 689 /* Free the memory */ 690 spdk_dma_free(ctx->pages); 691 free(ctx); 692 } 693 694 /* Load a blob from disk given a blobid */ 695 static void 696 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob, 697 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 698 { 699 struct spdk_blob_load_ctx *ctx; 700 struct spdk_blob_store *bs; 701 uint32_t page_num; 702 uint64_t lba; 703 704 assert(blob != NULL); 705 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 706 blob->state == SPDK_BLOB_STATE_DIRTY); 707 708 bs = blob->bs; 709 710 ctx = calloc(1, sizeof(*ctx)); 711 if (!ctx) { 712 cb_fn(seq, cb_arg, -ENOMEM); 713 return; 714 } 715 716 ctx->blob = blob; 717 ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, 718 SPDK_BS_PAGE_SIZE, NULL); 719 if (!ctx->pages) { 720 free(ctx); 721 cb_fn(seq, cb_arg, -ENOMEM); 722 return; 723 } 724 ctx->num_pages = 1; 725 ctx->cb_fn = cb_fn; 726 ctx->cb_arg = cb_arg; 727 728 page_num = _spdk_bs_blobid_to_page(blob->id); 729 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 730 731 blob->state = SPDK_BLOB_STATE_LOADING; 732 733 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 734 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 735 _spdk_blob_load_cpl, ctx); 736 } 737 738 struct spdk_blob_persist_ctx { 739 struct spdk_blob_data *blob; 740 741 struct spdk_blob_md_page *pages; 742 743 uint64_t idx; 744 745 spdk_bs_sequence_cpl cb_fn; 746 void *cb_arg; 747 }; 748 749 static void 750 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 751 { 752 struct spdk_blob_persist_ctx *ctx = cb_arg; 753 struct spdk_blob_data *blob = ctx->blob; 754 755 if (bserrno == 0) { 756 _spdk_blob_mark_clean(blob); 757 } 758 759 /* Call user callback */ 760 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 761 762 /* Free the memory */ 763 spdk_dma_free(ctx->pages); 764 free(ctx); 765 } 766 767 static void 768 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 769 { 770 struct spdk_blob_persist_ctx *ctx = cb_arg; 771 struct spdk_blob_data *blob = ctx->blob; 772 struct spdk_blob_store *bs = blob->bs; 773 void *tmp; 774 size_t i; 775 776 /* Release all clusters that were truncated */ 777 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 778 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 779 780 _spdk_bs_release_cluster(bs, cluster_num); 781 } 782 783 if (blob->active.num_clusters == 0) { 784 free(blob->active.clusters); 785 blob->active.clusters = NULL; 786 blob->active.cluster_array_size = 0; 787 } else { 788 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 789 assert(tmp != NULL); 790 blob->active.clusters = tmp; 791 blob->active.cluster_array_size = blob->active.num_clusters; 792 } 793 794 _spdk_blob_persist_complete(seq, ctx, bserrno); 795 } 796 797 static void 798 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 799 { 800 struct spdk_blob_persist_ctx *ctx = cb_arg; 801 struct spdk_blob_data *blob = ctx->blob; 802 struct spdk_blob_store *bs = blob->bs; 803 spdk_bs_batch_t *batch; 804 size_t i; 805 uint64_t lba; 806 uint32_t lba_count; 807 808 /* Clusters don't move around in blobs. The list shrinks or grows 809 * at the end, but no changes ever occur in the middle of the list. 810 */ 811 812 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 813 814 /* Unmap all clusters that were truncated */ 815 lba = 0; 816 lba_count = 0; 817 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 818 uint64_t next_lba = blob->active.clusters[i]; 819 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 820 821 if ((lba + lba_count) == next_lba) { 822 /* This cluster is contiguous with the previous one. */ 823 lba_count += next_lba_count; 824 continue; 825 } 826 827 /* This cluster is not contiguous with the previous one. */ 828 829 /* If a run of LBAs previously existing, send them 830 * as an unmap. 831 */ 832 if (lba_count > 0) { 833 spdk_bs_batch_unmap(batch, lba, lba_count); 834 } 835 836 /* Start building the next batch */ 837 lba = next_lba; 838 lba_count = next_lba_count; 839 } 840 841 /* If we ended with a contiguous set of LBAs, send the unmap now */ 842 if (lba_count > 0) { 843 spdk_bs_batch_unmap(batch, lba, lba_count); 844 } 845 846 spdk_bs_batch_close(batch); 847 } 848 849 static void 850 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 851 { 852 struct spdk_blob_persist_ctx *ctx = cb_arg; 853 struct spdk_blob_data *blob = ctx->blob; 854 struct spdk_blob_store *bs = blob->bs; 855 size_t i; 856 857 /* This loop starts at 1 because the first page is special and handled 858 * below. The pages (except the first) are never written in place, 859 * so any pages in the clean list must be zeroed. 860 */ 861 for (i = 1; i < blob->clean.num_pages; i++) { 862 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 863 } 864 865 if (blob->active.num_pages == 0) { 866 uint32_t page_num; 867 868 page_num = _spdk_bs_blobid_to_page(blob->id); 869 spdk_bit_array_clear(bs->used_md_pages, page_num); 870 } 871 872 /* Move on to unmapping clusters */ 873 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 874 } 875 876 static void 877 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 878 { 879 struct spdk_blob_persist_ctx *ctx = cb_arg; 880 struct spdk_blob_data *blob = ctx->blob; 881 struct spdk_blob_store *bs = blob->bs; 882 uint64_t lba; 883 uint32_t lba_count; 884 spdk_bs_batch_t *batch; 885 size_t i; 886 887 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx); 888 889 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 890 891 /* This loop starts at 1 because the first page is special and handled 892 * below. The pages (except the first) are never written in place, 893 * so any pages in the clean list must be zeroed. 894 */ 895 for (i = 1; i < blob->clean.num_pages; i++) { 896 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 897 898 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 899 } 900 901 /* The first page will only be zeroed if this is a delete. */ 902 if (blob->active.num_pages == 0) { 903 uint32_t page_num; 904 905 /* The first page in the metadata goes where the blobid indicates */ 906 page_num = _spdk_bs_blobid_to_page(blob->id); 907 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 908 909 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 910 } 911 912 spdk_bs_batch_close(batch); 913 } 914 915 static void 916 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 917 { 918 struct spdk_blob_persist_ctx *ctx = cb_arg; 919 struct spdk_blob_data *blob = ctx->blob; 920 struct spdk_blob_store *bs = blob->bs; 921 uint64_t lba; 922 uint32_t lba_count; 923 struct spdk_blob_md_page *page; 924 925 if (blob->active.num_pages == 0) { 926 /* Move on to the next step */ 927 _spdk_blob_persist_zero_pages(seq, ctx, 0); 928 return; 929 } 930 931 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 932 933 page = &ctx->pages[0]; 934 /* The first page in the metadata goes where the blobid indicates */ 935 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 936 937 spdk_bs_sequence_write(seq, page, lba, lba_count, 938 _spdk_blob_persist_zero_pages, ctx); 939 } 940 941 static void 942 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 943 { 944 struct spdk_blob_persist_ctx *ctx = cb_arg; 945 struct spdk_blob_data *blob = ctx->blob; 946 struct spdk_blob_store *bs = blob->bs; 947 uint64_t lba; 948 uint32_t lba_count; 949 struct spdk_blob_md_page *page; 950 spdk_bs_batch_t *batch; 951 size_t i; 952 953 /* Clusters don't move around in blobs. The list shrinks or grows 954 * at the end, but no changes ever occur in the middle of the list. 955 */ 956 957 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 958 959 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 960 961 /* This starts at 1. The root page is not written until 962 * all of the others are finished 963 */ 964 for (i = 1; i < blob->active.num_pages; i++) { 965 page = &ctx->pages[i]; 966 assert(page->sequence_num == i); 967 968 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 969 970 spdk_bs_batch_write(batch, page, lba, lba_count); 971 } 972 973 spdk_bs_batch_close(batch); 974 } 975 976 static int 977 _spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz) 978 { 979 uint64_t i; 980 uint64_t *tmp; 981 uint64_t lfc; /* lowest free cluster */ 982 struct spdk_blob_store *bs; 983 984 bs = blob->bs; 985 986 assert(blob->state != SPDK_BLOB_STATE_LOADING && 987 blob->state != SPDK_BLOB_STATE_SYNCING); 988 989 if (blob->active.num_clusters == sz) { 990 return 0; 991 } 992 993 if (blob->active.num_clusters < blob->active.cluster_array_size) { 994 /* If this blob was resized to be larger, then smaller, then 995 * larger without syncing, then the cluster array already 996 * contains spare assigned clusters we can use. 997 */ 998 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 999 sz); 1000 } 1001 1002 blob->state = SPDK_BLOB_STATE_DIRTY; 1003 1004 /* Do two passes - one to verify that we can obtain enough clusters 1005 * and another to actually claim them. 1006 */ 1007 1008 lfc = 0; 1009 for (i = blob->active.num_clusters; i < sz; i++) { 1010 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1011 if (lfc >= bs->total_clusters) { 1012 /* No more free clusters. Cannot satisfy the request */ 1013 assert(false); 1014 return -1; 1015 } 1016 lfc++; 1017 } 1018 1019 if (sz > blob->active.num_clusters) { 1020 /* Expand the cluster array if necessary. 1021 * We only shrink the array when persisting. 1022 */ 1023 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 1024 if (sz > 0 && tmp == NULL) { 1025 assert(false); 1026 return -1; 1027 } 1028 blob->active.clusters = tmp; 1029 blob->active.cluster_array_size = sz; 1030 } 1031 1032 lfc = 0; 1033 for (i = blob->active.num_clusters; i < sz; i++) { 1034 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1035 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 1036 _spdk_bs_claim_cluster(bs, lfc); 1037 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 1038 lfc++; 1039 } 1040 1041 blob->active.num_clusters = sz; 1042 1043 return 0; 1044 } 1045 1046 /* Write a blob to disk */ 1047 static void 1048 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob, 1049 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1050 { 1051 struct spdk_blob_persist_ctx *ctx; 1052 int rc; 1053 uint64_t i; 1054 uint32_t page_num; 1055 struct spdk_blob_store *bs; 1056 1057 assert(blob != NULL); 1058 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 1059 blob->state == SPDK_BLOB_STATE_DIRTY); 1060 1061 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 1062 cb_fn(seq, cb_arg, 0); 1063 return; 1064 } 1065 1066 bs = blob->bs; 1067 1068 ctx = calloc(1, sizeof(*ctx)); 1069 if (!ctx) { 1070 cb_fn(seq, cb_arg, -ENOMEM); 1071 return; 1072 } 1073 ctx->blob = blob; 1074 ctx->cb_fn = cb_fn; 1075 ctx->cb_arg = cb_arg; 1076 1077 blob->state = SPDK_BLOB_STATE_SYNCING; 1078 1079 if (blob->active.num_pages == 0) { 1080 /* This is the signal that the blob should be deleted. 1081 * Immediately jump to the clean up routine. */ 1082 assert(blob->clean.num_pages > 0); 1083 ctx->idx = blob->clean.num_pages - 1; 1084 _spdk_blob_persist_zero_pages(seq, ctx, 0); 1085 return; 1086 1087 } 1088 1089 /* Generate the new metadata */ 1090 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 1091 if (rc < 0) { 1092 free(ctx); 1093 cb_fn(seq, cb_arg, rc); 1094 return; 1095 } 1096 1097 assert(blob->active.num_pages >= 1); 1098 1099 /* Resize the cache of page indices */ 1100 blob->active.pages = realloc(blob->active.pages, 1101 blob->active.num_pages * sizeof(*blob->active.pages)); 1102 if (!blob->active.pages) { 1103 free(ctx); 1104 cb_fn(seq, cb_arg, -ENOMEM); 1105 return; 1106 } 1107 1108 /* Assign this metadata to pages. This requires two passes - 1109 * one to verify that there are enough pages and a second 1110 * to actually claim them. */ 1111 page_num = 0; 1112 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1113 for (i = 1; i < blob->active.num_pages; i++) { 1114 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1115 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 1116 spdk_dma_free(ctx->pages); 1117 free(ctx); 1118 blob->state = SPDK_BLOB_STATE_DIRTY; 1119 cb_fn(seq, cb_arg, -ENOMEM); 1120 return; 1121 } 1122 page_num++; 1123 } 1124 1125 page_num = 0; 1126 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1127 for (i = 1; i < blob->active.num_pages; i++) { 1128 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1129 ctx->pages[i - 1].next = page_num; 1130 /* Now that previous metadata page is complete, calculate the crc for it. */ 1131 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1132 blob->active.pages[i] = page_num; 1133 spdk_bit_array_set(bs->used_md_pages, page_num); 1134 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1135 page_num++; 1136 } 1137 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1138 /* Start writing the metadata from last page to first */ 1139 ctx->idx = blob->active.num_pages - 1; 1140 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1141 } 1142 1143 static void 1144 _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel, 1145 void *payload, uint64_t offset, uint64_t length, 1146 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1147 { 1148 struct spdk_blob_data *blob = __blob_to_data(_blob); 1149 spdk_bs_batch_t *batch; 1150 struct spdk_bs_cpl cpl; 1151 uint64_t lba; 1152 uint32_t lba_count; 1153 uint8_t *buf; 1154 uint64_t page; 1155 1156 assert(blob != NULL); 1157 1158 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 1159 cb_fn(cb_arg, -EPERM); 1160 return; 1161 } 1162 1163 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1164 cb_fn(cb_arg, -EINVAL); 1165 return; 1166 } 1167 1168 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1169 cpl.u.blob_basic.cb_fn = cb_fn; 1170 cpl.u.blob_basic.cb_arg = cb_arg; 1171 1172 batch = spdk_bs_batch_open(_channel, &cpl); 1173 if (!batch) { 1174 cb_fn(cb_arg, -ENOMEM); 1175 return; 1176 } 1177 1178 length = _spdk_bs_page_to_lba(blob->bs, length); 1179 page = offset; 1180 buf = payload; 1181 while (length > 0) { 1182 lba = _spdk_bs_blob_page_to_lba(blob, page); 1183 lba_count = spdk_min(length, 1184 _spdk_bs_page_to_lba(blob->bs, 1185 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1186 1187 switch (op_type) { 1188 case SPDK_BLOB_READ: 1189 spdk_bs_batch_read(batch, buf, lba, lba_count); 1190 break; 1191 case SPDK_BLOB_WRITE: 1192 spdk_bs_batch_write(batch, buf, lba, lba_count); 1193 break; 1194 case SPDK_BLOB_UNMAP: 1195 spdk_bs_batch_unmap(batch, lba, lba_count); 1196 break; 1197 case SPDK_BLOB_WRITE_ZEROES: 1198 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 1199 break; 1200 } 1201 1202 length -= lba_count; 1203 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1204 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { 1205 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1206 } 1207 } 1208 1209 spdk_bs_batch_close(batch); 1210 } 1211 1212 struct rw_iov_ctx { 1213 struct spdk_blob_data *blob; 1214 bool read; 1215 int iovcnt; 1216 struct iovec *orig_iov; 1217 uint64_t page_offset; 1218 uint64_t pages_remaining; 1219 uint64_t pages_done; 1220 struct iovec iov[0]; 1221 }; 1222 1223 static void 1224 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1225 { 1226 assert(cb_arg == NULL); 1227 spdk_bs_sequence_finish(seq, bserrno); 1228 } 1229 1230 static void 1231 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1232 { 1233 struct rw_iov_ctx *ctx = cb_arg; 1234 struct iovec *iov, *orig_iov; 1235 int iovcnt; 1236 size_t orig_iovoff; 1237 uint64_t lba; 1238 uint64_t page_count, pages_to_boundary; 1239 uint32_t lba_count; 1240 uint64_t byte_count; 1241 1242 if (bserrno != 0 || ctx->pages_remaining == 0) { 1243 free(ctx); 1244 spdk_bs_sequence_finish(seq, bserrno); 1245 return; 1246 } 1247 1248 pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset); 1249 page_count = spdk_min(ctx->pages_remaining, pages_to_boundary); 1250 lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset); 1251 lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count); 1252 1253 /* 1254 * Get index and offset into the original iov array for our current position in the I/O sequence. 1255 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 1256 * point to the current position in the I/O sequence. 1257 */ 1258 byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page); 1259 orig_iov = &ctx->orig_iov[0]; 1260 orig_iovoff = 0; 1261 while (byte_count > 0) { 1262 if (byte_count >= orig_iov->iov_len) { 1263 byte_count -= orig_iov->iov_len; 1264 orig_iov++; 1265 } else { 1266 orig_iovoff = byte_count; 1267 byte_count = 0; 1268 } 1269 } 1270 1271 /* 1272 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 1273 * bytes of this next I/O remain to be accounted for in the new iov array. 1274 */ 1275 byte_count = page_count * sizeof(struct spdk_blob_md_page); 1276 iov = &ctx->iov[0]; 1277 iovcnt = 0; 1278 while (byte_count > 0) { 1279 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 1280 iov->iov_base = orig_iov->iov_base + orig_iovoff; 1281 byte_count -= iov->iov_len; 1282 orig_iovoff = 0; 1283 orig_iov++; 1284 iov++; 1285 iovcnt++; 1286 } 1287 1288 ctx->page_offset += page_count; 1289 ctx->pages_done += page_count; 1290 ctx->pages_remaining -= page_count; 1291 iov = &ctx->iov[0]; 1292 1293 if (ctx->read) { 1294 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1295 } else { 1296 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1297 } 1298 } 1299 1300 static void 1301 _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel, 1302 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1303 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1304 { 1305 struct spdk_blob_data *blob = __blob_to_data(_blob); 1306 spdk_bs_sequence_t *seq; 1307 struct spdk_bs_cpl cpl; 1308 1309 assert(blob != NULL); 1310 1311 if (!read && blob->data_ro) { 1312 cb_fn(cb_arg, -EPERM); 1313 return; 1314 } 1315 1316 if (length == 0) { 1317 cb_fn(cb_arg, 0); 1318 return; 1319 } 1320 1321 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1322 cb_fn(cb_arg, -EINVAL); 1323 return; 1324 } 1325 1326 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1327 cpl.u.blob_basic.cb_fn = cb_fn; 1328 cpl.u.blob_basic.cb_arg = cb_arg; 1329 1330 /* 1331 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 1332 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 1333 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 1334 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 1335 * to allocate a separate iov array and split the I/O such that none of the resulting 1336 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 1337 * but since this case happens very infrequently, any performance impact will be negligible. 1338 * 1339 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 1340 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 1341 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 1342 * when the batch was completed, to allow for freeing the memory for the iov arrays. 1343 */ 1344 seq = spdk_bs_sequence_start(_channel, &cpl); 1345 if (!seq) { 1346 cb_fn(cb_arg, -ENOMEM); 1347 return; 1348 } 1349 1350 if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) { 1351 uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset); 1352 uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length); 1353 1354 if (read) { 1355 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1356 } else { 1357 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1358 } 1359 } else { 1360 struct rw_iov_ctx *ctx; 1361 1362 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 1363 if (ctx == NULL) { 1364 spdk_bs_sequence_finish(seq, -ENOMEM); 1365 return; 1366 } 1367 1368 ctx->blob = blob; 1369 ctx->read = read; 1370 ctx->orig_iov = iov; 1371 ctx->iovcnt = iovcnt; 1372 ctx->page_offset = offset; 1373 ctx->pages_remaining = length; 1374 ctx->pages_done = 0; 1375 1376 _spdk_rw_iov_split_next(seq, ctx, 0); 1377 } 1378 } 1379 1380 static struct spdk_blob_data * 1381 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1382 { 1383 struct spdk_blob_data *blob; 1384 1385 TAILQ_FOREACH(blob, &bs->blobs, link) { 1386 if (blob->id == blobid) { 1387 return blob; 1388 } 1389 } 1390 1391 return NULL; 1392 } 1393 1394 static int 1395 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1396 uint32_t max_ops) 1397 { 1398 struct spdk_bs_dev *dev; 1399 uint32_t i; 1400 1401 dev = bs->dev; 1402 1403 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1404 if (!channel->req_mem) { 1405 return -1; 1406 } 1407 1408 TAILQ_INIT(&channel->reqs); 1409 1410 for (i = 0; i < max_ops; i++) { 1411 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1412 } 1413 1414 channel->bs = bs; 1415 channel->dev = dev; 1416 channel->dev_channel = dev->create_channel(dev); 1417 1418 if (!channel->dev_channel) { 1419 SPDK_ERRLOG("Failed to create device channel.\n"); 1420 free(channel->req_mem); 1421 return -1; 1422 } 1423 1424 return 0; 1425 } 1426 1427 static int 1428 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1429 { 1430 struct spdk_blob_store *bs; 1431 struct spdk_bs_channel *channel = ctx_buf; 1432 1433 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1434 1435 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1436 } 1437 1438 static int 1439 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1440 { 1441 struct spdk_blob_store *bs; 1442 struct spdk_bs_channel *channel = ctx_buf; 1443 1444 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1445 1446 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1447 } 1448 1449 1450 static void 1451 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1452 { 1453 struct spdk_bs_channel *channel = ctx_buf; 1454 1455 free(channel->req_mem); 1456 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1457 } 1458 1459 static void 1460 _spdk_bs_dev_destroy(void *io_device) 1461 { 1462 struct spdk_blob_store *bs; 1463 struct spdk_blob_data *blob, *blob_tmp; 1464 1465 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1466 bs->dev->destroy(bs->dev); 1467 1468 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1469 TAILQ_REMOVE(&bs->blobs, blob, link); 1470 _spdk_blob_free(blob); 1471 } 1472 1473 spdk_bit_array_free(&bs->used_md_pages); 1474 spdk_bit_array_free(&bs->used_clusters); 1475 /* 1476 * If this function is called for any reason except a successful unload, 1477 * the unload_cpl type will be NONE and this will be a nop. 1478 */ 1479 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 1480 1481 free(bs); 1482 } 1483 1484 static void 1485 _spdk_bs_free(struct spdk_blob_store *bs) 1486 { 1487 spdk_bs_unregister_md_thread(bs); 1488 spdk_io_device_unregister(&bs->io_target, NULL); 1489 spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy); 1490 } 1491 1492 void 1493 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1494 { 1495 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1496 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1497 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1498 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1499 memset(&opts->bstype, 0, sizeof(opts->bstype)); 1500 } 1501 1502 static int 1503 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 1504 { 1505 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 1506 opts->max_channel_ops == 0) { 1507 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 1508 return -1; 1509 } 1510 1511 return 0; 1512 } 1513 1514 static struct spdk_blob_store * 1515 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1516 { 1517 struct spdk_blob_store *bs; 1518 uint64_t dev_size; 1519 int rc; 1520 1521 dev_size = dev->blocklen * dev->blockcnt; 1522 if (dev_size < opts->cluster_sz) { 1523 /* Device size cannot be smaller than cluster size of blobstore */ 1524 SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size, 1525 opts->cluster_sz); 1526 return NULL; 1527 } 1528 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 1529 /* Cluster size cannot be smaller than page size */ 1530 SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n", 1531 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 1532 return NULL; 1533 } 1534 bs = calloc(1, sizeof(struct spdk_blob_store)); 1535 if (!bs) { 1536 return NULL; 1537 } 1538 1539 TAILQ_INIT(&bs->blobs); 1540 bs->dev = dev; 1541 1542 /* 1543 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1544 * even multiple of the cluster size. 1545 */ 1546 bs->cluster_sz = opts->cluster_sz; 1547 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1548 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1549 bs->num_free_clusters = bs->total_clusters; 1550 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1551 if (bs->used_clusters == NULL) { 1552 free(bs); 1553 return NULL; 1554 } 1555 1556 bs->md_target.max_md_ops = opts->max_md_ops; 1557 bs->io_target.max_channel_ops = opts->max_channel_ops; 1558 bs->super_blob = SPDK_BLOBID_INVALID; 1559 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 1560 1561 /* The metadata is assumed to be at least 1 page */ 1562 bs->used_md_pages = spdk_bit_array_create(1); 1563 1564 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1565 sizeof(struct spdk_bs_channel)); 1566 rc = spdk_bs_register_md_thread(bs); 1567 if (rc == -1) { 1568 spdk_io_device_unregister(&bs->md_target, NULL); 1569 spdk_bit_array_free(&bs->used_md_pages); 1570 spdk_bit_array_free(&bs->used_clusters); 1571 free(bs); 1572 return NULL; 1573 } 1574 1575 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1576 sizeof(struct spdk_bs_channel)); 1577 1578 return bs; 1579 } 1580 1581 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */ 1582 1583 struct spdk_bs_load_ctx { 1584 struct spdk_blob_store *bs; 1585 struct spdk_bs_super_block *super; 1586 1587 struct spdk_bs_md_mask *mask; 1588 bool in_page_chain; 1589 uint32_t page_index; 1590 uint32_t cur_page; 1591 struct spdk_blob_md_page *page; 1592 }; 1593 1594 static void 1595 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 1596 { 1597 uint32_t i = 0; 1598 1599 while (true) { 1600 i = spdk_bit_array_find_first_set(array, i); 1601 if (i >= mask->length) { 1602 break; 1603 } 1604 mask->mask[i / 8] |= 1U << (i % 8); 1605 i++; 1606 } 1607 } 1608 1609 static void 1610 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 1611 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1612 { 1613 /* Update the values in the super block */ 1614 super->super_blob = bs->super_blob; 1615 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 1616 super->crc = _spdk_blob_md_page_calc_crc(super); 1617 spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0), 1618 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 1619 cb_fn, cb_arg); 1620 } 1621 1622 static void 1623 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1624 { 1625 struct spdk_bs_load_ctx *ctx = arg; 1626 uint64_t mask_size, lba, lba_count; 1627 1628 /* Write out the used clusters mask */ 1629 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1630 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1631 if (!ctx->mask) { 1632 spdk_dma_free(ctx->super); 1633 free(ctx); 1634 spdk_bs_sequence_finish(seq, -ENOMEM); 1635 return; 1636 } 1637 1638 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1639 ctx->mask->length = ctx->bs->total_clusters; 1640 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1641 1642 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 1643 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1644 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1645 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1646 } 1647 1648 static void 1649 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1650 { 1651 struct spdk_bs_load_ctx *ctx = arg; 1652 uint64_t mask_size, lba, lba_count; 1653 1654 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1655 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1656 if (!ctx->mask) { 1657 spdk_dma_free(ctx->super); 1658 free(ctx); 1659 spdk_bs_sequence_finish(seq, -ENOMEM); 1660 return; 1661 } 1662 1663 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1664 ctx->mask->length = ctx->super->md_len; 1665 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1666 1667 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 1668 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1669 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1670 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1671 } 1672 1673 static void 1674 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1675 { 1676 struct spdk_bs_load_ctx *ctx = cb_arg; 1677 uint32_t i, j; 1678 int rc; 1679 1680 /* The type must be correct */ 1681 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1682 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1683 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1684 struct spdk_blob_md_page) * 8)); 1685 /* The length of the mask must be exactly equal to the total number of clusters */ 1686 assert(ctx->mask->length == ctx->bs->total_clusters); 1687 1688 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1689 if (rc < 0) { 1690 spdk_dma_free(ctx->super); 1691 spdk_dma_free(ctx->mask); 1692 _spdk_bs_free(ctx->bs); 1693 free(ctx); 1694 spdk_bs_sequence_finish(seq, -ENOMEM); 1695 return; 1696 } 1697 1698 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1699 for (i = 0; i < ctx->mask->length / 8; i++) { 1700 uint8_t segment = ctx->mask->mask[i]; 1701 for (j = 0; segment && (j < 8); j++) { 1702 if (segment & 1U) { 1703 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1704 assert(ctx->bs->num_free_clusters > 0); 1705 ctx->bs->num_free_clusters--; 1706 } 1707 segment >>= 1U; 1708 } 1709 } 1710 1711 spdk_dma_free(ctx->super); 1712 spdk_dma_free(ctx->mask); 1713 free(ctx); 1714 1715 spdk_bs_sequence_finish(seq, bserrno); 1716 } 1717 1718 static void 1719 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1720 { 1721 struct spdk_bs_load_ctx *ctx = cb_arg; 1722 uint64_t lba, lba_count, mask_size; 1723 uint32_t i, j; 1724 int rc; 1725 1726 /* The type must be correct */ 1727 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1728 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1729 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 1730 8)); 1731 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1732 assert(ctx->mask->length == ctx->super->md_len); 1733 1734 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1735 if (rc < 0) { 1736 spdk_dma_free(ctx->super); 1737 spdk_dma_free(ctx->mask); 1738 _spdk_bs_free(ctx->bs); 1739 free(ctx); 1740 spdk_bs_sequence_finish(seq, -ENOMEM); 1741 return; 1742 } 1743 1744 for (i = 0; i < ctx->mask->length / 8; i++) { 1745 uint8_t segment = ctx->mask->mask[i]; 1746 for (j = 0; segment && (j < 8); j++) { 1747 if (segment & 1U) { 1748 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1749 } 1750 segment >>= 1U; 1751 } 1752 } 1753 spdk_dma_free(ctx->mask); 1754 1755 /* Read the used clusters mask */ 1756 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1757 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1758 if (!ctx->mask) { 1759 spdk_dma_free(ctx->super); 1760 _spdk_bs_free(ctx->bs); 1761 free(ctx); 1762 spdk_bs_sequence_finish(seq, -ENOMEM); 1763 return; 1764 } 1765 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1766 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1767 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1768 _spdk_bs_load_used_clusters_cpl, ctx); 1769 } 1770 1771 static void 1772 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1773 { 1774 struct spdk_bs_load_ctx *ctx = cb_arg; 1775 uint64_t lba, lba_count, mask_size; 1776 1777 /* Read the used pages mask */ 1778 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1779 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1780 if (!ctx->mask) { 1781 spdk_dma_free(ctx->super); 1782 _spdk_bs_free(ctx->bs); 1783 free(ctx); 1784 spdk_bs_sequence_finish(seq, -ENOMEM); 1785 return; 1786 } 1787 1788 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1789 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1790 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1791 _spdk_bs_load_used_pages_cpl, ctx); 1792 } 1793 1794 static int 1795 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 1796 { 1797 struct spdk_blob_md_descriptor *desc; 1798 size_t cur_desc = 0; 1799 1800 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 1801 while (cur_desc < sizeof(page->descriptors)) { 1802 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 1803 if (desc->length == 0) { 1804 /* If padding and length are 0, this terminates the page */ 1805 break; 1806 } 1807 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 1808 struct spdk_blob_md_descriptor_extent *desc_extent; 1809 unsigned int i, j; 1810 unsigned int cluster_count = 0; 1811 1812 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 1813 1814 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 1815 for (j = 0; j < desc_extent->extents[i].length; j++) { 1816 spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j); 1817 if (bs->num_free_clusters == 0) { 1818 return -1; 1819 } 1820 bs->num_free_clusters--; 1821 cluster_count++; 1822 } 1823 } 1824 if (cluster_count == 0) { 1825 return -1; 1826 } 1827 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 1828 /* Skip this item */ 1829 } else { 1830 /* Error */ 1831 return -1; 1832 } 1833 /* Advance to the next descriptor */ 1834 cur_desc += sizeof(*desc) + desc->length; 1835 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 1836 break; 1837 } 1838 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 1839 } 1840 return 0; 1841 } 1842 1843 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 1844 { 1845 uint32_t crc; 1846 1847 crc = _spdk_blob_md_page_calc_crc(ctx->page); 1848 if (crc != ctx->page->crc) { 1849 return false; 1850 } 1851 1852 if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 1853 return false; 1854 } 1855 return true; 1856 } 1857 1858 static void 1859 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 1860 1861 static void 1862 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1863 { 1864 struct spdk_bs_load_ctx *ctx = cb_arg; 1865 1866 spdk_dma_free(ctx->mask); 1867 spdk_dma_free(ctx->super); 1868 spdk_bs_sequence_finish(seq, bserrno); 1869 free(ctx); 1870 } 1871 1872 static void 1873 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1874 { 1875 struct spdk_bs_load_ctx *ctx = cb_arg; 1876 1877 spdk_dma_free(ctx->mask); 1878 1879 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl); 1880 } 1881 1882 static void 1883 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1884 { 1885 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl); 1886 } 1887 1888 static void 1889 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1890 { 1891 struct spdk_bs_load_ctx *ctx = cb_arg; 1892 uint32_t page_num; 1893 1894 if (bserrno != 0) { 1895 spdk_dma_free(ctx->super); 1896 _spdk_bs_free(ctx->bs); 1897 free(ctx); 1898 spdk_bs_sequence_finish(seq, bserrno); 1899 return; 1900 } 1901 1902 page_num = ctx->cur_page; 1903 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 1904 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 1905 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 1906 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 1907 spdk_dma_free(ctx->super); 1908 _spdk_bs_free(ctx->bs); 1909 free(ctx); 1910 spdk_bs_sequence_finish(seq, -EILSEQ); 1911 return; 1912 } 1913 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 1914 ctx->in_page_chain = true; 1915 ctx->cur_page = ctx->page->next; 1916 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1917 return; 1918 } 1919 } 1920 } 1921 1922 ctx->in_page_chain = false; 1923 1924 do { 1925 ctx->page_index++; 1926 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 1927 1928 if (ctx->page_index < ctx->super->md_len) { 1929 ctx->cur_page = ctx->page_index; 1930 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1931 } else { 1932 spdk_dma_free(ctx->page); 1933 _spdk_bs_load_write_used_md(seq, ctx, bserrno); 1934 } 1935 } 1936 1937 static void 1938 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 1939 { 1940 struct spdk_bs_load_ctx *ctx = cb_arg; 1941 uint64_t lba; 1942 1943 assert(ctx->cur_page < ctx->super->md_len); 1944 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 1945 spdk_bs_sequence_read(seq, ctx->page, lba, 1946 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 1947 _spdk_bs_load_replay_md_cpl, ctx); 1948 } 1949 1950 static void 1951 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg) 1952 { 1953 struct spdk_bs_load_ctx *ctx = cb_arg; 1954 1955 ctx->page_index = 0; 1956 ctx->cur_page = 0; 1957 ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE, 1958 SPDK_BS_PAGE_SIZE, 1959 NULL); 1960 if (!ctx->page) { 1961 spdk_dma_free(ctx->super); 1962 _spdk_bs_free(ctx->bs); 1963 free(ctx); 1964 spdk_bs_sequence_finish(seq, -ENOMEM); 1965 return; 1966 } 1967 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1968 } 1969 1970 static void 1971 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg) 1972 { 1973 struct spdk_bs_load_ctx *ctx = cb_arg; 1974 int rc; 1975 1976 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 1977 if (rc < 0) { 1978 spdk_dma_free(ctx->super); 1979 _spdk_bs_free(ctx->bs); 1980 free(ctx); 1981 spdk_bs_sequence_finish(seq, -ENOMEM); 1982 return; 1983 } 1984 1985 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1986 if (rc < 0) { 1987 spdk_dma_free(ctx->super); 1988 _spdk_bs_free(ctx->bs); 1989 free(ctx); 1990 spdk_bs_sequence_finish(seq, -ENOMEM); 1991 return; 1992 } 1993 1994 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1995 _spdk_bs_load_replay_md(seq, cb_arg); 1996 } 1997 1998 static void 1999 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2000 { 2001 struct spdk_bs_load_ctx *ctx = cb_arg; 2002 uint32_t crc; 2003 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 2004 2005 if (ctx->super->version > SPDK_BS_VERSION || 2006 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 2007 spdk_dma_free(ctx->super); 2008 _spdk_bs_free(ctx->bs); 2009 free(ctx); 2010 spdk_bs_sequence_finish(seq, -EILSEQ); 2011 return; 2012 } 2013 2014 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 2015 sizeof(ctx->super->signature)) != 0) { 2016 spdk_dma_free(ctx->super); 2017 _spdk_bs_free(ctx->bs); 2018 free(ctx); 2019 spdk_bs_sequence_finish(seq, -EILSEQ); 2020 return; 2021 } 2022 2023 crc = _spdk_blob_md_page_calc_crc(ctx->super); 2024 if (crc != ctx->super->crc) { 2025 spdk_dma_free(ctx->super); 2026 _spdk_bs_free(ctx->bs); 2027 free(ctx); 2028 spdk_bs_sequence_finish(seq, -EILSEQ); 2029 return; 2030 } 2031 2032 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 2033 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 2034 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 2035 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 2036 } else { 2037 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 2038 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 2039 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 2040 spdk_dma_free(ctx->super); 2041 _spdk_bs_free(ctx->bs); 2042 free(ctx); 2043 spdk_bs_sequence_finish(seq, -ENXIO); 2044 return; 2045 } 2046 2047 /* Parse the super block */ 2048 ctx->bs->cluster_sz = ctx->super->cluster_size; 2049 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 2050 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 2051 ctx->bs->md_start = ctx->super->md_start; 2052 ctx->bs->md_len = ctx->super->md_len; 2053 ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up( 2054 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 2055 ctx->bs->super_blob = ctx->super->super_blob; 2056 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 2057 2058 if (ctx->super->clean == 1) { 2059 ctx->super->clean = 0; 2060 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx); 2061 } else { 2062 _spdk_bs_recover(seq, ctx); 2063 } 2064 } 2065 2066 void 2067 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 2068 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 2069 { 2070 struct spdk_blob_store *bs; 2071 struct spdk_bs_cpl cpl; 2072 spdk_bs_sequence_t *seq; 2073 struct spdk_bs_load_ctx *ctx; 2074 struct spdk_bs_opts opts = {}; 2075 2076 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 2077 2078 if (o) { 2079 opts = *o; 2080 } else { 2081 spdk_bs_opts_init(&opts); 2082 } 2083 2084 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 2085 cb_fn(cb_arg, NULL, -EINVAL); 2086 return; 2087 } 2088 2089 bs = _spdk_bs_alloc(dev, &opts); 2090 if (!bs) { 2091 cb_fn(cb_arg, NULL, -ENOMEM); 2092 return; 2093 } 2094 2095 ctx = calloc(1, sizeof(*ctx)); 2096 if (!ctx) { 2097 _spdk_bs_free(bs); 2098 cb_fn(cb_arg, NULL, -ENOMEM); 2099 return; 2100 } 2101 2102 ctx->bs = bs; 2103 2104 /* Allocate memory for the super block */ 2105 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2106 if (!ctx->super) { 2107 free(ctx); 2108 _spdk_bs_free(bs); 2109 return; 2110 } 2111 2112 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2113 cpl.u.bs_handle.cb_fn = cb_fn; 2114 cpl.u.bs_handle.cb_arg = cb_arg; 2115 cpl.u.bs_handle.bs = bs; 2116 2117 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2118 if (!seq) { 2119 spdk_dma_free(ctx->super); 2120 free(ctx); 2121 _spdk_bs_free(bs); 2122 cb_fn(cb_arg, NULL, -ENOMEM); 2123 return; 2124 } 2125 2126 /* Read the super block */ 2127 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2128 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2129 _spdk_bs_load_super_cpl, ctx); 2130 } 2131 2132 /* END spdk_bs_load */ 2133 2134 /* START spdk_bs_init */ 2135 2136 struct spdk_bs_init_ctx { 2137 struct spdk_blob_store *bs; 2138 struct spdk_bs_super_block *super; 2139 }; 2140 2141 static void 2142 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2143 { 2144 struct spdk_bs_init_ctx *ctx = cb_arg; 2145 2146 spdk_dma_free(ctx->super); 2147 free(ctx); 2148 2149 spdk_bs_sequence_finish(seq, bserrno); 2150 } 2151 2152 static void 2153 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2154 { 2155 struct spdk_bs_init_ctx *ctx = cb_arg; 2156 2157 /* Write super block */ 2158 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 2159 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 2160 _spdk_bs_init_persist_super_cpl, ctx); 2161 } 2162 2163 void 2164 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 2165 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 2166 { 2167 struct spdk_bs_init_ctx *ctx; 2168 struct spdk_blob_store *bs; 2169 struct spdk_bs_cpl cpl; 2170 spdk_bs_sequence_t *seq; 2171 spdk_bs_batch_t *batch; 2172 uint64_t num_md_lba; 2173 uint64_t num_md_pages; 2174 uint64_t num_md_clusters; 2175 uint32_t i; 2176 struct spdk_bs_opts opts = {}; 2177 int rc; 2178 2179 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 2180 2181 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 2182 SPDK_ERRLOG("unsupported dev block length of %d\n", 2183 dev->blocklen); 2184 dev->destroy(dev); 2185 cb_fn(cb_arg, NULL, -EINVAL); 2186 return; 2187 } 2188 2189 if (o) { 2190 opts = *o; 2191 } else { 2192 spdk_bs_opts_init(&opts); 2193 } 2194 2195 if (_spdk_bs_opts_verify(&opts) != 0) { 2196 dev->destroy(dev); 2197 cb_fn(cb_arg, NULL, -EINVAL); 2198 return; 2199 } 2200 2201 bs = _spdk_bs_alloc(dev, &opts); 2202 if (!bs) { 2203 dev->destroy(dev); 2204 cb_fn(cb_arg, NULL, -ENOMEM); 2205 return; 2206 } 2207 2208 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 2209 /* By default, allocate 1 page per cluster. 2210 * Technically, this over-allocates metadata 2211 * because more metadata will reduce the number 2212 * of usable clusters. This can be addressed with 2213 * more complex math in the future. 2214 */ 2215 bs->md_len = bs->total_clusters; 2216 } else { 2217 bs->md_len = opts.num_md_pages; 2218 } 2219 2220 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 2221 if (rc < 0) { 2222 _spdk_bs_free(bs); 2223 cb_fn(cb_arg, NULL, -ENOMEM); 2224 return; 2225 } 2226 2227 ctx = calloc(1, sizeof(*ctx)); 2228 if (!ctx) { 2229 _spdk_bs_free(bs); 2230 cb_fn(cb_arg, NULL, -ENOMEM); 2231 return; 2232 } 2233 2234 ctx->bs = bs; 2235 2236 /* Allocate memory for the super block */ 2237 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2238 if (!ctx->super) { 2239 free(ctx); 2240 _spdk_bs_free(bs); 2241 return; 2242 } 2243 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 2244 sizeof(ctx->super->signature)); 2245 ctx->super->version = SPDK_BS_VERSION; 2246 ctx->super->length = sizeof(*ctx->super); 2247 ctx->super->super_blob = bs->super_blob; 2248 ctx->super->clean = 0; 2249 ctx->super->cluster_size = bs->cluster_sz; 2250 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 2251 2252 /* Calculate how many pages the metadata consumes at the front 2253 * of the disk. 2254 */ 2255 2256 /* The super block uses 1 page */ 2257 num_md_pages = 1; 2258 2259 /* The used_md_pages mask requires 1 bit per metadata page, rounded 2260 * up to the nearest page, plus a header. 2261 */ 2262 ctx->super->used_page_mask_start = num_md_pages; 2263 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2264 divide_round_up(bs->md_len, 8), 2265 SPDK_BS_PAGE_SIZE); 2266 num_md_pages += ctx->super->used_page_mask_len; 2267 2268 /* The used_clusters mask requires 1 bit per cluster, rounded 2269 * up to the nearest page, plus a header. 2270 */ 2271 ctx->super->used_cluster_mask_start = num_md_pages; 2272 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2273 divide_round_up(bs->total_clusters, 8), 2274 SPDK_BS_PAGE_SIZE); 2275 num_md_pages += ctx->super->used_cluster_mask_len; 2276 2277 /* The metadata region size was chosen above */ 2278 ctx->super->md_start = bs->md_start = num_md_pages; 2279 ctx->super->md_len = bs->md_len; 2280 num_md_pages += bs->md_len; 2281 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 2282 2283 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 2284 2285 num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster); 2286 if (num_md_clusters > bs->total_clusters) { 2287 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 2288 "please decrease number of pages reserved for metadata " 2289 "or increase cluster size.\n"); 2290 spdk_dma_free(ctx->super); 2291 free(ctx); 2292 _spdk_bs_free(bs); 2293 cb_fn(cb_arg, NULL, -ENOMEM); 2294 return; 2295 } 2296 /* Claim all of the clusters used by the metadata */ 2297 for (i = 0; i < num_md_clusters; i++) { 2298 _spdk_bs_claim_cluster(bs, i); 2299 } 2300 2301 bs->total_data_clusters = bs->num_free_clusters; 2302 2303 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2304 cpl.u.bs_handle.cb_fn = cb_fn; 2305 cpl.u.bs_handle.cb_arg = cb_arg; 2306 cpl.u.bs_handle.bs = bs; 2307 2308 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2309 if (!seq) { 2310 spdk_dma_free(ctx->super); 2311 free(ctx); 2312 _spdk_bs_free(bs); 2313 cb_fn(cb_arg, NULL, -ENOMEM); 2314 return; 2315 } 2316 2317 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 2318 2319 /* Clear metadata space */ 2320 spdk_bs_batch_write_zeroes(batch, 0, num_md_lba); 2321 /* Trim data clusters */ 2322 spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 2323 2324 spdk_bs_batch_close(batch); 2325 } 2326 2327 /* END spdk_bs_init */ 2328 2329 /* START spdk_bs_destroy */ 2330 2331 static void 2332 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2333 { 2334 struct spdk_bs_init_ctx *ctx = cb_arg; 2335 struct spdk_blob_store *bs = ctx->bs; 2336 2337 /* 2338 * We need to defer calling spdk_bs_call_cpl() until after 2339 * dev destruction, so tuck these away for later use. 2340 */ 2341 bs->unload_err = bserrno; 2342 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2343 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2344 2345 spdk_bs_sequence_finish(seq, bserrno); 2346 2347 _spdk_bs_free(bs); 2348 free(ctx); 2349 } 2350 2351 void 2352 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 2353 void *cb_arg) 2354 { 2355 struct spdk_bs_cpl cpl; 2356 spdk_bs_sequence_t *seq; 2357 struct spdk_bs_init_ctx *ctx; 2358 2359 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 2360 2361 if (!TAILQ_EMPTY(&bs->blobs)) { 2362 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2363 cb_fn(cb_arg, -EBUSY); 2364 return; 2365 } 2366 2367 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2368 cpl.u.bs_basic.cb_fn = cb_fn; 2369 cpl.u.bs_basic.cb_arg = cb_arg; 2370 2371 ctx = calloc(1, sizeof(*ctx)); 2372 if (!ctx) { 2373 cb_fn(cb_arg, -ENOMEM); 2374 return; 2375 } 2376 2377 ctx->bs = bs; 2378 2379 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2380 if (!seq) { 2381 free(ctx); 2382 cb_fn(cb_arg, -ENOMEM); 2383 return; 2384 } 2385 2386 /* Write zeroes to the super block */ 2387 spdk_bs_sequence_write_zeroes(seq, 2388 _spdk_bs_page_to_lba(bs, 0), 2389 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 2390 _spdk_bs_destroy_trim_cpl, ctx); 2391 } 2392 2393 /* END spdk_bs_destroy */ 2394 2395 /* START spdk_bs_unload */ 2396 2397 static void 2398 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2399 { 2400 struct spdk_bs_load_ctx *ctx = cb_arg; 2401 2402 spdk_dma_free(ctx->super); 2403 2404 /* 2405 * We need to defer calling spdk_bs_call_cpl() until after 2406 * dev destuction, so tuck these away for later use. 2407 */ 2408 ctx->bs->unload_err = bserrno; 2409 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2410 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2411 2412 spdk_bs_sequence_finish(seq, bserrno); 2413 2414 _spdk_bs_free(ctx->bs); 2415 free(ctx); 2416 } 2417 2418 static void 2419 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2420 { 2421 struct spdk_bs_load_ctx *ctx = cb_arg; 2422 2423 spdk_dma_free(ctx->mask); 2424 ctx->super->clean = 1; 2425 2426 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 2427 } 2428 2429 static void 2430 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2431 { 2432 struct spdk_bs_load_ctx *ctx = cb_arg; 2433 2434 spdk_dma_free(ctx->mask); 2435 2436 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl); 2437 } 2438 2439 static void 2440 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2441 { 2442 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 2443 } 2444 2445 void 2446 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 2447 { 2448 struct spdk_bs_cpl cpl; 2449 spdk_bs_sequence_t *seq; 2450 struct spdk_bs_load_ctx *ctx; 2451 2452 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 2453 2454 if (!TAILQ_EMPTY(&bs->blobs)) { 2455 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2456 cb_fn(cb_arg, -EBUSY); 2457 return; 2458 } 2459 2460 ctx = calloc(1, sizeof(*ctx)); 2461 if (!ctx) { 2462 cb_fn(cb_arg, -ENOMEM); 2463 return; 2464 } 2465 2466 ctx->bs = bs; 2467 2468 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2469 if (!ctx->super) { 2470 free(ctx); 2471 cb_fn(cb_arg, -ENOMEM); 2472 return; 2473 } 2474 2475 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2476 cpl.u.bs_basic.cb_fn = cb_fn; 2477 cpl.u.bs_basic.cb_arg = cb_arg; 2478 2479 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2480 if (!seq) { 2481 spdk_dma_free(ctx->super); 2482 free(ctx); 2483 cb_fn(cb_arg, -ENOMEM); 2484 return; 2485 } 2486 2487 /* Read super block */ 2488 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2489 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2490 _spdk_bs_unload_read_super_cpl, ctx); 2491 } 2492 2493 /* END spdk_bs_unload */ 2494 2495 void 2496 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 2497 spdk_bs_op_complete cb_fn, void *cb_arg) 2498 { 2499 bs->super_blob = blobid; 2500 cb_fn(cb_arg, 0); 2501 } 2502 2503 void 2504 spdk_bs_get_super(struct spdk_blob_store *bs, 2505 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2506 { 2507 if (bs->super_blob == SPDK_BLOBID_INVALID) { 2508 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 2509 } else { 2510 cb_fn(cb_arg, bs->super_blob, 0); 2511 } 2512 } 2513 2514 uint64_t 2515 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 2516 { 2517 return bs->cluster_sz; 2518 } 2519 2520 uint64_t 2521 spdk_bs_get_page_size(struct spdk_blob_store *bs) 2522 { 2523 return SPDK_BS_PAGE_SIZE; 2524 } 2525 2526 uint64_t 2527 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 2528 { 2529 return bs->num_free_clusters; 2530 } 2531 2532 uint64_t 2533 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 2534 { 2535 return bs->total_data_clusters; 2536 } 2537 2538 static int 2539 spdk_bs_register_md_thread(struct spdk_blob_store *bs) 2540 { 2541 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target); 2542 if (!bs->md_target.md_channel) { 2543 SPDK_ERRLOG("Failed to get IO channel.\n"); 2544 return -1; 2545 } 2546 2547 return 0; 2548 } 2549 2550 static int 2551 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 2552 { 2553 spdk_put_io_channel(bs->md_target.md_channel); 2554 2555 return 0; 2556 } 2557 2558 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob) 2559 { 2560 struct spdk_blob_data *blob = __blob_to_data(_blob); 2561 2562 assert(blob != NULL); 2563 2564 return blob->id; 2565 } 2566 2567 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob) 2568 { 2569 struct spdk_blob_data *blob = __blob_to_data(_blob); 2570 2571 assert(blob != NULL); 2572 2573 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 2574 } 2575 2576 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob) 2577 { 2578 struct spdk_blob_data *blob = __blob_to_data(_blob); 2579 2580 assert(blob != NULL); 2581 2582 return blob->active.num_clusters; 2583 } 2584 2585 /* START spdk_bs_create_blob */ 2586 2587 static void 2588 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2589 { 2590 struct spdk_blob_data *blob = cb_arg; 2591 2592 _spdk_blob_free(blob); 2593 2594 spdk_bs_sequence_finish(seq, bserrno); 2595 } 2596 2597 void spdk_bs_create_blob(struct spdk_blob_store *bs, 2598 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2599 { 2600 struct spdk_blob_data *blob; 2601 uint32_t page_idx; 2602 struct spdk_bs_cpl cpl; 2603 spdk_bs_sequence_t *seq; 2604 spdk_blob_id id; 2605 2606 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 2607 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 2608 cb_fn(cb_arg, 0, -ENOMEM); 2609 return; 2610 } 2611 spdk_bit_array_set(bs->used_md_pages, page_idx); 2612 2613 id = _spdk_bs_page_to_blobid(page_idx); 2614 2615 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 2616 2617 blob = _spdk_blob_alloc(bs, id); 2618 if (!blob) { 2619 cb_fn(cb_arg, 0, -ENOMEM); 2620 return; 2621 } 2622 2623 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 2624 cpl.u.blobid.cb_fn = cb_fn; 2625 cpl.u.blobid.cb_arg = cb_arg; 2626 cpl.u.blobid.blobid = blob->id; 2627 2628 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2629 if (!seq) { 2630 _spdk_blob_free(blob); 2631 cb_fn(cb_arg, 0, -ENOMEM); 2632 return; 2633 } 2634 2635 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob); 2636 } 2637 2638 /* END spdk_bs_create_blob */ 2639 2640 /* START spdk_blob_resize */ 2641 int 2642 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz) 2643 { 2644 struct spdk_blob_data *blob = __blob_to_data(_blob); 2645 int rc; 2646 2647 assert(blob != NULL); 2648 2649 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 2650 2651 if (blob->md_ro) { 2652 return -EPERM; 2653 } 2654 2655 if (sz == blob->active.num_clusters) { 2656 return 0; 2657 } 2658 2659 rc = _spdk_resize_blob(blob, sz); 2660 if (rc < 0) { 2661 return rc; 2662 } 2663 2664 return 0; 2665 } 2666 2667 /* END spdk_blob_resize */ 2668 2669 2670 /* START spdk_bs_delete_blob */ 2671 2672 static void 2673 _spdk_bs_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2674 { 2675 struct spdk_blob_data *blob = cb_arg; 2676 2677 _spdk_blob_free(blob); 2678 2679 spdk_bs_sequence_finish(seq, bserrno); 2680 } 2681 2682 static void 2683 _spdk_bs_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2684 { 2685 struct spdk_blob_data *blob = cb_arg; 2686 2687 /* If the blob have crc error, we just return NULL. */ 2688 if (blob == NULL) { 2689 spdk_bs_sequence_finish(seq, bserrno); 2690 return; 2691 } 2692 blob->state = SPDK_BLOB_STATE_DIRTY; 2693 blob->active.num_pages = 0; 2694 _spdk_resize_blob(blob, 0); 2695 2696 _spdk_blob_persist(seq, blob, _spdk_bs_delete_blob_cpl, blob); 2697 } 2698 2699 void 2700 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2701 spdk_blob_op_complete cb_fn, void *cb_arg) 2702 { 2703 struct spdk_blob_data *blob; 2704 struct spdk_bs_cpl cpl; 2705 spdk_bs_sequence_t *seq; 2706 2707 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 2708 2709 blob = _spdk_blob_lookup(bs, blobid); 2710 if (blob) { 2711 assert(blob->open_ref > 0); 2712 cb_fn(cb_arg, -EINVAL); 2713 return; 2714 } 2715 2716 blob = _spdk_blob_alloc(bs, blobid); 2717 if (!blob) { 2718 cb_fn(cb_arg, -ENOMEM); 2719 return; 2720 } 2721 2722 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2723 cpl.u.blob_basic.cb_fn = cb_fn; 2724 cpl.u.blob_basic.cb_arg = cb_arg; 2725 2726 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2727 if (!seq) { 2728 _spdk_blob_free(blob); 2729 cb_fn(cb_arg, -ENOMEM); 2730 return; 2731 } 2732 2733 _spdk_blob_load(seq, blob, _spdk_bs_delete_open_cpl, blob); 2734 } 2735 2736 /* END spdk_bs_delete_blob */ 2737 2738 /* START spdk_bs_open_blob */ 2739 2740 static void 2741 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2742 { 2743 struct spdk_blob_data *blob = cb_arg; 2744 2745 /* If the blob have crc error, we just return NULL. */ 2746 if (blob == NULL) { 2747 seq->cpl.u.blob_handle.blob = NULL; 2748 spdk_bs_sequence_finish(seq, bserrno); 2749 return; 2750 } 2751 2752 blob->open_ref++; 2753 2754 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 2755 2756 spdk_bs_sequence_finish(seq, bserrno); 2757 } 2758 2759 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2760 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2761 { 2762 struct spdk_blob_data *blob; 2763 struct spdk_bs_cpl cpl; 2764 spdk_bs_sequence_t *seq; 2765 uint32_t page_num; 2766 2767 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 2768 2769 blob = _spdk_blob_lookup(bs, blobid); 2770 if (blob) { 2771 blob->open_ref++; 2772 cb_fn(cb_arg, __data_to_blob(blob), 0); 2773 return; 2774 } 2775 2776 page_num = _spdk_bs_blobid_to_page(blobid); 2777 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2778 /* Invalid blobid */ 2779 cb_fn(cb_arg, NULL, -ENOENT); 2780 return; 2781 } 2782 2783 blob = _spdk_blob_alloc(bs, blobid); 2784 if (!blob) { 2785 cb_fn(cb_arg, NULL, -ENOMEM); 2786 return; 2787 } 2788 2789 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2790 cpl.u.blob_handle.cb_fn = cb_fn; 2791 cpl.u.blob_handle.cb_arg = cb_arg; 2792 cpl.u.blob_handle.blob = __data_to_blob(blob); 2793 2794 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2795 if (!seq) { 2796 _spdk_blob_free(blob); 2797 cb_fn(cb_arg, NULL, -ENOMEM); 2798 return; 2799 } 2800 2801 _spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob); 2802 } 2803 2804 /* END spdk_bs_open_blob */ 2805 2806 /* START spdk_blob_sync_md */ 2807 2808 static void 2809 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2810 { 2811 spdk_bs_sequence_finish(seq, bserrno); 2812 } 2813 2814 void 2815 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg) 2816 { 2817 struct spdk_blob_data *blob = __blob_to_data(_blob); 2818 struct spdk_bs_cpl cpl; 2819 spdk_bs_sequence_t *seq; 2820 2821 assert(blob != NULL); 2822 2823 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id); 2824 2825 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2826 blob->state != SPDK_BLOB_STATE_SYNCING); 2827 2828 if (blob->md_ro) { 2829 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 2830 return; 2831 } 2832 2833 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2834 cb_fn(cb_arg, 0); 2835 return; 2836 } 2837 2838 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2839 cpl.u.blob_basic.cb_fn = cb_fn; 2840 cpl.u.blob_basic.cb_arg = cb_arg; 2841 2842 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2843 if (!seq) { 2844 cb_fn(cb_arg, -ENOMEM); 2845 return; 2846 } 2847 2848 _spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob); 2849 } 2850 2851 /* END spdk_blob_sync_md */ 2852 2853 /* START spdk_blob_close */ 2854 2855 static void 2856 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2857 { 2858 struct spdk_blob_data **blob = cb_arg; 2859 2860 if ((*blob)->open_ref == 0) { 2861 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2862 _spdk_blob_free((*blob)); 2863 } 2864 2865 *blob = NULL; 2866 2867 spdk_bs_sequence_finish(seq, bserrno); 2868 } 2869 2870 void spdk_blob_close(struct spdk_blob **b, spdk_blob_op_complete cb_fn, void *cb_arg) 2871 { 2872 struct spdk_bs_cpl cpl; 2873 struct spdk_blob_data *blob; 2874 spdk_bs_sequence_t *seq; 2875 2876 assert(b != NULL); 2877 blob = __blob_to_data(*b); 2878 assert(blob != NULL); 2879 2880 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id); 2881 2882 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2883 blob->state != SPDK_BLOB_STATE_SYNCING); 2884 2885 if (blob->open_ref == 0) { 2886 cb_fn(cb_arg, -EBADF); 2887 return; 2888 } 2889 2890 blob->open_ref--; 2891 2892 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2893 cpl.u.blob_basic.cb_fn = cb_fn; 2894 cpl.u.blob_basic.cb_arg = cb_arg; 2895 2896 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2897 if (!seq) { 2898 cb_fn(cb_arg, -ENOMEM); 2899 return; 2900 } 2901 2902 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2903 _spdk_blob_close_cpl(seq, b, 0); 2904 return; 2905 } 2906 2907 /* Sync metadata */ 2908 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2909 } 2910 2911 /* END spdk_blob_close */ 2912 2913 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2914 { 2915 return spdk_get_io_channel(&bs->io_target); 2916 } 2917 2918 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2919 { 2920 spdk_put_io_channel(channel); 2921 } 2922 2923 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2924 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2925 { 2926 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2927 SPDK_BLOB_UNMAP); 2928 } 2929 2930 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2931 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2932 { 2933 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2934 SPDK_BLOB_WRITE_ZEROES); 2935 } 2936 2937 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2938 void *payload, uint64_t offset, uint64_t length, 2939 spdk_blob_op_complete cb_fn, void *cb_arg) 2940 { 2941 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2942 SPDK_BLOB_WRITE); 2943 } 2944 2945 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2946 void *payload, uint64_t offset, uint64_t length, 2947 spdk_blob_op_complete cb_fn, void *cb_arg) 2948 { 2949 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2950 SPDK_BLOB_READ); 2951 } 2952 2953 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2954 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2955 spdk_blob_op_complete cb_fn, void *cb_arg) 2956 { 2957 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 2958 } 2959 2960 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2961 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2962 spdk_blob_op_complete cb_fn, void *cb_arg) 2963 { 2964 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 2965 } 2966 2967 struct spdk_bs_iter_ctx { 2968 int64_t page_num; 2969 struct spdk_blob_store *bs; 2970 2971 spdk_blob_op_with_handle_complete cb_fn; 2972 void *cb_arg; 2973 }; 2974 2975 static void 2976 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 2977 { 2978 struct spdk_blob_data *blob = __blob_to_data(_blob); 2979 struct spdk_bs_iter_ctx *ctx = cb_arg; 2980 struct spdk_blob_store *bs = ctx->bs; 2981 spdk_blob_id id; 2982 2983 if (bserrno == 0) { 2984 ctx->cb_fn(ctx->cb_arg, _blob, bserrno); 2985 free(ctx); 2986 return; 2987 } 2988 2989 ctx->page_num++; 2990 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2991 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2992 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2993 free(ctx); 2994 return; 2995 } 2996 2997 id = _spdk_bs_page_to_blobid(ctx->page_num); 2998 2999 blob = _spdk_blob_lookup(bs, id); 3000 if (blob) { 3001 blob->open_ref++; 3002 ctx->cb_fn(ctx->cb_arg, _blob, 0); 3003 free(ctx); 3004 return; 3005 } 3006 3007 spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 3008 } 3009 3010 void 3011 spdk_bs_iter_first(struct spdk_blob_store *bs, 3012 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 3013 { 3014 struct spdk_bs_iter_ctx *ctx; 3015 3016 ctx = calloc(1, sizeof(*ctx)); 3017 if (!ctx) { 3018 cb_fn(cb_arg, NULL, -ENOMEM); 3019 return; 3020 } 3021 3022 ctx->page_num = -1; 3023 ctx->bs = bs; 3024 ctx->cb_fn = cb_fn; 3025 ctx->cb_arg = cb_arg; 3026 3027 _spdk_bs_iter_cpl(ctx, NULL, -1); 3028 } 3029 3030 static void 3031 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 3032 { 3033 struct spdk_bs_iter_ctx *ctx = cb_arg; 3034 3035 _spdk_bs_iter_cpl(ctx, NULL, -1); 3036 } 3037 3038 void 3039 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 3040 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 3041 { 3042 struct spdk_bs_iter_ctx *ctx; 3043 struct spdk_blob_data *blob; 3044 3045 assert(b != NULL); 3046 blob = __blob_to_data(*b); 3047 assert(blob != NULL); 3048 3049 ctx = calloc(1, sizeof(*ctx)); 3050 if (!ctx) { 3051 cb_fn(cb_arg, NULL, -ENOMEM); 3052 return; 3053 } 3054 3055 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 3056 ctx->bs = bs; 3057 ctx->cb_fn = cb_fn; 3058 ctx->cb_arg = cb_arg; 3059 3060 /* Close the existing blob */ 3061 spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx); 3062 } 3063 3064 int 3065 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value, 3066 uint16_t value_len) 3067 { 3068 struct spdk_blob_data *blob = __blob_to_data(_blob); 3069 struct spdk_xattr *xattr; 3070 3071 assert(blob != NULL); 3072 3073 assert(blob->state != SPDK_BLOB_STATE_LOADING && 3074 blob->state != SPDK_BLOB_STATE_SYNCING); 3075 3076 if (blob->md_ro) { 3077 return -EPERM; 3078 } 3079 3080 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3081 if (!strcmp(name, xattr->name)) { 3082 free(xattr->value); 3083 xattr->value_len = value_len; 3084 xattr->value = malloc(value_len); 3085 memcpy(xattr->value, value, value_len); 3086 3087 blob->state = SPDK_BLOB_STATE_DIRTY; 3088 3089 return 0; 3090 } 3091 } 3092 3093 xattr = calloc(1, sizeof(*xattr)); 3094 if (!xattr) { 3095 return -1; 3096 } 3097 xattr->name = strdup(name); 3098 xattr->value_len = value_len; 3099 xattr->value = malloc(value_len); 3100 memcpy(xattr->value, value, value_len); 3101 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 3102 3103 blob->state = SPDK_BLOB_STATE_DIRTY; 3104 3105 return 0; 3106 } 3107 3108 int 3109 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name) 3110 { 3111 struct spdk_blob_data *blob = __blob_to_data(_blob); 3112 struct spdk_xattr *xattr; 3113 3114 assert(blob != NULL); 3115 3116 assert(blob->state != SPDK_BLOB_STATE_LOADING && 3117 blob->state != SPDK_BLOB_STATE_SYNCING); 3118 3119 if (blob->md_ro) { 3120 return -EPERM; 3121 } 3122 3123 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3124 if (!strcmp(name, xattr->name)) { 3125 TAILQ_REMOVE(&blob->xattrs, xattr, link); 3126 free(xattr->value); 3127 free(xattr->name); 3128 free(xattr); 3129 3130 blob->state = SPDK_BLOB_STATE_DIRTY; 3131 3132 return 0; 3133 } 3134 } 3135 3136 return -ENOENT; 3137 } 3138 3139 int 3140 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name, 3141 const void **value, size_t *value_len) 3142 { 3143 struct spdk_blob_data *blob = __blob_to_data(_blob); 3144 struct spdk_xattr *xattr; 3145 3146 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3147 if (!strcmp(name, xattr->name)) { 3148 *value = xattr->value; 3149 *value_len = xattr->value_len; 3150 return 0; 3151 } 3152 } 3153 3154 return -ENOENT; 3155 } 3156 3157 struct spdk_xattr_names { 3158 uint32_t count; 3159 const char *names[0]; 3160 }; 3161 3162 int 3163 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names) 3164 { 3165 struct spdk_blob_data *blob = __blob_to_data(_blob); 3166 struct spdk_xattr *xattr; 3167 int count = 0; 3168 3169 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3170 count++; 3171 } 3172 3173 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 3174 if (*names == NULL) { 3175 return -ENOMEM; 3176 } 3177 3178 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3179 (*names)->names[(*names)->count++] = xattr->name; 3180 } 3181 3182 return 0; 3183 } 3184 3185 uint32_t 3186 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 3187 { 3188 assert(names != NULL); 3189 3190 return names->count; 3191 } 3192 3193 const char * 3194 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 3195 { 3196 if (index >= names->count) { 3197 return NULL; 3198 } 3199 3200 return names->names[index]; 3201 } 3202 3203 void 3204 spdk_xattr_names_free(struct spdk_xattr_names *names) 3205 { 3206 free(names); 3207 } 3208 3209 struct spdk_bs_type 3210 spdk_bs_get_bstype(struct spdk_blob_store *bs) 3211 { 3212 return bs->bstype; 3213 } 3214 3215 void 3216 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype) 3217 { 3218 memcpy(&bs->bstype, &bstype, sizeof(bstype)); 3219 } 3220 3221 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB) 3222