/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL	0xffffffffUL

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

static struct spdk_blob_data *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob_data *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}
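
/*
 * Note on blob metadata addressing: a blob's first (root) metadata page
 * always lives at the page index derived from its blobid via
 * _spdk_bs_blobid_to_page(), so it can be located without any lookup
 * structure. Additional pages are chained through each page's 'next' field.
 */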

static void
_spdk_blob_free(struct spdk_blob_data *blob)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob_data *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags *desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

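			/*
			 * Extents are run-length encoded on disk: each entry holds a
			 * starting cluster_idx plus a length in clusters. The first
			 * pass below only counts clusters and validates them against
			 * the used_clusters map; the second pass expands each run
			 * into per-cluster LBAs.
			 */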
			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (!spdk_bit_array_get(blob->bs->used_clusters,
								desc_extent->extents[i].cluster_idx + j)) {
						return -EINVAL;
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr *desc_xattr;
			struct spdk_xattr *xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
			    sizeof(desc_xattr->value_length) +
			    desc_xattr->name_length + desc_xattr->value_length) {
				return -EINVAL;
			}

			xattr = calloc(1, sizeof(*xattr));
			if (xattr == NULL) {
				return -ENOMEM;
			}

			xattr->name = malloc(desc_xattr->name_length + 1);
			if (xattr->name == NULL) {
				free(xattr);
				return -ENOMEM;
			}
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			if (xattr->value == NULL) {
				free(xattr->name);
				free(xattr);
				return -ENOMEM;
			}
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Unrecognized descriptor type. Do not fail - just continue to the
			 * next descriptor. If this descriptor is associated with some feature
			 * defined in a newer version of blobstore, that version of blobstore
			 * should create and set an associated feature flag to specify if this
			 * blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob_data *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD, this can
	 * happen for example if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}
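
/*
 * Serialization overview: each metadata page carries the blob id, a
 * sequence number, a packed run of descriptors, the index of the next page
 * in the chain (SPDK_INVALID_MD_PAGE terminates the chain), and a crc
 * computed over the whole page except the crc field itself.
 */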

static int
_spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}
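
/*
 * Sizing example (illustrative): an xattr named "foo" (3 bytes) with a
 * 16-byte value needs sizeof(struct spdk_blob_md_descriptor_xattr) + 3 + 16
 * bytes of descriptor space, while desc->length records only the payload
 * (the name_length and value_length fields plus the name and value bytes,
 * excluding the type/length header), matching the check applied in
 * _spdk_blob_parse_page().
 */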

static void
_spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint64_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}

static void
_spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 * descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

static int
_spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	const struct spdk_xattr *xattr;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;
	uint64_t last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);
	buf += sizeof(struct spdk_blob_md_descriptor_flags);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;

		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}
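
/*
 * Note: _spdk_blob_serialize() leaves each page's crc unset. The CRCs are
 * filled in later by _spdk_blob_persist(), once every page's 'next' pointer
 * is known, because the CRC covers the entire page contents.
 */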

struct spdk_blob_load_ctx {
	struct spdk_blob_data *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	struct spdk_blob_md_page *tmp_pages;
	int rc;
	uint32_t crc;

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		tmp_pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					     sizeof(*page), NULL);
		if (tmp_pages == NULL) {
			spdk_dma_free(ctx->pages);
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}
		ctx->pages = tmp_pages;

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob_data *blob;

	struct spdk_blob_md_page *pages;

	uint64_t idx;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};
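
/*
 * Persist ordering (implemented by the callbacks below): chained metadata
 * pages are written first, then the root page, then stale pages from the
 * previous (clean) metadata are zeroed, and finally truncated clusters are
 * unmapped and released - so on-disk metadata never references clusters
 * that have already been freed.
 */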

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	void *tmp;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it as an unmap. */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}
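
/*
 * Example (illustrative): with 8 LBAs per cluster, truncated clusters at
 * LBAs 64, 72, and 128 produce two unmap commands from the loop above:
 * one for LBAs 64-79 (two contiguous clusters merged) and one for LBAs
 * 128-135.
 */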

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}
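
/*
 * The two passes in _spdk_resize_blob() above are deliberate: the first
 * pass only walks used_clusters with spdk_bit_array_find_first_clear() to
 * prove enough free clusters exist, and the second pass repeats the walk
 * to actually claim them, so a resize that cannot be satisfied never
 * leaves partially claimed clusters behind.
 */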

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	spdk_bs_batch_t *batch;
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;
	uint8_t *buf;
	uint64_t page;

	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		switch (op_type) {
		case SPDK_BLOB_READ:
			spdk_bs_batch_read(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE:
			spdk_bs_batch_write(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap(batch, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
			break;
		}

		length -= lba_count;
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		}
	}

	spdk_bs_batch_close(batch);
}
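
/*
 * Example (illustrative): on a blobstore with 8 pages per cluster, a
 * 16-page write at page offset 0 is split by the loop above into two batch
 * writes of 8 pages each, because _spdk_bs_num_pages_to_cluster_boundary()
 * caps each I/O at the cluster boundary and consecutive clusters need not
 * be contiguous on disk.
 */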

struct rw_iov_ctx {
	struct spdk_blob_data *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O
	 *  sequence. byte_count tracks how many bytes remain until orig_iov and orig_iovoff
	 *  point to the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}
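
	/*
	 * Example (illustrative): with orig_iov = { 4096 bytes, 8192 bytes },
	 * a 4096-byte page size, and pages_done == 1, the loop above consumes
	 * the first element entirely and leaves orig_iov pointing at the
	 * second element with orig_iovoff == 0.
	 */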

	/*
	 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
	 *  bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}
1340 */ 1341 seq = spdk_bs_sequence_start(_channel, &cpl); 1342 if (!seq) { 1343 cb_fn(cb_arg, -ENOMEM); 1344 return; 1345 } 1346 1347 if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) { 1348 uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset); 1349 uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length); 1350 1351 if (read) { 1352 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1353 } else { 1354 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1355 } 1356 } else { 1357 struct rw_iov_ctx *ctx; 1358 1359 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 1360 if (ctx == NULL) { 1361 spdk_bs_sequence_finish(seq, -ENOMEM); 1362 return; 1363 } 1364 1365 ctx->blob = blob; 1366 ctx->read = read; 1367 ctx->orig_iov = iov; 1368 ctx->iovcnt = iovcnt; 1369 ctx->page_offset = offset; 1370 ctx->pages_remaining = length; 1371 ctx->pages_done = 0; 1372 1373 _spdk_rw_iov_split_next(seq, ctx, 0); 1374 } 1375 } 1376 1377 static struct spdk_blob_data * 1378 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1379 { 1380 struct spdk_blob_data *blob; 1381 1382 TAILQ_FOREACH(blob, &bs->blobs, link) { 1383 if (blob->id == blobid) { 1384 return blob; 1385 } 1386 } 1387 1388 return NULL; 1389 } 1390 1391 static int 1392 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1393 uint32_t max_ops) 1394 { 1395 struct spdk_bs_dev *dev; 1396 uint32_t i; 1397 1398 dev = bs->dev; 1399 1400 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1401 if (!channel->req_mem) { 1402 return -1; 1403 } 1404 1405 TAILQ_INIT(&channel->reqs); 1406 1407 for (i = 0; i < max_ops; i++) { 1408 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1409 } 1410 1411 channel->bs = bs; 1412 channel->dev = dev; 1413 channel->dev_channel = dev->create_channel(dev); 1414 1415 if (!channel->dev_channel) { 1416 SPDK_ERRLOG("Failed to create device channel.\n"); 1417 free(channel->req_mem); 1418 return -1; 1419 } 1420 1421 return 0; 1422 } 1423 1424 static int 1425 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1426 { 1427 struct spdk_blob_store *bs; 1428 struct spdk_bs_channel *channel = ctx_buf; 1429 1430 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1431 1432 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1433 } 1434 1435 static int 1436 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1437 { 1438 struct spdk_blob_store *bs; 1439 struct spdk_bs_channel *channel = ctx_buf; 1440 1441 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1442 1443 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1444 } 1445 1446 1447 static void 1448 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1449 { 1450 struct spdk_bs_channel *channel = ctx_buf; 1451 1452 free(channel->req_mem); 1453 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1454 } 1455 1456 static void 1457 _spdk_bs_dev_destroy(void *io_device) 1458 { 1459 struct spdk_blob_store *bs; 1460 struct spdk_blob_data *blob, *blob_tmp; 1461 1462 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1463 bs->dev->destroy(bs->dev); 1464 1465 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1466 TAILQ_REMOVE(&bs->blobs, blob, link); 1467 _spdk_blob_free(blob); 1468 } 1469 1470 spdk_bit_array_free(&bs->used_md_pages); 1471 

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(&bs->io_target, NULL);
	spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
	memset(&opts->bstype, 0, sizeof(opts->bstype));
}

static int
_spdk_bs_opts_verify(struct spdk_bs_opts *opts)
{
	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
	    opts->max_channel_ops == 0) {
		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
		return -1;
	}

	return 0;
}
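
/*
 * Usage sketch (hypothetical caller): options are typically initialized to
 * the defaults and then overridden field by field, e.g.
 *
 *	struct spdk_bs_opts opts;
 *
 *	spdk_bs_opts_init(&opts);
 *	opts.cluster_sz = 4 * 1024 * 1024;
 *	spdk_bs_init(dev, &opts, init_complete_cb, NULL);
 *
 * where init_complete_cb is an spdk_bs_op_with_handle_complete provided by
 * the caller. Passing NULL options selects the same defaults.
 */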
1542 */ 1543 bs->cluster_sz = opts->cluster_sz; 1544 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1545 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1546 bs->num_free_clusters = bs->total_clusters; 1547 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1548 if (bs->used_clusters == NULL) { 1549 free(bs); 1550 return NULL; 1551 } 1552 1553 bs->md_target.max_md_ops = opts->max_md_ops; 1554 bs->io_target.max_channel_ops = opts->max_channel_ops; 1555 bs->super_blob = SPDK_BLOBID_INVALID; 1556 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 1557 1558 /* The metadata is assumed to be at least 1 page */ 1559 bs->used_md_pages = spdk_bit_array_create(1); 1560 1561 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1562 sizeof(struct spdk_bs_channel)); 1563 rc = spdk_bs_register_md_thread(bs); 1564 if (rc == -1) { 1565 spdk_io_device_unregister(&bs->md_target, NULL); 1566 spdk_bit_array_free(&bs->used_md_pages); 1567 spdk_bit_array_free(&bs->used_clusters); 1568 free(bs); 1569 return NULL; 1570 } 1571 1572 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1573 sizeof(struct spdk_bs_channel)); 1574 1575 return bs; 1576 } 1577 1578 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */ 1579 1580 struct spdk_bs_load_ctx { 1581 struct spdk_blob_store *bs; 1582 struct spdk_bs_super_block *super; 1583 1584 struct spdk_bs_md_mask *mask; 1585 bool in_page_chain; 1586 uint32_t page_index; 1587 uint32_t cur_page; 1588 struct spdk_blob_md_page *page; 1589 }; 1590 1591 static void 1592 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 1593 { 1594 uint32_t i = 0; 1595 1596 while (true) { 1597 i = spdk_bit_array_find_first_set(array, i); 1598 if (i >= mask->length) { 1599 break; 1600 } 1601 mask->mask[i / 8] |= 1U << (i % 8); 1602 i++; 1603 } 1604 } 1605 1606 static void 1607 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 1608 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1609 { 1610 /* Update the values in the super block */ 1611 super->super_blob = bs->super_blob; 1612 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 1613 super->crc = _spdk_blob_md_page_calc_crc(super); 1614 spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0), 1615 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 1616 cb_fn, cb_arg); 1617 } 1618 1619 static void 1620 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1621 { 1622 struct spdk_bs_load_ctx *ctx = arg; 1623 uint64_t mask_size, lba, lba_count; 1624 1625 /* Write out the used clusters mask */ 1626 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1627 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1628 if (!ctx->mask) { 1629 spdk_dma_free(ctx->super); 1630 free(ctx); 1631 spdk_bs_sequence_finish(seq, -ENOMEM); 1632 return; 1633 } 1634 1635 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1636 ctx->mask->length = ctx->bs->total_clusters; 1637 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1638 1639 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 1640 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1641 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1642 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1643 } 1644 1645 static void 1646 

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	/* Update the values in the super block */
	super->super_blob = bs->super_blob;
	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
	super->crc = _spdk_blob_md_page_calc_crc(super);
	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
			       cb_fn, cb_arg);
}

static void
_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx *ctx = arg;
	uint64_t mask_size, lba, lba_count;

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx *ctx = arg;
	uint64_t mask_size, lba, lba_count;

	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

static int
_spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = 0;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
					if (bs->num_free_clusters == 0) {
						return -1;
					}
					bs->num_free_clusters--;
					cluster_count++;
				}
			}
			if (cluster_count == 0) {
				return -1;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			/* Skip this item */
		} else {
			/* Error */
			return -1;
		}
		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
	return 0;
}

static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
{
	uint32_t crc;

	crc = _spdk_blob_md_page_calc_crc(ctx->page);
	if (crc != ctx->page->crc) {
		return false;
	}

	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
		return false;
	}
	return true;
}
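
/*
 * Recovery path: when the super block is not marked clean (i.e. the
 * blobstore was not unloaded cleanly), the in-memory used_md_pages and
 * used_clusters bitmaps are rebuilt by replaying every valid metadata page
 * chain below, instead of trusting the on-disk masks.
 */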

static void
_spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);

static void
_spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);
	spdk_dma_free(ctx->super);
	spdk_bs_sequence_finish(seq, bserrno);
	free(ctx);
}

static void
_spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
}

static void
_spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
}

static void
_spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t page_num;

	if (bserrno != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	page_num = ctx->cur_page;
	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
				spdk_dma_free(ctx->super);
				_spdk_bs_free(ctx->bs);
				free(ctx);
				spdk_bs_sequence_finish(seq, -EILSEQ);
				return;
			}
			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
				ctx->in_page_chain = true;
				ctx->cur_page = ctx->page->next;
				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
				return;
			}
		}
	}

	ctx->in_page_chain = false;

	do {
		ctx->page_index++;
	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);

	if (ctx->page_index < ctx->super->md_len) {
		ctx->cur_page = ctx->page_index;
		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
	} else {
		spdk_dma_free(ctx->page);
		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
	}
}

static void
_spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba;

	assert(ctx->cur_page < ctx->super->md_len);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
	spdk_bs_sequence_read(seq, ctx->page, lba,
			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
			      _spdk_bs_load_replay_md_cpl, ctx);
}

static void
_spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	ctx->page_index = 0;
	ctx->cur_page = 0;
	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE,
				     NULL);
	if (!ctx->page) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
}
static void
_spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	int rc;

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	_spdk_bs_load_replay_md(seq, cb_arg);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t crc;
	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];

	if (ctx->super->version > SPDK_BS_VERSION ||
	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	crc = _spdk_blob_md_page_calc_crc(ctx->super);
	if (crc != ctx->super->crc) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENXIO);
		return;
	}

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
	ctx->bs->super_blob = ctx->super->super_blob;
	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));

	if (ctx->super->clean == 1) {
		ctx->super->clean = 0;
		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
	} else {
		_spdk_bs_recover(seq, ctx);
	}
}

void
spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts opts = {};

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		/* Complete the callback here too - every other error path does. */
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

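/*
 * Example usage (illustrative only; 'load_done' and 'g_bs' are hypothetical
 * caller-side names). The completion callback receives the live blobstore
 * handle on success:
 *
 *     static void
 *     load_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
 *     {
 *         if (bserrno == 0) {
 *             g_bs = bs;
 *         }
 *     }
 *
 *     spdk_bs_load(dev, NULL, load_done, NULL);
 *
 * Passing NULL opts selects spdk_bs_opts_init() defaults; a caller can
 * instead set opts.bstype to require a matching on-disk type (see
 * _spdk_bs_load_super_cpl above - an all-zero bstype acts as a wildcard).
 */
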
/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	spdk_bs_batch_t *batch;
	uint64_t num_md_lba;
	uint64_t num_md_pages;
	uint64_t num_md_clusters;
	uint32_t i;
	struct spdk_bs_opts opts = {};
	int rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);

	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %d\n",
			    dev->blocklen);
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	if (_spdk_bs_opts_verify(&opts) != 0) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		/* Complete the callback here too - every other error path does. */
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;
	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;
	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);

	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);

	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
	if (num_md_clusters > bs->total_clusters) {
		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
			    "please decrease number of pages reserved for metadata "
			    "or increase cluster size.\n");
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < num_md_clusters; i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	bs->total_data_clusters = bs->num_free_clusters;

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);

	/* Clear metadata space */
	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
	/* Trim data clusters */
	spdk_bs_batch_unmap(batch, num_md_lba,
			    ctx->bs->dev->blockcnt - num_md_lba);

	spdk_bs_batch_close(batch);
}

/* END spdk_bs_init */

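/*
 * Worked example of the layout math above (illustrative numbers, assuming a
 * 4 KiB SPDK_BS_PAGE_SIZE, a 1 MiB cluster_sz, and a small mask header): a
 * 1 GiB device gives total_clusters = 1024 and, by default, md_len = 1024
 * metadata pages. Then:
 *
 *     page 0                      super block
 *     used_page_mask_len     = divide_round_up(hdr + 1024/8, 4096) = 1
 *     used_cluster_mask_len  = divide_round_up(hdr + 1024/8, 4096) = 1
 *     md_start               = 1 + 1 + 1 = 3
 *     num_md_pages           = 3 + 1024 = 1027
 *
 * With pages_per_cluster = 256, divide_round_up(1027, 256) = 5 clusters are
 * claimed for metadata; the remaining 1019 stay available as data clusters.
 * Exact values depend on the real structure sizes - this only shows the
 * shape of the computation.
 */
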
/* START spdk_bs_destroy */

static void
_spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
	 */
	bs->unload_err = bserrno;
	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(bs);
	free(ctx);
}

void
spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
		void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_init_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");

	if (!TAILQ_EMPTY(&bs->blobs)) {
		SPDK_ERRLOG("Blobstore still has open blobs\n");
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Write zeroes to the super block */
	spdk_bs_sequence_write_zeroes(seq,
				      _spdk_bs_page_to_lba(bs, 0),
				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
				      _spdk_bs_destroy_trim_cpl, ctx);
}

/* END spdk_bs_destroy */

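/*
 * Note the distinction: spdk_bs_destroy() zeroes the super block, making the
 * blobstore unrecoverable, while spdk_bs_unload() (below) persists the
 * in-use masks and sets super->clean = 1 so the next spdk_bs_load() can skip
 * recovery. Illustrative shutdown paths ('done' is a hypothetical callback):
 *
 *     spdk_bs_unload(bs, done, NULL);    // keep data, mark clean
 *     spdk_bs_destroy(bs, done, NULL);   // wipe the super block
 *
 * Both require every blob to be closed first; otherwise they fail with
 * -EBUSY.
 */
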
/* START spdk_bs_unload */

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
	 */
	ctx->bs->unload_err = bserrno;
	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);
	ctx->super->clean = 1;

	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
}

static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");

	if (!TAILQ_EMPTY(&bs->blobs)) {
		SPDK_ERRLOG("Blobstore still has open blobs\n");
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return SPDK_BS_PAGE_SIZE;
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

uint64_t
spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
{
	return bs->total_data_clusters;
}

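/*
 * Sketch of how the counters above relate (values illustrative): immediately
 * after spdk_bs_init() on the 1 GiB example earlier,
 *
 *     spdk_bs_total_data_cluster_count(bs) == total_clusters - md_clusters
 *     spdk_bs_free_cluster_count(bs)       == the same value, since no blob
 *                                             has allocated a cluster yet
 *
 * and spdk_bs_get_page_size() always reports the fixed SPDK_BS_PAGE_SIZE,
 * not a device property.
 */
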
int
spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target);
	if (!bs->md_target.md_channel) {
		SPDK_ERRLOG("Failed to get IO channel.\n");
		return -1;
	}

	return 0;
}

int
spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_target.md_channel);

	return 0;
}

spdk_blob_id
spdk_blob_get_id(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return blob->id;
}

uint64_t
spdk_blob_get_num_pages(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t
spdk_blob_get_num_clusters(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return blob->active.num_clusters;
}

/* START spdk_bs_md_create_blob */

static void
_spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_bs_md_create_blob(struct spdk_blob_store *bs,
		       spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data *blob;
	uint32_t page_idx;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	spdk_blob_id id;

	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	id = _spdk_bs_page_to_blobid(page_idx);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);

	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		/* Release the metadata page claimed above so it is not leaked. */
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
}

/* END spdk_bs_md_create_blob */

/* START spdk_blob_resize */
int
spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	int rc;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (blob->md_ro) {
		return -EPERM;
	}

	if (sz == blob->active.num_clusters) {
		return 0;
	}

	rc = _spdk_resize_blob(blob, sz);
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* END spdk_blob_resize */

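/*
 * Typical create-and-size flow (illustrative; 'create_done' is a
 * hypothetical caller-side callback). Note that spdk_blob_resize() only
 * updates in-memory state - the new cluster map is not durable until
 * spdk_blob_sync_md() or spdk_blob_close() persists it:
 *
 *     static void
 *     create_done(void *cb_arg, spdk_blob_id blobid, int bserrno)
 *     {
 *         // open the blob, spdk_blob_resize(blob, N), then sync
 *     }
 *
 *     spdk_bs_md_create_blob(bs, create_done, NULL);
 */
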
/* START spdk_bs_md_delete_blob */

static void
_spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	/* If the blob failed the CRC check during load, it is NULL here;
	 * just finish the sequence with the load error. */
	if (blob == NULL) {
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
}

void
spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data *blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		/* The blob is still open - it cannot be deleted. */
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
}

/* END spdk_bs_md_delete_blob */

/* START spdk_bs_md_open_blob */

static void
_spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	/* If the blob failed the CRC check during load, it is NULL here;
	 * return a NULL handle to the caller. */
	if (blob == NULL) {
		seq->cpl.u.blob_handle.blob = NULL;
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data *blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint32_t page_num;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, __data_to_blob(blob), 0);
		return;
	}

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = __data_to_blob(blob);

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
}

/* END spdk_bs_md_open_blob */

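/*
 * Open handles are reference counted: opening an already-open blob just
 * bumps open_ref and returns the same handle, so every successful open must
 * be paired with a spdk_blob_close(). Illustrative only ('open_done' and
 * 'close_cb' are hypothetical):
 *
 *     static void
 *     open_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *     {
 *         if (bserrno == 0) {
 *             // use blob; later: spdk_blob_close(&blob, close_cb, NULL);
 *         }
 *     }
 *
 *     spdk_bs_md_open_blob(bs, blobid, open_done, NULL);
 */
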
"Syncing blob %lu\n", blob->id); 2816 2817 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2818 blob->state != SPDK_BLOB_STATE_SYNCING); 2819 2820 if (blob->md_ro) { 2821 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 2822 return; 2823 } 2824 2825 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2826 cb_fn(cb_arg, 0); 2827 return; 2828 } 2829 2830 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2831 cpl.u.blob_basic.cb_fn = cb_fn; 2832 cpl.u.blob_basic.cb_arg = cb_arg; 2833 2834 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2835 if (!seq) { 2836 cb_fn(cb_arg, -ENOMEM); 2837 return; 2838 } 2839 2840 _spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob); 2841 } 2842 2843 /* END spdk_blob_sync_md */ 2844 2845 /* START spdk_blob_close */ 2846 2847 static void 2848 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2849 { 2850 struct spdk_blob_data **blob = cb_arg; 2851 2852 if ((*blob)->open_ref == 0) { 2853 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2854 _spdk_blob_free((*blob)); 2855 } 2856 2857 *blob = NULL; 2858 2859 spdk_bs_sequence_finish(seq, bserrno); 2860 } 2861 2862 void spdk_blob_close(struct spdk_blob **b, spdk_blob_op_complete cb_fn, void *cb_arg) 2863 { 2864 struct spdk_bs_cpl cpl; 2865 struct spdk_blob_data *blob; 2866 spdk_bs_sequence_t *seq; 2867 2868 assert(b != NULL); 2869 blob = __blob_to_data(*b); 2870 assert(blob != NULL); 2871 2872 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id); 2873 2874 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2875 blob->state != SPDK_BLOB_STATE_SYNCING); 2876 2877 if (blob->open_ref == 0) { 2878 cb_fn(cb_arg, -EBADF); 2879 return; 2880 } 2881 2882 blob->open_ref--; 2883 2884 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2885 cpl.u.blob_basic.cb_fn = cb_fn; 2886 cpl.u.blob_basic.cb_arg = cb_arg; 2887 2888 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2889 if (!seq) { 2890 cb_fn(cb_arg, -ENOMEM); 2891 return; 2892 } 2893 2894 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2895 _spdk_blob_close_cpl(seq, b, 0); 2896 return; 2897 } 2898 2899 /* Sync metadata */ 2900 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2901 } 2902 2903 /* END spdk_blob_close */ 2904 2905 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2906 { 2907 return spdk_get_io_channel(&bs->io_target); 2908 } 2909 2910 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2911 { 2912 spdk_put_io_channel(channel); 2913 } 2914 2915 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2916 spdk_blob_op_complete cb_fn, void *cb_arg) 2917 { 2918 /* Flush is synchronous right now */ 2919 cb_fn(cb_arg, 0); 2920 } 2921 2922 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2923 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2924 { 2925 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2926 SPDK_BLOB_UNMAP); 2927 } 2928 2929 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2930 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2931 { 2932 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2933 SPDK_BLOB_WRITE_ZEROES); 2934 } 2935 2936 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2937 void *payload, uint64_t offset, uint64_t length, 2938 spdk_blob_op_complete cb_fn, void *cb_arg) 2939 { 2940 _spdk_blob_request_submit_op(blob, channel, 
void
spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void
spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			     uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}

void
spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void
spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void
spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void
spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}

struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	blob = _spdk_blob_lookup(bs, id);
	if (blob) {
		/* The blob is already open; bump the reference and hand back
		 * that open handle, not the stale _blob parameter (which is
		 * NULL or the previous blob on this path). */
		blob->open_ref++;
		ctx->cb_fn(ctx->cb_arg, __data_to_blob(blob), 0);
		free(ctx);
		return;
	}

	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_md_iter_first(struct spdk_blob_store *bs,
		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;
	struct spdk_blob_data *blob;

	assert(b != NULL);
	blob = __blob_to_data(*b);
	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx);
}

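/*
 * Iteration walks blobs in metadata-page order by scanning used_md_pages.
 * Each step hands the callback an open blob, and spdk_bs_md_iter_next()
 * closes it before advancing, so a full enumeration looks like this
 * ('iter_cb' and 'g_bs' are hypothetical caller-side names):
 *
 *     static void
 *     iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *     {
 *         if (bserrno == -ENOENT) {
 *             return;  // no more blobs
 *         }
 *         // inspect blob ...
 *         spdk_bs_md_iter_next(g_bs, &blob, iter_cb, cb_arg);
 *     }
 *
 *     spdk_bs_md_iter_first(g_bs, iter_cb, NULL);
 */
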
int
spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
		    uint16_t value_len)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		return -EPERM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp;

			/* Allocate the new value before freeing the old one so
			 * the attribute is left intact on allocation failure. */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		return -EPERM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(&blob->xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
			  const void **value, size_t *value_len)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}

	return -ENOENT;
}

struct spdk_xattr_names {
	uint32_t count;
	const char *names[0];
};

int
spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	struct spdk_xattr *xattr;
	int count = 0;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}

void
spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
{
	memcpy(&bs->bstype, &bstype, sizeof(bstype));
}

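/*
 * Usage sketch for the xattr helpers above (illustrative; 'sync_done' is a
 * hypothetical callback): values are copied in and owned by the blob, and
 * the pointer returned by spdk_blob_get_xattr_value() stays valid only until
 * the attribute is rewritten or removed. Like resize, xattr changes touch
 * only in-memory state until the blob is synced:
 *
 *     uint64_t length = 16;
 *     const void *value;
 *     size_t value_len;
 *
 *     spdk_blob_set_xattr(blob, "length", &length, sizeof(length));
 *     spdk_blob_get_xattr_value(blob, "length", &value, &value_len);
 *     spdk_blob_sync_md(blob, sync_done, NULL);
 */
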
SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)