1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 44 #include "spdk_internal/log.h" 45 46 #include "blobstore.h" 47 48 #define BLOB_CRC32C_INITIAL 0xffffffffUL 49 50 static int spdk_bs_register_md_thread(struct spdk_blob_store *bs); 51 static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs); 52 static void _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno); 53 54 static inline size_t 55 divide_round_up(size_t num, size_t divisor) 56 { 57 return (num + divisor - 1) / divisor; 58 } 59 60 static void 61 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 62 { 63 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 64 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 65 assert(bs->num_free_clusters > 0); 66 67 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num); 68 69 spdk_bit_array_set(bs->used_clusters, cluster_num); 70 bs->num_free_clusters--; 71 } 72 73 static void 74 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 75 { 76 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 77 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 78 assert(bs->num_free_clusters < bs->total_clusters); 79 80 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num); 81 82 spdk_bit_array_clear(bs->used_clusters, cluster_num); 83 bs->num_free_clusters++; 84 } 85 86 static struct spdk_blob_data * 87 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 88 { 89 struct spdk_blob_data *blob; 90 91 blob = calloc(1, sizeof(*blob)); 92 if (!blob) { 93 return NULL; 94 } 95 96 blob->id = id; 97 blob->bs = bs; 98 99 blob->state = SPDK_BLOB_STATE_DIRTY; 100 blob->active.num_pages = 1; 101 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 102 if (!blob->active.pages) { 103 free(blob); 104 return NULL; 105 } 106 107 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 108 109 TAILQ_INIT(&blob->xattrs); 110 111 return blob; 112 } 113 114 static void 115 _spdk_blob_free(struct spdk_blob_data *blob) 116 { 117 struct spdk_xattr *xattr, *xattr_tmp; 118 119 assert(blob != NULL); 120 121 free(blob->active.clusters); 122 free(blob->clean.clusters); 123 free(blob->active.pages); 124 free(blob->clean.pages); 125 126 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 127 TAILQ_REMOVE(&blob->xattrs, xattr, link); 128 free(xattr->name); 129 free(xattr->value); 130 free(xattr); 131 } 132 133 free(blob); 134 } 135 136 static int 137 _spdk_blob_mark_clean(struct spdk_blob_data *blob) 138 { 139 uint64_t *clusters = NULL; 140 uint32_t *pages = NULL; 141 142 assert(blob != NULL); 143 assert(blob->state == SPDK_BLOB_STATE_LOADING || 144 blob->state == SPDK_BLOB_STATE_SYNCING); 145 146 if (blob->active.num_clusters) { 147 assert(blob->active.clusters); 148 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 149 if (!clusters) { 150 return -1; 151 } 152 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 153 } 154 155 if (blob->active.num_pages) { 156 assert(blob->active.pages); 157 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 158 if (!pages) { 159 free(clusters); 160 return -1; 161 } 162 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 163 } 164 165 free(blob->clean.clusters); 166 free(blob->clean.pages); 167 168 blob->clean.num_clusters = blob->active.num_clusters; 169 blob->clean.clusters = blob->active.clusters; 170 blob->clean.num_pages = blob->active.num_pages; 171 blob->clean.pages = blob->active.pages; 172 173 blob->active.clusters = clusters; 174 blob->active.pages = pages; 175 176 blob->state = SPDK_BLOB_STATE_CLEAN; 177 178 return 0; 179 } 180 181 static int 182 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob) 183 { 184 struct spdk_blob_md_descriptor *desc; 185 size_t cur_desc = 0; 186 void *tmp; 187 188 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 189 while (cur_desc < sizeof(page->descriptors)) { 190 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 191 if (desc->length == 0) { 192 /* If padding and length are 0, this terminates the page */ 193 break; 194 } 195 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 196 struct spdk_blob_md_descriptor_flags *desc_flags; 197 198 desc_flags = (struct spdk_blob_md_descriptor_flags *)desc; 199 200 if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) { 201 return -EINVAL; 202 } 203 204 if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) != 205 SPDK_BLOB_INVALID_FLAGS_MASK) { 206 return -EINVAL; 207 } 208 209 if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) != 210 SPDK_BLOB_DATA_RO_FLAGS_MASK) { 211 blob->data_ro = true; 212 blob->md_ro = true; 213 } 214 215 if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) != 216 SPDK_BLOB_MD_RO_FLAGS_MASK) { 217 blob->md_ro = true; 218 } 219 220 blob->invalid_flags = desc_flags->invalid_flags; 221 blob->data_ro_flags = desc_flags->data_ro_flags; 222 blob->md_ro_flags = desc_flags->md_ro_flags; 223 224 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 225 struct spdk_blob_md_descriptor_extent *desc_extent; 226 unsigned int i, j; 227 unsigned int cluster_count = blob->active.num_clusters; 228 229 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 230 231 if (desc_extent->length == 0 || 232 (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) { 233 return -EINVAL; 234 } 235 236 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 237 for (j = 0; j < desc_extent->extents[i].length; j++) { 238 if (!spdk_bit_array_get(blob->bs->used_clusters, 239 desc_extent->extents[i].cluster_idx + j)) { 240 return -EINVAL; 241 } 242 cluster_count++; 243 } 244 } 245 246 if (cluster_count == 0) { 247 return -EINVAL; 248 } 249 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 250 if (tmp == NULL) { 251 return -ENOMEM; 252 } 253 blob->active.clusters = tmp; 254 blob->active.cluster_array_size = cluster_count; 255 256 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 257 for (j = 0; j < desc_extent->extents[i].length; j++) { 258 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 259 desc_extent->extents[i].cluster_idx + j); 260 } 261 } 262 263 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 264 struct spdk_blob_md_descriptor_xattr *desc_xattr; 265 struct spdk_xattr *xattr; 266 267 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 268 269 if (desc_xattr->length != sizeof(desc_xattr->name_length) + 270 sizeof(desc_xattr->value_length) + 271 desc_xattr->name_length + desc_xattr->value_length) { 272 return -EINVAL; 273 } 274 275 xattr = calloc(1, sizeof(*xattr)); 276 if (xattr == NULL) { 277 return -ENOMEM; 278 } 279 280 xattr->name = malloc(desc_xattr->name_length + 1); 281 if (xattr->name == NULL) { 282 free(xattr); 283 return -ENOMEM; 284 } 285 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 286 xattr->name[desc_xattr->name_length] = '\0'; 287 288 xattr->value = malloc(desc_xattr->value_length); 289 if (xattr->value == NULL) { 290 free(xattr->name); 291 free(xattr); 292 return -ENOMEM; 293 } 294 xattr->value_len = desc_xattr->value_length; 295 memcpy(xattr->value, 296 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 297 desc_xattr->value_length); 298 299 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 300 } else { 301 /* Unrecognized descriptor type. Do not fail - just continue to the 302 * next descriptor. If this descriptor is associated with some feature 303 * defined in a newer version of blobstore, that version of blobstore 304 * should create and set an associated feature flag to specify if this 305 * blob can be loaded or not. 306 */ 307 } 308 309 /* Advance to the next descriptor */ 310 cur_desc += sizeof(*desc) + desc->length; 311 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 312 break; 313 } 314 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 315 } 316 317 return 0; 318 } 319 320 static int 321 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 322 struct spdk_blob_data *blob) 323 { 324 const struct spdk_blob_md_page *page; 325 uint32_t i; 326 int rc; 327 328 assert(page_count > 0); 329 assert(pages[0].sequence_num == 0); 330 assert(blob != NULL); 331 assert(blob->state == SPDK_BLOB_STATE_LOADING); 332 assert(blob->active.clusters == NULL); 333 assert(blob->state == SPDK_BLOB_STATE_LOADING); 334 335 /* The blobid provided doesn't match what's in the MD, this can 336 * happen for example if a bogus blobid is passed in through open. 337 */ 338 if (blob->id != pages[0].id) { 339 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n", 340 blob->id, pages[0].id); 341 return -ENOENT; 342 } 343 344 for (i = 0; i < page_count; i++) { 345 page = &pages[i]; 346 347 assert(page->id == blob->id); 348 assert(page->sequence_num == i); 349 350 rc = _spdk_blob_parse_page(page, blob); 351 if (rc != 0) { 352 return rc; 353 } 354 } 355 356 return 0; 357 } 358 359 static int 360 _spdk_blob_serialize_add_page(const struct spdk_blob_data *blob, 361 struct spdk_blob_md_page **pages, 362 uint32_t *page_count, 363 struct spdk_blob_md_page **last_page) 364 { 365 struct spdk_blob_md_page *page; 366 367 assert(pages != NULL); 368 assert(page_count != NULL); 369 370 if (*page_count == 0) { 371 assert(*pages == NULL); 372 *page_count = 1; 373 *pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE, 374 SPDK_BS_PAGE_SIZE, 375 NULL); 376 } else { 377 assert(*pages != NULL); 378 (*page_count)++; 379 *pages = spdk_dma_realloc(*pages, 380 SPDK_BS_PAGE_SIZE * (*page_count), 381 SPDK_BS_PAGE_SIZE, 382 NULL); 383 } 384 385 if (*pages == NULL) { 386 *page_count = 0; 387 *last_page = NULL; 388 return -ENOMEM; 389 } 390 391 page = &(*pages)[*page_count - 1]; 392 memset(page, 0, sizeof(*page)); 393 page->id = blob->id; 394 page->sequence_num = *page_count - 1; 395 page->next = SPDK_INVALID_MD_PAGE; 396 *last_page = page; 397 398 return 0; 399 } 400 401 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 402 * Update required_sz on both success and failure. 403 * 404 */ 405 static int 406 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 407 uint8_t *buf, size_t buf_sz, 408 size_t *required_sz) 409 { 410 struct spdk_blob_md_descriptor_xattr *desc; 411 412 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 413 strlen(xattr->name) + 414 xattr->value_len; 415 416 if (buf_sz < *required_sz) { 417 return -1; 418 } 419 420 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 421 422 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 423 desc->length = sizeof(desc->name_length) + 424 sizeof(desc->value_length) + 425 strlen(xattr->name) + 426 xattr->value_len; 427 desc->name_length = strlen(xattr->name); 428 desc->value_length = xattr->value_len; 429 430 memcpy(desc->name, xattr->name, desc->name_length); 431 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 432 xattr->value, 433 desc->value_length); 434 435 return 0; 436 } 437 438 static void 439 _spdk_blob_serialize_extent(const struct spdk_blob_data *blob, 440 uint64_t start_cluster, uint64_t *next_cluster, 441 uint8_t *buf, size_t buf_sz) 442 { 443 struct spdk_blob_md_descriptor_extent *desc; 444 size_t cur_sz; 445 uint64_t i, extent_idx; 446 uint32_t lba, lba_per_cluster, lba_count; 447 448 /* The buffer must have room for at least one extent */ 449 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 450 if (buf_sz < cur_sz) { 451 *next_cluster = start_cluster; 452 return; 453 } 454 455 desc = (struct spdk_blob_md_descriptor_extent *)buf; 456 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 457 458 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 459 460 lba = blob->active.clusters[start_cluster]; 461 lba_count = lba_per_cluster; 462 extent_idx = 0; 463 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 464 if ((lba + lba_count) == blob->active.clusters[i]) { 465 lba_count += lba_per_cluster; 466 continue; 467 } 468 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 469 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 470 extent_idx++; 471 472 cur_sz += sizeof(desc->extents[extent_idx]); 473 474 if (buf_sz < cur_sz) { 475 /* If we ran out of buffer space, return */ 476 desc->length = sizeof(desc->extents[0]) * extent_idx; 477 *next_cluster = i; 478 return; 479 } 480 481 lba = blob->active.clusters[i]; 482 lba_count = lba_per_cluster; 483 } 484 485 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 486 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 487 extent_idx++; 488 489 desc->length = sizeof(desc->extents[0]) * extent_idx; 490 *next_cluster = blob->active.num_clusters; 491 492 return; 493 } 494 495 static void 496 _spdk_blob_serialize_flags(const struct spdk_blob_data *blob, 497 uint8_t *buf, size_t *buf_sz) 498 { 499 struct spdk_blob_md_descriptor_flags *desc; 500 501 /* 502 * Flags get serialized first, so we should always have room for the flags 503 * descriptor. 504 */ 505 assert(*buf_sz >= sizeof(*desc)); 506 507 desc = (struct spdk_blob_md_descriptor_flags *)buf; 508 desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS; 509 desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor); 510 desc->invalid_flags = blob->invalid_flags; 511 desc->data_ro_flags = blob->data_ro_flags; 512 desc->md_ro_flags = blob->md_ro_flags; 513 514 *buf_sz -= sizeof(*desc); 515 } 516 517 static int 518 _spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages, 519 uint32_t *page_count) 520 { 521 struct spdk_blob_md_page *cur_page; 522 const struct spdk_xattr *xattr; 523 int rc; 524 uint8_t *buf; 525 size_t remaining_sz; 526 uint64_t last_cluster; 527 528 assert(pages != NULL); 529 assert(page_count != NULL); 530 assert(blob != NULL); 531 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 532 533 *pages = NULL; 534 *page_count = 0; 535 536 /* A blob always has at least 1 page, even if it has no descriptors */ 537 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 538 if (rc < 0) { 539 return rc; 540 } 541 542 buf = (uint8_t *)cur_page->descriptors; 543 remaining_sz = sizeof(cur_page->descriptors); 544 545 /* Serialize flags */ 546 _spdk_blob_serialize_flags(blob, buf, &remaining_sz); 547 548 /* Serialize xattrs */ 549 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 550 size_t required_sz = 0; 551 rc = _spdk_blob_serialize_xattr(xattr, 552 buf, remaining_sz, 553 &required_sz); 554 if (rc < 0) { 555 /* Need to add a new page to the chain */ 556 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 557 &cur_page); 558 if (rc < 0) { 559 spdk_dma_free(*pages); 560 *pages = NULL; 561 *page_count = 0; 562 return rc; 563 } 564 565 buf = (uint8_t *)cur_page->descriptors; 566 remaining_sz = sizeof(cur_page->descriptors); 567 568 /* Try again */ 569 required_sz = 0; 570 rc = _spdk_blob_serialize_xattr(xattr, 571 buf, remaining_sz, 572 &required_sz); 573 574 if (rc < 0) { 575 spdk_dma_free(*pages); 576 *pages = NULL; 577 *page_count = 0; 578 return -1; 579 } 580 } 581 582 remaining_sz -= required_sz; 583 buf += required_sz; 584 } 585 586 /* Serialize extents */ 587 last_cluster = 0; 588 while (last_cluster < blob->active.num_clusters) { 589 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 590 buf, remaining_sz); 591 592 if (last_cluster == blob->active.num_clusters) { 593 break; 594 } 595 596 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 597 &cur_page); 598 if (rc < 0) { 599 return rc; 600 } 601 602 buf = (uint8_t *)cur_page->descriptors; 603 remaining_sz = sizeof(cur_page->descriptors); 604 } 605 606 return 0; 607 } 608 609 struct spdk_blob_load_ctx { 610 struct spdk_blob_data *blob; 611 612 struct spdk_blob_md_page *pages; 613 uint32_t num_pages; 614 615 spdk_bs_sequence_cpl cb_fn; 616 void *cb_arg; 617 }; 618 619 static uint32_t 620 _spdk_blob_md_page_calc_crc(void *page) 621 { 622 uint32_t crc; 623 624 crc = BLOB_CRC32C_INITIAL; 625 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 626 crc ^= BLOB_CRC32C_INITIAL; 627 628 return crc; 629 630 } 631 632 static void 633 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 634 { 635 struct spdk_blob_load_ctx *ctx = cb_arg; 636 struct spdk_blob_data *blob = ctx->blob; 637 struct spdk_blob_md_page *page; 638 int rc; 639 uint32_t crc; 640 641 page = &ctx->pages[ctx->num_pages - 1]; 642 crc = _spdk_blob_md_page_calc_crc(page); 643 if (crc != page->crc) { 644 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 645 _spdk_blob_free(blob); 646 ctx->cb_fn(seq, NULL, -EINVAL); 647 spdk_dma_free(ctx->pages); 648 free(ctx); 649 return; 650 } 651 652 if (page->next != SPDK_INVALID_MD_PAGE) { 653 uint32_t next_page = page->next; 654 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 655 656 657 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 658 659 /* Read the next page */ 660 ctx->num_pages++; 661 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 662 sizeof(*page), NULL); 663 if (ctx->pages == NULL) { 664 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 665 free(ctx); 666 return; 667 } 668 669 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 670 next_lba, 671 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 672 _spdk_blob_load_cpl, ctx); 673 return; 674 } 675 676 /* Parse the pages */ 677 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 678 if (rc) { 679 _spdk_blob_free(blob); 680 ctx->cb_fn(seq, NULL, rc); 681 spdk_dma_free(ctx->pages); 682 free(ctx); 683 return; 684 } 685 686 _spdk_blob_mark_clean(blob); 687 688 ctx->cb_fn(seq, ctx->cb_arg, rc); 689 690 /* Free the memory */ 691 spdk_dma_free(ctx->pages); 692 free(ctx); 693 } 694 695 /* Load a blob from disk given a blobid */ 696 static void 697 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob, 698 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 699 { 700 struct spdk_blob_load_ctx *ctx; 701 struct spdk_blob_store *bs; 702 uint32_t page_num; 703 uint64_t lba; 704 705 assert(blob != NULL); 706 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 707 blob->state == SPDK_BLOB_STATE_DIRTY); 708 709 bs = blob->bs; 710 711 ctx = calloc(1, sizeof(*ctx)); 712 if (!ctx) { 713 cb_fn(seq, cb_arg, -ENOMEM); 714 return; 715 } 716 717 ctx->blob = blob; 718 ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, 719 SPDK_BS_PAGE_SIZE, NULL); 720 if (!ctx->pages) { 721 free(ctx); 722 cb_fn(seq, cb_arg, -ENOMEM); 723 return; 724 } 725 ctx->num_pages = 1; 726 ctx->cb_fn = cb_fn; 727 ctx->cb_arg = cb_arg; 728 729 page_num = _spdk_bs_blobid_to_page(blob->id); 730 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 731 732 blob->state = SPDK_BLOB_STATE_LOADING; 733 734 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 735 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 736 _spdk_blob_load_cpl, ctx); 737 } 738 739 struct spdk_blob_persist_ctx { 740 struct spdk_blob_data *blob; 741 742 struct spdk_blob_md_page *pages; 743 744 uint64_t idx; 745 746 spdk_bs_sequence_cpl cb_fn; 747 void *cb_arg; 748 }; 749 750 static void 751 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 752 { 753 struct spdk_blob_persist_ctx *ctx = cb_arg; 754 struct spdk_blob_data *blob = ctx->blob; 755 756 if (bserrno == 0) { 757 _spdk_blob_mark_clean(blob); 758 } 759 760 /* Call user callback */ 761 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 762 763 /* Free the memory */ 764 spdk_dma_free(ctx->pages); 765 free(ctx); 766 } 767 768 static void 769 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 770 { 771 struct spdk_blob_persist_ctx *ctx = cb_arg; 772 struct spdk_blob_data *blob = ctx->blob; 773 struct spdk_blob_store *bs = blob->bs; 774 void *tmp; 775 size_t i; 776 777 /* Release all clusters that were truncated */ 778 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 779 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 780 781 _spdk_bs_release_cluster(bs, cluster_num); 782 } 783 784 if (blob->active.num_clusters == 0) { 785 free(blob->active.clusters); 786 blob->active.clusters = NULL; 787 blob->active.cluster_array_size = 0; 788 } else { 789 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 790 assert(tmp != NULL); 791 blob->active.clusters = tmp; 792 blob->active.cluster_array_size = blob->active.num_clusters; 793 } 794 795 _spdk_blob_persist_complete(seq, ctx, bserrno); 796 } 797 798 static void 799 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 800 { 801 struct spdk_blob_persist_ctx *ctx = cb_arg; 802 struct spdk_blob_data *blob = ctx->blob; 803 struct spdk_blob_store *bs = blob->bs; 804 spdk_bs_batch_t *batch; 805 size_t i; 806 uint64_t lba; 807 uint32_t lba_count; 808 809 /* Clusters don't move around in blobs. The list shrinks or grows 810 * at the end, but no changes ever occur in the middle of the list. 811 */ 812 813 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 814 815 /* Unmap all clusters that were truncated */ 816 lba = 0; 817 lba_count = 0; 818 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 819 uint64_t next_lba = blob->active.clusters[i]; 820 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 821 822 if ((lba + lba_count) == next_lba) { 823 /* This cluster is contiguous with the previous one. */ 824 lba_count += next_lba_count; 825 continue; 826 } 827 828 /* This cluster is not contiguous with the previous one. */ 829 830 /* If a run of LBAs previously existing, send them 831 * as an unmap. 832 */ 833 if (lba_count > 0) { 834 spdk_bs_batch_unmap(batch, lba, lba_count); 835 } 836 837 /* Start building the next batch */ 838 lba = next_lba; 839 lba_count = next_lba_count; 840 } 841 842 /* If we ended with a contiguous set of LBAs, send the unmap now */ 843 if (lba_count > 0) { 844 spdk_bs_batch_unmap(batch, lba, lba_count); 845 } 846 847 spdk_bs_batch_close(batch); 848 } 849 850 static void 851 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 852 { 853 struct spdk_blob_persist_ctx *ctx = cb_arg; 854 struct spdk_blob_data *blob = ctx->blob; 855 struct spdk_blob_store *bs = blob->bs; 856 size_t i; 857 858 /* This loop starts at 1 because the first page is special and handled 859 * below. The pages (except the first) are never written in place, 860 * so any pages in the clean list must be zeroed. 861 */ 862 for (i = 1; i < blob->clean.num_pages; i++) { 863 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 864 } 865 866 if (blob->active.num_pages == 0) { 867 uint32_t page_num; 868 869 page_num = _spdk_bs_blobid_to_page(blob->id); 870 spdk_bit_array_clear(bs->used_md_pages, page_num); 871 } 872 873 /* Move on to unmapping clusters */ 874 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 875 } 876 877 static void 878 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 879 { 880 struct spdk_blob_persist_ctx *ctx = cb_arg; 881 struct spdk_blob_data *blob = ctx->blob; 882 struct spdk_blob_store *bs = blob->bs; 883 uint64_t lba; 884 uint32_t lba_count; 885 spdk_bs_batch_t *batch; 886 size_t i; 887 888 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx); 889 890 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 891 892 /* This loop starts at 1 because the first page is special and handled 893 * below. The pages (except the first) are never written in place, 894 * so any pages in the clean list must be zeroed. 895 */ 896 for (i = 1; i < blob->clean.num_pages; i++) { 897 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 898 899 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 900 } 901 902 /* The first page will only be zeroed if this is a delete. */ 903 if (blob->active.num_pages == 0) { 904 uint32_t page_num; 905 906 /* The first page in the metadata goes where the blobid indicates */ 907 page_num = _spdk_bs_blobid_to_page(blob->id); 908 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 909 910 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 911 } 912 913 spdk_bs_batch_close(batch); 914 } 915 916 static void 917 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 918 { 919 struct spdk_blob_persist_ctx *ctx = cb_arg; 920 struct spdk_blob_data *blob = ctx->blob; 921 struct spdk_blob_store *bs = blob->bs; 922 uint64_t lba; 923 uint32_t lba_count; 924 struct spdk_blob_md_page *page; 925 926 if (blob->active.num_pages == 0) { 927 /* Move on to the next step */ 928 _spdk_blob_persist_zero_pages(seq, ctx, 0); 929 return; 930 } 931 932 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 933 934 page = &ctx->pages[0]; 935 /* The first page in the metadata goes where the blobid indicates */ 936 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 937 938 spdk_bs_sequence_write(seq, page, lba, lba_count, 939 _spdk_blob_persist_zero_pages, ctx); 940 } 941 942 static void 943 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 944 { 945 struct spdk_blob_persist_ctx *ctx = cb_arg; 946 struct spdk_blob_data *blob = ctx->blob; 947 struct spdk_blob_store *bs = blob->bs; 948 uint64_t lba; 949 uint32_t lba_count; 950 struct spdk_blob_md_page *page; 951 spdk_bs_batch_t *batch; 952 size_t i; 953 954 /* Clusters don't move around in blobs. The list shrinks or grows 955 * at the end, but no changes ever occur in the middle of the list. 956 */ 957 958 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 959 960 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 961 962 /* This starts at 1. The root page is not written until 963 * all of the others are finished 964 */ 965 for (i = 1; i < blob->active.num_pages; i++) { 966 page = &ctx->pages[i]; 967 assert(page->sequence_num == i); 968 969 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 970 971 spdk_bs_batch_write(batch, page, lba, lba_count); 972 } 973 974 spdk_bs_batch_close(batch); 975 } 976 977 static int 978 _spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz) 979 { 980 uint64_t i; 981 uint64_t *tmp; 982 uint64_t lfc; /* lowest free cluster */ 983 struct spdk_blob_store *bs; 984 985 bs = blob->bs; 986 987 assert(blob->state != SPDK_BLOB_STATE_LOADING && 988 blob->state != SPDK_BLOB_STATE_SYNCING); 989 990 if (blob->active.num_clusters == sz) { 991 return 0; 992 } 993 994 if (blob->active.num_clusters < blob->active.cluster_array_size) { 995 /* If this blob was resized to be larger, then smaller, then 996 * larger without syncing, then the cluster array already 997 * contains spare assigned clusters we can use. 998 */ 999 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 1000 sz); 1001 } 1002 1003 blob->state = SPDK_BLOB_STATE_DIRTY; 1004 1005 /* Do two passes - one to verify that we can obtain enough clusters 1006 * and another to actually claim them. 1007 */ 1008 1009 lfc = 0; 1010 for (i = blob->active.num_clusters; i < sz; i++) { 1011 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1012 if (lfc >= bs->total_clusters) { 1013 /* No more free clusters. Cannot satisfy the request */ 1014 assert(false); 1015 return -1; 1016 } 1017 lfc++; 1018 } 1019 1020 if (sz > blob->active.num_clusters) { 1021 /* Expand the cluster array if necessary. 1022 * We only shrink the array when persisting. 1023 */ 1024 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 1025 if (sz > 0 && tmp == NULL) { 1026 assert(false); 1027 return -1; 1028 } 1029 blob->active.clusters = tmp; 1030 blob->active.cluster_array_size = sz; 1031 } 1032 1033 lfc = 0; 1034 for (i = blob->active.num_clusters; i < sz; i++) { 1035 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 1036 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 1037 _spdk_bs_claim_cluster(bs, lfc); 1038 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 1039 lfc++; 1040 } 1041 1042 blob->active.num_clusters = sz; 1043 1044 return 0; 1045 } 1046 1047 /* Write a blob to disk */ 1048 static void 1049 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob, 1050 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1051 { 1052 struct spdk_blob_persist_ctx *ctx; 1053 int rc; 1054 uint64_t i; 1055 uint32_t page_num; 1056 struct spdk_blob_store *bs; 1057 1058 assert(blob != NULL); 1059 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 1060 blob->state == SPDK_BLOB_STATE_DIRTY); 1061 1062 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 1063 cb_fn(seq, cb_arg, 0); 1064 return; 1065 } 1066 1067 bs = blob->bs; 1068 1069 ctx = calloc(1, sizeof(*ctx)); 1070 if (!ctx) { 1071 cb_fn(seq, cb_arg, -ENOMEM); 1072 return; 1073 } 1074 ctx->blob = blob; 1075 ctx->cb_fn = cb_fn; 1076 ctx->cb_arg = cb_arg; 1077 1078 blob->state = SPDK_BLOB_STATE_SYNCING; 1079 1080 if (blob->active.num_pages == 0) { 1081 /* This is the signal that the blob should be deleted. 1082 * Immediately jump to the clean up routine. */ 1083 assert(blob->clean.num_pages > 0); 1084 ctx->idx = blob->clean.num_pages - 1; 1085 _spdk_blob_persist_zero_pages(seq, ctx, 0); 1086 return; 1087 1088 } 1089 1090 /* Generate the new metadata */ 1091 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 1092 if (rc < 0) { 1093 free(ctx); 1094 cb_fn(seq, cb_arg, rc); 1095 return; 1096 } 1097 1098 assert(blob->active.num_pages >= 1); 1099 1100 /* Resize the cache of page indices */ 1101 blob->active.pages = realloc(blob->active.pages, 1102 blob->active.num_pages * sizeof(*blob->active.pages)); 1103 if (!blob->active.pages) { 1104 free(ctx); 1105 cb_fn(seq, cb_arg, -ENOMEM); 1106 return; 1107 } 1108 1109 /* Assign this metadata to pages. This requires two passes - 1110 * one to verify that there are enough pages and a second 1111 * to actually claim them. */ 1112 page_num = 0; 1113 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1114 for (i = 1; i < blob->active.num_pages; i++) { 1115 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1116 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 1117 spdk_dma_free(ctx->pages); 1118 free(ctx); 1119 blob->state = SPDK_BLOB_STATE_DIRTY; 1120 cb_fn(seq, cb_arg, -ENOMEM); 1121 return; 1122 } 1123 page_num++; 1124 } 1125 1126 page_num = 0; 1127 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1128 for (i = 1; i < blob->active.num_pages; i++) { 1129 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1130 ctx->pages[i - 1].next = page_num; 1131 /* Now that previous metadata page is complete, calculate the crc for it. */ 1132 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1133 blob->active.pages[i] = page_num; 1134 spdk_bit_array_set(bs->used_md_pages, page_num); 1135 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1136 page_num++; 1137 } 1138 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1139 /* Start writing the metadata from last page to first */ 1140 ctx->idx = blob->active.num_pages - 1; 1141 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1142 } 1143 1144 static void 1145 _spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel, 1146 void *payload, uint64_t offset, uint64_t length, 1147 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1148 { 1149 struct spdk_blob_data *blob = __blob_to_data(_blob); 1150 spdk_bs_batch_t *batch; 1151 struct spdk_bs_cpl cpl; 1152 uint64_t lba; 1153 uint32_t lba_count; 1154 uint8_t *buf; 1155 uint64_t page; 1156 1157 assert(blob != NULL); 1158 1159 if (blob->data_ro && op_type != SPDK_BLOB_READ) { 1160 cb_fn(cb_arg, -EPERM); 1161 return; 1162 } 1163 1164 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1165 cb_fn(cb_arg, -EINVAL); 1166 return; 1167 } 1168 1169 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1170 cpl.u.blob_basic.cb_fn = cb_fn; 1171 cpl.u.blob_basic.cb_arg = cb_arg; 1172 1173 batch = spdk_bs_batch_open(_channel, &cpl); 1174 if (!batch) { 1175 cb_fn(cb_arg, -ENOMEM); 1176 return; 1177 } 1178 1179 length = _spdk_bs_page_to_lba(blob->bs, length); 1180 page = offset; 1181 buf = payload; 1182 while (length > 0) { 1183 lba = _spdk_bs_blob_page_to_lba(blob, page); 1184 lba_count = spdk_min(length, 1185 _spdk_bs_page_to_lba(blob->bs, 1186 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1187 1188 switch (op_type) { 1189 case SPDK_BLOB_READ: 1190 spdk_bs_batch_read(batch, buf, lba, lba_count); 1191 break; 1192 case SPDK_BLOB_WRITE: 1193 spdk_bs_batch_write(batch, buf, lba, lba_count); 1194 break; 1195 case SPDK_BLOB_UNMAP: 1196 spdk_bs_batch_unmap(batch, lba, lba_count); 1197 break; 1198 case SPDK_BLOB_WRITE_ZEROES: 1199 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 1200 break; 1201 } 1202 1203 length -= lba_count; 1204 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1205 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { 1206 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1207 } 1208 } 1209 1210 spdk_bs_batch_close(batch); 1211 } 1212 1213 struct rw_iov_ctx { 1214 struct spdk_blob_data *blob; 1215 bool read; 1216 int iovcnt; 1217 struct iovec *orig_iov; 1218 uint64_t page_offset; 1219 uint64_t pages_remaining; 1220 uint64_t pages_done; 1221 struct iovec iov[0]; 1222 }; 1223 1224 static void 1225 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1226 { 1227 assert(cb_arg == NULL); 1228 spdk_bs_sequence_finish(seq, bserrno); 1229 } 1230 1231 static void 1232 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1233 { 1234 struct rw_iov_ctx *ctx = cb_arg; 1235 struct iovec *iov, *orig_iov; 1236 int iovcnt; 1237 size_t orig_iovoff; 1238 uint64_t lba; 1239 uint64_t page_count, pages_to_boundary; 1240 uint32_t lba_count; 1241 uint64_t byte_count; 1242 1243 if (bserrno != 0 || ctx->pages_remaining == 0) { 1244 free(ctx); 1245 spdk_bs_sequence_finish(seq, bserrno); 1246 return; 1247 } 1248 1249 pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset); 1250 page_count = spdk_min(ctx->pages_remaining, pages_to_boundary); 1251 lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset); 1252 lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count); 1253 1254 /* 1255 * Get index and offset into the original iov array for our current position in the I/O sequence. 1256 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 1257 * point to the current position in the I/O sequence. 1258 */ 1259 byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page); 1260 orig_iov = &ctx->orig_iov[0]; 1261 orig_iovoff = 0; 1262 while (byte_count > 0) { 1263 if (byte_count >= orig_iov->iov_len) { 1264 byte_count -= orig_iov->iov_len; 1265 orig_iov++; 1266 } else { 1267 orig_iovoff = byte_count; 1268 byte_count = 0; 1269 } 1270 } 1271 1272 /* 1273 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 1274 * bytes of this next I/O remain to be accounted for in the new iov array. 1275 */ 1276 byte_count = page_count * sizeof(struct spdk_blob_md_page); 1277 iov = &ctx->iov[0]; 1278 iovcnt = 0; 1279 while (byte_count > 0) { 1280 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 1281 iov->iov_base = orig_iov->iov_base + orig_iovoff; 1282 byte_count -= iov->iov_len; 1283 orig_iovoff = 0; 1284 orig_iov++; 1285 iov++; 1286 iovcnt++; 1287 } 1288 1289 ctx->page_offset += page_count; 1290 ctx->pages_done += page_count; 1291 ctx->pages_remaining -= page_count; 1292 iov = &ctx->iov[0]; 1293 1294 if (ctx->read) { 1295 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1296 } else { 1297 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1298 } 1299 } 1300 1301 static void 1302 _spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel, 1303 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1304 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1305 { 1306 struct spdk_blob_data *blob = __blob_to_data(_blob); 1307 spdk_bs_sequence_t *seq; 1308 struct spdk_bs_cpl cpl; 1309 1310 assert(blob != NULL); 1311 1312 if (!read && blob->data_ro) { 1313 cb_fn(cb_arg, -EPERM); 1314 return; 1315 } 1316 1317 if (length == 0) { 1318 cb_fn(cb_arg, 0); 1319 return; 1320 } 1321 1322 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1323 cb_fn(cb_arg, -EINVAL); 1324 return; 1325 } 1326 1327 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1328 cpl.u.blob_basic.cb_fn = cb_fn; 1329 cpl.u.blob_basic.cb_arg = cb_arg; 1330 1331 /* 1332 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 1333 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 1334 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 1335 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 1336 * to allocate a separate iov array and split the I/O such that none of the resulting 1337 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 1338 * but since this case happens very infrequently, any performance impact will be negligible. 1339 * 1340 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 1341 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 1342 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 1343 * when the batch was completed, to allow for freeing the memory for the iov arrays. 1344 */ 1345 seq = spdk_bs_sequence_start(_channel, &cpl); 1346 if (!seq) { 1347 cb_fn(cb_arg, -ENOMEM); 1348 return; 1349 } 1350 1351 if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) { 1352 uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset); 1353 uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length); 1354 1355 if (read) { 1356 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1357 } else { 1358 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1359 } 1360 } else { 1361 struct rw_iov_ctx *ctx; 1362 1363 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 1364 if (ctx == NULL) { 1365 spdk_bs_sequence_finish(seq, -ENOMEM); 1366 return; 1367 } 1368 1369 ctx->blob = blob; 1370 ctx->read = read; 1371 ctx->orig_iov = iov; 1372 ctx->iovcnt = iovcnt; 1373 ctx->page_offset = offset; 1374 ctx->pages_remaining = length; 1375 ctx->pages_done = 0; 1376 1377 _spdk_rw_iov_split_next(seq, ctx, 0); 1378 } 1379 } 1380 1381 static struct spdk_blob_data * 1382 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1383 { 1384 struct spdk_blob_data *blob; 1385 1386 TAILQ_FOREACH(blob, &bs->blobs, link) { 1387 if (blob->id == blobid) { 1388 return blob; 1389 } 1390 } 1391 1392 return NULL; 1393 } 1394 1395 static int 1396 _spdk_bs_channel_create(void *io_device, void *ctx_buf) 1397 { 1398 struct spdk_blob_store *bs = io_device; 1399 struct spdk_bs_channel *channel = ctx_buf; 1400 struct spdk_bs_dev *dev; 1401 uint32_t max_ops = bs->max_channel_ops; 1402 uint32_t i; 1403 1404 dev = bs->dev; 1405 1406 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1407 if (!channel->req_mem) { 1408 return -1; 1409 } 1410 1411 TAILQ_INIT(&channel->reqs); 1412 1413 for (i = 0; i < max_ops; i++) { 1414 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1415 } 1416 1417 channel->bs = bs; 1418 channel->dev = dev; 1419 channel->dev_channel = dev->create_channel(dev); 1420 1421 if (!channel->dev_channel) { 1422 SPDK_ERRLOG("Failed to create device channel.\n"); 1423 free(channel->req_mem); 1424 return -1; 1425 } 1426 1427 return 0; 1428 } 1429 1430 static void 1431 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1432 { 1433 struct spdk_bs_channel *channel = ctx_buf; 1434 1435 free(channel->req_mem); 1436 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1437 } 1438 1439 static void 1440 _spdk_bs_dev_destroy(void *io_device) 1441 { 1442 struct spdk_blob_store *bs = io_device; 1443 struct spdk_blob_data *blob, *blob_tmp; 1444 1445 bs->dev->destroy(bs->dev); 1446 1447 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1448 TAILQ_REMOVE(&bs->blobs, blob, link); 1449 _spdk_blob_free(blob); 1450 } 1451 1452 spdk_bit_array_free(&bs->used_md_pages); 1453 spdk_bit_array_free(&bs->used_clusters); 1454 /* 1455 * If this function is called for any reason except a successful unload, 1456 * the unload_cpl type will be NONE and this will be a nop. 1457 */ 1458 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 1459 1460 free(bs); 1461 } 1462 1463 static void 1464 _spdk_bs_free(struct spdk_blob_store *bs) 1465 { 1466 spdk_bs_unregister_md_thread(bs); 1467 spdk_io_device_unregister(bs, _spdk_bs_dev_destroy); 1468 } 1469 1470 void 1471 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1472 { 1473 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1474 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1475 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1476 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1477 memset(&opts->bstype, 0, sizeof(opts->bstype)); 1478 } 1479 1480 static int 1481 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 1482 { 1483 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 1484 opts->max_channel_ops == 0) { 1485 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 1486 return -1; 1487 } 1488 1489 return 0; 1490 } 1491 1492 static struct spdk_blob_store * 1493 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1494 { 1495 struct spdk_blob_store *bs; 1496 uint64_t dev_size; 1497 int rc; 1498 1499 dev_size = dev->blocklen * dev->blockcnt; 1500 if (dev_size < opts->cluster_sz) { 1501 /* Device size cannot be smaller than cluster size of blobstore */ 1502 SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size, 1503 opts->cluster_sz); 1504 return NULL; 1505 } 1506 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 1507 /* Cluster size cannot be smaller than page size */ 1508 SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n", 1509 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 1510 return NULL; 1511 } 1512 bs = calloc(1, sizeof(struct spdk_blob_store)); 1513 if (!bs) { 1514 return NULL; 1515 } 1516 1517 TAILQ_INIT(&bs->blobs); 1518 bs->dev = dev; 1519 1520 /* 1521 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1522 * even multiple of the cluster size. 1523 */ 1524 bs->cluster_sz = opts->cluster_sz; 1525 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1526 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1527 bs->num_free_clusters = bs->total_clusters; 1528 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1529 if (bs->used_clusters == NULL) { 1530 free(bs); 1531 return NULL; 1532 } 1533 1534 bs->max_channel_ops = opts->max_channel_ops; 1535 bs->super_blob = SPDK_BLOBID_INVALID; 1536 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 1537 1538 /* The metadata is assumed to be at least 1 page */ 1539 bs->used_md_pages = spdk_bit_array_create(1); 1540 1541 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy, 1542 sizeof(struct spdk_bs_channel)); 1543 rc = spdk_bs_register_md_thread(bs); 1544 if (rc == -1) { 1545 spdk_io_device_unregister(bs, NULL); 1546 spdk_bit_array_free(&bs->used_md_pages); 1547 spdk_bit_array_free(&bs->used_clusters); 1548 free(bs); 1549 return NULL; 1550 } 1551 1552 return bs; 1553 } 1554 1555 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */ 1556 1557 struct spdk_bs_load_ctx { 1558 struct spdk_blob_store *bs; 1559 struct spdk_bs_super_block *super; 1560 1561 struct spdk_bs_md_mask *mask; 1562 bool in_page_chain; 1563 uint32_t page_index; 1564 uint32_t cur_page; 1565 struct spdk_blob_md_page *page; 1566 }; 1567 1568 static void 1569 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 1570 { 1571 uint32_t i = 0; 1572 1573 while (true) { 1574 i = spdk_bit_array_find_first_set(array, i); 1575 if (i >= mask->length) { 1576 break; 1577 } 1578 mask->mask[i / 8] |= 1U << (i % 8); 1579 i++; 1580 } 1581 } 1582 1583 static void 1584 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 1585 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1586 { 1587 /* Update the values in the super block */ 1588 super->super_blob = bs->super_blob; 1589 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 1590 super->crc = _spdk_blob_md_page_calc_crc(super); 1591 spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0), 1592 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 1593 cb_fn, cb_arg); 1594 } 1595 1596 static void 1597 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1598 { 1599 struct spdk_bs_load_ctx *ctx = arg; 1600 uint64_t mask_size, lba, lba_count; 1601 1602 /* Write out the used clusters mask */ 1603 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1604 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1605 if (!ctx->mask) { 1606 spdk_dma_free(ctx->super); 1607 free(ctx); 1608 spdk_bs_sequence_finish(seq, -ENOMEM); 1609 return; 1610 } 1611 1612 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1613 ctx->mask->length = ctx->bs->total_clusters; 1614 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1615 1616 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 1617 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1618 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1619 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1620 } 1621 1622 static void 1623 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1624 { 1625 struct spdk_bs_load_ctx *ctx = arg; 1626 uint64_t mask_size, lba, lba_count; 1627 1628 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1629 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1630 if (!ctx->mask) { 1631 spdk_dma_free(ctx->super); 1632 free(ctx); 1633 spdk_bs_sequence_finish(seq, -ENOMEM); 1634 return; 1635 } 1636 1637 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1638 ctx->mask->length = ctx->super->md_len; 1639 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1640 1641 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 1642 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1643 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1644 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1645 } 1646 1647 static void 1648 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1649 { 1650 struct spdk_bs_load_ctx *ctx = cb_arg; 1651 uint32_t i, j; 1652 int rc; 1653 1654 /* The type must be correct */ 1655 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1656 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1657 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1658 struct spdk_blob_md_page) * 8)); 1659 /* The length of the mask must be exactly equal to the total number of clusters */ 1660 assert(ctx->mask->length == ctx->bs->total_clusters); 1661 1662 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1663 if (rc < 0) { 1664 spdk_dma_free(ctx->super); 1665 spdk_dma_free(ctx->mask); 1666 _spdk_bs_free(ctx->bs); 1667 free(ctx); 1668 spdk_bs_sequence_finish(seq, -ENOMEM); 1669 return; 1670 } 1671 1672 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1673 for (i = 0; i < ctx->mask->length / 8; i++) { 1674 uint8_t segment = ctx->mask->mask[i]; 1675 for (j = 0; segment && (j < 8); j++) { 1676 if (segment & 1U) { 1677 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1678 assert(ctx->bs->num_free_clusters > 0); 1679 ctx->bs->num_free_clusters--; 1680 } 1681 segment >>= 1U; 1682 } 1683 } 1684 1685 spdk_dma_free(ctx->super); 1686 spdk_dma_free(ctx->mask); 1687 free(ctx); 1688 1689 spdk_bs_sequence_finish(seq, bserrno); 1690 } 1691 1692 static void 1693 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1694 { 1695 struct spdk_bs_load_ctx *ctx = cb_arg; 1696 uint64_t lba, lba_count, mask_size; 1697 uint32_t i, j; 1698 int rc; 1699 1700 /* The type must be correct */ 1701 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1702 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1703 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 1704 8)); 1705 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1706 assert(ctx->mask->length == ctx->super->md_len); 1707 1708 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1709 if (rc < 0) { 1710 spdk_dma_free(ctx->super); 1711 spdk_dma_free(ctx->mask); 1712 _spdk_bs_free(ctx->bs); 1713 free(ctx); 1714 spdk_bs_sequence_finish(seq, -ENOMEM); 1715 return; 1716 } 1717 1718 for (i = 0; i < ctx->mask->length / 8; i++) { 1719 uint8_t segment = ctx->mask->mask[i]; 1720 for (j = 0; segment && (j < 8); j++) { 1721 if (segment & 1U) { 1722 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1723 } 1724 segment >>= 1U; 1725 } 1726 } 1727 spdk_dma_free(ctx->mask); 1728 1729 /* Read the used clusters mask */ 1730 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1731 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1732 if (!ctx->mask) { 1733 spdk_dma_free(ctx->super); 1734 _spdk_bs_free(ctx->bs); 1735 free(ctx); 1736 spdk_bs_sequence_finish(seq, -ENOMEM); 1737 return; 1738 } 1739 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1740 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1741 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1742 _spdk_bs_load_used_clusters_cpl, ctx); 1743 } 1744 1745 static void 1746 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1747 { 1748 struct spdk_bs_load_ctx *ctx = cb_arg; 1749 uint64_t lba, lba_count, mask_size; 1750 1751 /* Read the used pages mask */ 1752 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1753 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1754 if (!ctx->mask) { 1755 spdk_dma_free(ctx->super); 1756 _spdk_bs_free(ctx->bs); 1757 free(ctx); 1758 spdk_bs_sequence_finish(seq, -ENOMEM); 1759 return; 1760 } 1761 1762 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1763 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1764 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1765 _spdk_bs_load_used_pages_cpl, ctx); 1766 } 1767 1768 static int 1769 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 1770 { 1771 struct spdk_blob_md_descriptor *desc; 1772 size_t cur_desc = 0; 1773 1774 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 1775 while (cur_desc < sizeof(page->descriptors)) { 1776 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 1777 if (desc->length == 0) { 1778 /* If padding and length are 0, this terminates the page */ 1779 break; 1780 } 1781 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 1782 struct spdk_blob_md_descriptor_extent *desc_extent; 1783 unsigned int i, j; 1784 unsigned int cluster_count = 0; 1785 1786 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 1787 1788 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 1789 for (j = 0; j < desc_extent->extents[i].length; j++) { 1790 spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j); 1791 if (bs->num_free_clusters == 0) { 1792 return -1; 1793 } 1794 bs->num_free_clusters--; 1795 cluster_count++; 1796 } 1797 } 1798 if (cluster_count == 0) { 1799 return -1; 1800 } 1801 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 1802 /* Skip this item */ 1803 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) { 1804 /* Skip this item */ 1805 } else { 1806 /* Error */ 1807 return -1; 1808 } 1809 /* Advance to the next descriptor */ 1810 cur_desc += sizeof(*desc) + desc->length; 1811 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 1812 break; 1813 } 1814 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 1815 } 1816 return 0; 1817 } 1818 1819 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 1820 { 1821 uint32_t crc; 1822 1823 crc = _spdk_blob_md_page_calc_crc(ctx->page); 1824 if (crc != ctx->page->crc) { 1825 return false; 1826 } 1827 1828 if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 1829 return false; 1830 } 1831 return true; 1832 } 1833 1834 static void 1835 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 1836 1837 static void 1838 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1839 { 1840 struct spdk_bs_load_ctx *ctx = cb_arg; 1841 1842 spdk_dma_free(ctx->mask); 1843 spdk_dma_free(ctx->super); 1844 spdk_bs_sequence_finish(seq, bserrno); 1845 free(ctx); 1846 } 1847 1848 static void 1849 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1850 { 1851 struct spdk_bs_load_ctx *ctx = cb_arg; 1852 1853 spdk_dma_free(ctx->mask); 1854 1855 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl); 1856 } 1857 1858 static void 1859 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1860 { 1861 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl); 1862 } 1863 1864 static void 1865 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1866 { 1867 struct spdk_bs_load_ctx *ctx = cb_arg; 1868 uint32_t page_num; 1869 1870 if (bserrno != 0) { 1871 spdk_dma_free(ctx->super); 1872 _spdk_bs_free(ctx->bs); 1873 free(ctx); 1874 spdk_bs_sequence_finish(seq, bserrno); 1875 return; 1876 } 1877 1878 page_num = ctx->cur_page; 1879 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 1880 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 1881 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 1882 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 1883 spdk_dma_free(ctx->super); 1884 _spdk_bs_free(ctx->bs); 1885 free(ctx); 1886 spdk_bs_sequence_finish(seq, -EILSEQ); 1887 return; 1888 } 1889 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 1890 ctx->in_page_chain = true; 1891 ctx->cur_page = ctx->page->next; 1892 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1893 return; 1894 } 1895 } 1896 } 1897 1898 ctx->in_page_chain = false; 1899 1900 do { 1901 ctx->page_index++; 1902 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 1903 1904 if (ctx->page_index < ctx->super->md_len) { 1905 ctx->cur_page = ctx->page_index; 1906 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1907 } else { 1908 spdk_dma_free(ctx->page); 1909 _spdk_bs_load_write_used_md(seq, ctx, bserrno); 1910 } 1911 } 1912 1913 static void 1914 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 1915 { 1916 struct spdk_bs_load_ctx *ctx = cb_arg; 1917 uint64_t lba; 1918 1919 assert(ctx->cur_page < ctx->super->md_len); 1920 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 1921 spdk_bs_sequence_read(seq, ctx->page, lba, 1922 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 1923 _spdk_bs_load_replay_md_cpl, ctx); 1924 } 1925 1926 static void 1927 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg) 1928 { 1929 struct spdk_bs_load_ctx *ctx = cb_arg; 1930 1931 ctx->page_index = 0; 1932 ctx->cur_page = 0; 1933 ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE, 1934 SPDK_BS_PAGE_SIZE, 1935 NULL); 1936 if (!ctx->page) { 1937 spdk_dma_free(ctx->super); 1938 _spdk_bs_free(ctx->bs); 1939 free(ctx); 1940 spdk_bs_sequence_finish(seq, -ENOMEM); 1941 return; 1942 } 1943 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1944 } 1945 1946 static void 1947 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg) 1948 { 1949 struct spdk_bs_load_ctx *ctx = cb_arg; 1950 int rc; 1951 1952 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 1953 if (rc < 0) { 1954 spdk_dma_free(ctx->super); 1955 _spdk_bs_free(ctx->bs); 1956 free(ctx); 1957 spdk_bs_sequence_finish(seq, -ENOMEM); 1958 return; 1959 } 1960 1961 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1962 if (rc < 0) { 1963 spdk_dma_free(ctx->super); 1964 _spdk_bs_free(ctx->bs); 1965 free(ctx); 1966 spdk_bs_sequence_finish(seq, -ENOMEM); 1967 return; 1968 } 1969 1970 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1971 _spdk_bs_load_replay_md(seq, cb_arg); 1972 } 1973 1974 static void 1975 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1976 { 1977 struct spdk_bs_load_ctx *ctx = cb_arg; 1978 uint32_t crc; 1979 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 1980 1981 if (ctx->super->version > SPDK_BS_VERSION || 1982 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 1983 spdk_dma_free(ctx->super); 1984 _spdk_bs_free(ctx->bs); 1985 free(ctx); 1986 spdk_bs_sequence_finish(seq, -EILSEQ); 1987 return; 1988 } 1989 1990 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1991 sizeof(ctx->super->signature)) != 0) { 1992 spdk_dma_free(ctx->super); 1993 _spdk_bs_free(ctx->bs); 1994 free(ctx); 1995 spdk_bs_sequence_finish(seq, -EILSEQ); 1996 return; 1997 } 1998 1999 crc = _spdk_blob_md_page_calc_crc(ctx->super); 2000 if (crc != ctx->super->crc) { 2001 spdk_dma_free(ctx->super); 2002 _spdk_bs_free(ctx->bs); 2003 free(ctx); 2004 spdk_bs_sequence_finish(seq, -EILSEQ); 2005 return; 2006 } 2007 2008 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 2009 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n"); 2010 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 2011 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 2012 } else { 2013 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n"); 2014 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 2015 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 2016 spdk_dma_free(ctx->super); 2017 _spdk_bs_free(ctx->bs); 2018 free(ctx); 2019 spdk_bs_sequence_finish(seq, -ENXIO); 2020 return; 2021 } 2022 2023 /* Parse the super block */ 2024 ctx->bs->cluster_sz = ctx->super->cluster_size; 2025 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 2026 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 2027 ctx->bs->md_start = ctx->super->md_start; 2028 ctx->bs->md_len = ctx->super->md_len; 2029 ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up( 2030 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 2031 ctx->bs->super_blob = ctx->super->super_blob; 2032 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 2033 2034 if (ctx->super->clean == 1) { 2035 ctx->super->clean = 0; 2036 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx); 2037 } else { 2038 _spdk_bs_recover(seq, ctx); 2039 } 2040 } 2041 2042 void 2043 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 2044 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 2045 { 2046 struct spdk_blob_store *bs; 2047 struct spdk_bs_cpl cpl; 2048 spdk_bs_sequence_t *seq; 2049 struct spdk_bs_load_ctx *ctx; 2050 struct spdk_bs_opts opts = {}; 2051 2052 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev); 2053 2054 if (o) { 2055 opts = *o; 2056 } else { 2057 spdk_bs_opts_init(&opts); 2058 } 2059 2060 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 2061 cb_fn(cb_arg, NULL, -EINVAL); 2062 return; 2063 } 2064 2065 bs = _spdk_bs_alloc(dev, &opts); 2066 if (!bs) { 2067 cb_fn(cb_arg, NULL, -ENOMEM); 2068 return; 2069 } 2070 2071 ctx = calloc(1, sizeof(*ctx)); 2072 if (!ctx) { 2073 _spdk_bs_free(bs); 2074 cb_fn(cb_arg, NULL, -ENOMEM); 2075 return; 2076 } 2077 2078 ctx->bs = bs; 2079 2080 /* Allocate memory for the super block */ 2081 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2082 if (!ctx->super) { 2083 free(ctx); 2084 _spdk_bs_free(bs); 2085 return; 2086 } 2087 2088 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2089 cpl.u.bs_handle.cb_fn = cb_fn; 2090 cpl.u.bs_handle.cb_arg = cb_arg; 2091 cpl.u.bs_handle.bs = bs; 2092 2093 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 2094 if (!seq) { 2095 spdk_dma_free(ctx->super); 2096 free(ctx); 2097 _spdk_bs_free(bs); 2098 cb_fn(cb_arg, NULL, -ENOMEM); 2099 return; 2100 } 2101 2102 /* Read the super block */ 2103 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2104 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2105 _spdk_bs_load_super_cpl, ctx); 2106 } 2107 2108 /* END spdk_bs_load */ 2109 2110 /* START spdk_bs_init */ 2111 2112 struct spdk_bs_init_ctx { 2113 struct spdk_blob_store *bs; 2114 struct spdk_bs_super_block *super; 2115 }; 2116 2117 static void 2118 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2119 { 2120 struct spdk_bs_init_ctx *ctx = cb_arg; 2121 2122 spdk_dma_free(ctx->super); 2123 free(ctx); 2124 2125 spdk_bs_sequence_finish(seq, bserrno); 2126 } 2127 2128 static void 2129 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2130 { 2131 struct spdk_bs_init_ctx *ctx = cb_arg; 2132 2133 /* Write super block */ 2134 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 2135 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 2136 _spdk_bs_init_persist_super_cpl, ctx); 2137 } 2138 2139 void 2140 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 2141 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 2142 { 2143 struct spdk_bs_init_ctx *ctx; 2144 struct spdk_blob_store *bs; 2145 struct spdk_bs_cpl cpl; 2146 spdk_bs_sequence_t *seq; 2147 spdk_bs_batch_t *batch; 2148 uint64_t num_md_lba; 2149 uint64_t num_md_pages; 2150 uint64_t num_md_clusters; 2151 uint32_t i; 2152 struct spdk_bs_opts opts = {}; 2153 int rc; 2154 2155 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev); 2156 2157 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 2158 SPDK_ERRLOG("unsupported dev block length of %d\n", 2159 dev->blocklen); 2160 dev->destroy(dev); 2161 cb_fn(cb_arg, NULL, -EINVAL); 2162 return; 2163 } 2164 2165 if (o) { 2166 opts = *o; 2167 } else { 2168 spdk_bs_opts_init(&opts); 2169 } 2170 2171 if (_spdk_bs_opts_verify(&opts) != 0) { 2172 dev->destroy(dev); 2173 cb_fn(cb_arg, NULL, -EINVAL); 2174 return; 2175 } 2176 2177 bs = _spdk_bs_alloc(dev, &opts); 2178 if (!bs) { 2179 dev->destroy(dev); 2180 cb_fn(cb_arg, NULL, -ENOMEM); 2181 return; 2182 } 2183 2184 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 2185 /* By default, allocate 1 page per cluster. 2186 * Technically, this over-allocates metadata 2187 * because more metadata will reduce the number 2188 * of usable clusters. This can be addressed with 2189 * more complex math in the future. 2190 */ 2191 bs->md_len = bs->total_clusters; 2192 } else { 2193 bs->md_len = opts.num_md_pages; 2194 } 2195 2196 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 2197 if (rc < 0) { 2198 _spdk_bs_free(bs); 2199 cb_fn(cb_arg, NULL, -ENOMEM); 2200 return; 2201 } 2202 2203 ctx = calloc(1, sizeof(*ctx)); 2204 if (!ctx) { 2205 _spdk_bs_free(bs); 2206 cb_fn(cb_arg, NULL, -ENOMEM); 2207 return; 2208 } 2209 2210 ctx->bs = bs; 2211 2212 /* Allocate memory for the super block */ 2213 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2214 if (!ctx->super) { 2215 free(ctx); 2216 _spdk_bs_free(bs); 2217 return; 2218 } 2219 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 2220 sizeof(ctx->super->signature)); 2221 ctx->super->version = SPDK_BS_VERSION; 2222 ctx->super->length = sizeof(*ctx->super); 2223 ctx->super->super_blob = bs->super_blob; 2224 ctx->super->clean = 0; 2225 ctx->super->cluster_size = bs->cluster_sz; 2226 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 2227 2228 /* Calculate how many pages the metadata consumes at the front 2229 * of the disk. 2230 */ 2231 2232 /* The super block uses 1 page */ 2233 num_md_pages = 1; 2234 2235 /* The used_md_pages mask requires 1 bit per metadata page, rounded 2236 * up to the nearest page, plus a header. 2237 */ 2238 ctx->super->used_page_mask_start = num_md_pages; 2239 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2240 divide_round_up(bs->md_len, 8), 2241 SPDK_BS_PAGE_SIZE); 2242 num_md_pages += ctx->super->used_page_mask_len; 2243 2244 /* The used_clusters mask requires 1 bit per cluster, rounded 2245 * up to the nearest page, plus a header. 2246 */ 2247 ctx->super->used_cluster_mask_start = num_md_pages; 2248 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2249 divide_round_up(bs->total_clusters, 8), 2250 SPDK_BS_PAGE_SIZE); 2251 num_md_pages += ctx->super->used_cluster_mask_len; 2252 2253 /* The metadata region size was chosen above */ 2254 ctx->super->md_start = bs->md_start = num_md_pages; 2255 ctx->super->md_len = bs->md_len; 2256 num_md_pages += bs->md_len; 2257 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 2258 2259 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 2260 2261 num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster); 2262 if (num_md_clusters > bs->total_clusters) { 2263 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 2264 "please decrease number of pages reserved for metadata " 2265 "or increase cluster size.\n"); 2266 spdk_dma_free(ctx->super); 2267 free(ctx); 2268 _spdk_bs_free(bs); 2269 cb_fn(cb_arg, NULL, -ENOMEM); 2270 return; 2271 } 2272 /* Claim all of the clusters used by the metadata */ 2273 for (i = 0; i < num_md_clusters; i++) { 2274 _spdk_bs_claim_cluster(bs, i); 2275 } 2276 2277 bs->total_data_clusters = bs->num_free_clusters; 2278 2279 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2280 cpl.u.bs_handle.cb_fn = cb_fn; 2281 cpl.u.bs_handle.cb_arg = cb_arg; 2282 cpl.u.bs_handle.bs = bs; 2283 2284 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 2285 if (!seq) { 2286 spdk_dma_free(ctx->super); 2287 free(ctx); 2288 _spdk_bs_free(bs); 2289 cb_fn(cb_arg, NULL, -ENOMEM); 2290 return; 2291 } 2292 2293 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 2294 2295 /* Clear metadata space */ 2296 spdk_bs_batch_write_zeroes(batch, 0, num_md_lba); 2297 /* Trim data clusters */ 2298 spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 2299 2300 spdk_bs_batch_close(batch); 2301 } 2302 2303 /* END spdk_bs_init */ 2304 2305 /* START spdk_bs_destroy */ 2306 2307 static void 2308 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2309 { 2310 struct spdk_bs_init_ctx *ctx = cb_arg; 2311 struct spdk_blob_store *bs = ctx->bs; 2312 2313 /* 2314 * We need to defer calling spdk_bs_call_cpl() until after 2315 * dev destruction, so tuck these away for later use. 2316 */ 2317 bs->unload_err = bserrno; 2318 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2319 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2320 2321 spdk_bs_sequence_finish(seq, bserrno); 2322 2323 _spdk_bs_free(bs); 2324 free(ctx); 2325 } 2326 2327 void 2328 spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, 2329 void *cb_arg) 2330 { 2331 struct spdk_bs_cpl cpl; 2332 spdk_bs_sequence_t *seq; 2333 struct spdk_bs_init_ctx *ctx; 2334 2335 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n"); 2336 2337 if (!TAILQ_EMPTY(&bs->blobs)) { 2338 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2339 cb_fn(cb_arg, -EBUSY); 2340 return; 2341 } 2342 2343 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2344 cpl.u.bs_basic.cb_fn = cb_fn; 2345 cpl.u.bs_basic.cb_arg = cb_arg; 2346 2347 ctx = calloc(1, sizeof(*ctx)); 2348 if (!ctx) { 2349 cb_fn(cb_arg, -ENOMEM); 2350 return; 2351 } 2352 2353 ctx->bs = bs; 2354 2355 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 2356 if (!seq) { 2357 free(ctx); 2358 cb_fn(cb_arg, -ENOMEM); 2359 return; 2360 } 2361 2362 /* Write zeroes to the super block */ 2363 spdk_bs_sequence_write_zeroes(seq, 2364 _spdk_bs_page_to_lba(bs, 0), 2365 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)), 2366 _spdk_bs_destroy_trim_cpl, ctx); 2367 } 2368 2369 /* END spdk_bs_destroy */ 2370 2371 /* START spdk_bs_unload */ 2372 2373 static void 2374 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2375 { 2376 struct spdk_bs_load_ctx *ctx = cb_arg; 2377 2378 spdk_dma_free(ctx->super); 2379 2380 /* 2381 * We need to defer calling spdk_bs_call_cpl() until after 2382 * dev destuction, so tuck these away for later use. 2383 */ 2384 ctx->bs->unload_err = bserrno; 2385 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2386 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2387 2388 spdk_bs_sequence_finish(seq, bserrno); 2389 2390 _spdk_bs_free(ctx->bs); 2391 free(ctx); 2392 } 2393 2394 static void 2395 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2396 { 2397 struct spdk_bs_load_ctx *ctx = cb_arg; 2398 2399 spdk_dma_free(ctx->mask); 2400 ctx->super->clean = 1; 2401 2402 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 2403 } 2404 2405 static void 2406 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2407 { 2408 struct spdk_bs_load_ctx *ctx = cb_arg; 2409 2410 spdk_dma_free(ctx->mask); 2411 2412 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl); 2413 } 2414 2415 static void 2416 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2417 { 2418 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 2419 } 2420 2421 void 2422 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 2423 { 2424 struct spdk_bs_cpl cpl; 2425 spdk_bs_sequence_t *seq; 2426 struct spdk_bs_load_ctx *ctx; 2427 2428 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n"); 2429 2430 if (!TAILQ_EMPTY(&bs->blobs)) { 2431 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2432 cb_fn(cb_arg, -EBUSY); 2433 return; 2434 } 2435 2436 ctx = calloc(1, sizeof(*ctx)); 2437 if (!ctx) { 2438 cb_fn(cb_arg, -ENOMEM); 2439 return; 2440 } 2441 2442 ctx->bs = bs; 2443 2444 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2445 if (!ctx->super) { 2446 free(ctx); 2447 cb_fn(cb_arg, -ENOMEM); 2448 return; 2449 } 2450 2451 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2452 cpl.u.bs_basic.cb_fn = cb_fn; 2453 cpl.u.bs_basic.cb_arg = cb_arg; 2454 2455 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 2456 if (!seq) { 2457 spdk_dma_free(ctx->super); 2458 free(ctx); 2459 cb_fn(cb_arg, -ENOMEM); 2460 return; 2461 } 2462 2463 /* Read super block */ 2464 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2465 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2466 _spdk_bs_unload_read_super_cpl, ctx); 2467 } 2468 2469 /* END spdk_bs_unload */ 2470 2471 void 2472 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 2473 spdk_bs_op_complete cb_fn, void *cb_arg) 2474 { 2475 bs->super_blob = blobid; 2476 cb_fn(cb_arg, 0); 2477 } 2478 2479 void 2480 spdk_bs_get_super(struct spdk_blob_store *bs, 2481 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2482 { 2483 if (bs->super_blob == SPDK_BLOBID_INVALID) { 2484 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 2485 } else { 2486 cb_fn(cb_arg, bs->super_blob, 0); 2487 } 2488 } 2489 2490 uint64_t 2491 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 2492 { 2493 return bs->cluster_sz; 2494 } 2495 2496 uint64_t 2497 spdk_bs_get_page_size(struct spdk_blob_store *bs) 2498 { 2499 return SPDK_BS_PAGE_SIZE; 2500 } 2501 2502 uint64_t 2503 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 2504 { 2505 return bs->num_free_clusters; 2506 } 2507 2508 uint64_t 2509 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 2510 { 2511 return bs->total_data_clusters; 2512 } 2513 2514 static int 2515 spdk_bs_register_md_thread(struct spdk_blob_store *bs) 2516 { 2517 bs->md_channel = spdk_get_io_channel(bs); 2518 if (!bs->md_channel) { 2519 SPDK_ERRLOG("Failed to get IO channel.\n"); 2520 return -1; 2521 } 2522 2523 return 0; 2524 } 2525 2526 static int 2527 spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 2528 { 2529 spdk_put_io_channel(bs->md_channel); 2530 2531 return 0; 2532 } 2533 2534 spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob) 2535 { 2536 struct spdk_blob_data *blob = __blob_to_data(_blob); 2537 2538 assert(blob != NULL); 2539 2540 return blob->id; 2541 } 2542 2543 uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob) 2544 { 2545 struct spdk_blob_data *blob = __blob_to_data(_blob); 2546 2547 assert(blob != NULL); 2548 2549 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 2550 } 2551 2552 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob) 2553 { 2554 struct spdk_blob_data *blob = __blob_to_data(_blob); 2555 2556 assert(blob != NULL); 2557 2558 return blob->active.num_clusters; 2559 } 2560 2561 /* START spdk_bs_create_blob */ 2562 2563 static void 2564 _spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2565 { 2566 struct spdk_blob_data *blob = cb_arg; 2567 2568 _spdk_blob_free(blob); 2569 2570 spdk_bs_sequence_finish(seq, bserrno); 2571 } 2572 2573 void spdk_bs_create_blob(struct spdk_blob_store *bs, 2574 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2575 { 2576 struct spdk_blob_data *blob; 2577 uint32_t page_idx; 2578 struct spdk_bs_cpl cpl; 2579 spdk_bs_sequence_t *seq; 2580 spdk_blob_id id; 2581 2582 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 2583 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 2584 cb_fn(cb_arg, 0, -ENOMEM); 2585 return; 2586 } 2587 spdk_bit_array_set(bs->used_md_pages, page_idx); 2588 2589 id = _spdk_bs_page_to_blobid(page_idx); 2590 2591 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 2592 2593 blob = _spdk_blob_alloc(bs, id); 2594 if (!blob) { 2595 cb_fn(cb_arg, 0, -ENOMEM); 2596 return; 2597 } 2598 2599 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 2600 cpl.u.blobid.cb_fn = cb_fn; 2601 cpl.u.blobid.cb_arg = cb_arg; 2602 cpl.u.blobid.blobid = blob->id; 2603 2604 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 2605 if (!seq) { 2606 _spdk_blob_free(blob); 2607 cb_fn(cb_arg, 0, -ENOMEM); 2608 return; 2609 } 2610 2611 _spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob); 2612 } 2613 2614 /* END spdk_bs_create_blob */ 2615 2616 /* START spdk_blob_resize */ 2617 int 2618 spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz) 2619 { 2620 struct spdk_blob_data *blob = __blob_to_data(_blob); 2621 int rc; 2622 2623 assert(blob != NULL); 2624 2625 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 2626 2627 if (blob->md_ro) { 2628 return -EPERM; 2629 } 2630 2631 if (sz == blob->active.num_clusters) { 2632 return 0; 2633 } 2634 2635 rc = _spdk_resize_blob(blob, sz); 2636 if (rc < 0) { 2637 return rc; 2638 } 2639 2640 return 0; 2641 } 2642 2643 /* END spdk_blob_resize */ 2644 2645 2646 /* START spdk_bs_delete_blob */ 2647 2648 static void 2649 _spdk_bs_delete_close_cpl(void *cb_arg, int bserrno) 2650 { 2651 spdk_bs_sequence_t *seq = cb_arg; 2652 2653 spdk_bs_sequence_finish(seq, bserrno); 2654 } 2655 2656 static void 2657 _spdk_bs_delete_persist_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2658 { 2659 struct spdk_blob *_blob = cb_arg; 2660 struct spdk_blob_data *blob = __blob_to_data(_blob); 2661 2662 if (bserrno != 0) { 2663 /* 2664 * We already removed this blob from the blobstore tailq, so 2665 * we need to free it here since this is the last reference 2666 * to it. 2667 */ 2668 _spdk_blob_free(blob); 2669 _spdk_bs_delete_close_cpl(seq, bserrno); 2670 return; 2671 } 2672 2673 /* 2674 * This will immediately decrement the ref_count and call 2675 * the completion routine since the metadata state is clean. 2676 * By calling spdk_blob_close, we reduce the number of call 2677 * points into code that touches the blob->open_ref count 2678 * and the blobstore's blob list. 2679 */ 2680 spdk_blob_close(_blob, _spdk_bs_delete_close_cpl, seq); 2681 } 2682 2683 static void 2684 _spdk_bs_delete_open_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 2685 { 2686 spdk_bs_sequence_t *seq = cb_arg; 2687 struct spdk_blob_data *blob = __blob_to_data(_blob); 2688 2689 if (bserrno != 0) { 2690 spdk_bs_sequence_finish(seq, bserrno); 2691 return; 2692 } 2693 2694 if (blob->open_ref > 1) { 2695 /* 2696 * Someone has this blob open (besides this delete context). 2697 * Decrement the ref count directly and return -EBUSY. 2698 */ 2699 blob->open_ref--; 2700 spdk_bs_sequence_finish(seq, -EBUSY); 2701 return; 2702 } 2703 2704 /* 2705 * Remove the blob from the blob_store list now, to ensure it does not 2706 * get returned after this point by _spdk_blob_lookup(). 2707 */ 2708 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 2709 blob->state = SPDK_BLOB_STATE_DIRTY; 2710 blob->active.num_pages = 0; 2711 _spdk_resize_blob(blob, 0); 2712 2713 _spdk_blob_persist(seq, blob, _spdk_bs_delete_persist_cpl, _blob); 2714 } 2715 2716 void 2717 spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2718 spdk_blob_op_complete cb_fn, void *cb_arg) 2719 { 2720 struct spdk_bs_cpl cpl; 2721 spdk_bs_sequence_t *seq; 2722 2723 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid); 2724 2725 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2726 cpl.u.blob_basic.cb_fn = cb_fn; 2727 cpl.u.blob_basic.cb_arg = cb_arg; 2728 2729 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 2730 if (!seq) { 2731 cb_fn(cb_arg, -ENOMEM); 2732 return; 2733 } 2734 2735 spdk_bs_open_blob(bs, blobid, _spdk_bs_delete_open_cpl, seq); 2736 } 2737 2738 /* END spdk_bs_delete_blob */ 2739 2740 /* START spdk_bs_open_blob */ 2741 2742 static void 2743 _spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2744 { 2745 struct spdk_blob_data *blob = cb_arg; 2746 2747 /* If the blob have crc error, we just return NULL. */ 2748 if (blob == NULL) { 2749 seq->cpl.u.blob_handle.blob = NULL; 2750 spdk_bs_sequence_finish(seq, bserrno); 2751 return; 2752 } 2753 2754 blob->open_ref++; 2755 2756 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 2757 2758 spdk_bs_sequence_finish(seq, bserrno); 2759 } 2760 2761 void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2762 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2763 { 2764 struct spdk_blob_data *blob; 2765 struct spdk_bs_cpl cpl; 2766 spdk_bs_sequence_t *seq; 2767 uint32_t page_num; 2768 2769 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid); 2770 2771 blob = _spdk_blob_lookup(bs, blobid); 2772 if (blob) { 2773 blob->open_ref++; 2774 cb_fn(cb_arg, __data_to_blob(blob), 0); 2775 return; 2776 } 2777 2778 page_num = _spdk_bs_blobid_to_page(blobid); 2779 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2780 /* Invalid blobid */ 2781 cb_fn(cb_arg, NULL, -ENOENT); 2782 return; 2783 } 2784 2785 blob = _spdk_blob_alloc(bs, blobid); 2786 if (!blob) { 2787 cb_fn(cb_arg, NULL, -ENOMEM); 2788 return; 2789 } 2790 2791 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2792 cpl.u.blob_handle.cb_fn = cb_fn; 2793 cpl.u.blob_handle.cb_arg = cb_arg; 2794 cpl.u.blob_handle.blob = __data_to_blob(blob); 2795 2796 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 2797 if (!seq) { 2798 _spdk_blob_free(blob); 2799 cb_fn(cb_arg, NULL, -ENOMEM); 2800 return; 2801 } 2802 2803 _spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob); 2804 } 2805 2806 /* END spdk_bs_open_blob */ 2807 2808 /* START spdk_blob_sync_md */ 2809 2810 static void 2811 _spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2812 { 2813 spdk_bs_sequence_finish(seq, bserrno); 2814 } 2815 2816 void 2817 spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg) 2818 { 2819 struct spdk_blob_data *blob = __blob_to_data(_blob); 2820 struct spdk_bs_cpl cpl; 2821 spdk_bs_sequence_t *seq; 2822 2823 assert(blob != NULL); 2824 2825 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id); 2826 2827 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2828 blob->state != SPDK_BLOB_STATE_SYNCING); 2829 2830 if (blob->md_ro) { 2831 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 2832 return; 2833 } 2834 2835 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2836 cb_fn(cb_arg, 0); 2837 return; 2838 } 2839 2840 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2841 cpl.u.blob_basic.cb_fn = cb_fn; 2842 cpl.u.blob_basic.cb_arg = cb_arg; 2843 2844 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 2845 if (!seq) { 2846 cb_fn(cb_arg, -ENOMEM); 2847 return; 2848 } 2849 2850 _spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob); 2851 } 2852 2853 /* END spdk_blob_sync_md */ 2854 2855 /* START spdk_blob_close */ 2856 2857 static void 2858 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2859 { 2860 struct spdk_blob_data *blob = cb_arg; 2861 2862 if (bserrno == 0) { 2863 blob->open_ref--; 2864 if (blob->open_ref == 0) { 2865 /* 2866 * Blobs with active.num_pages == 0 are deleted blobs. 2867 * these blobs are removed from the blob_store list 2868 * when the deletion process starts - so don't try to 2869 * remove them again. 2870 */ 2871 if (blob->active.num_pages > 0) { 2872 TAILQ_REMOVE(&blob->bs->blobs, blob, link); 2873 } 2874 _spdk_blob_free(blob); 2875 } 2876 } 2877 2878 spdk_bs_sequence_finish(seq, bserrno); 2879 } 2880 2881 void spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg) 2882 { 2883 struct spdk_bs_cpl cpl; 2884 struct spdk_blob_data *blob; 2885 spdk_bs_sequence_t *seq; 2886 2887 assert(b != NULL); 2888 blob = __blob_to_data(b); 2889 assert(blob != NULL); 2890 2891 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id); 2892 2893 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2894 blob->state != SPDK_BLOB_STATE_SYNCING); 2895 2896 if (blob->open_ref == 0) { 2897 cb_fn(cb_arg, -EBADF); 2898 return; 2899 } 2900 2901 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2902 cpl.u.blob_basic.cb_fn = cb_fn; 2903 cpl.u.blob_basic.cb_arg = cb_arg; 2904 2905 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 2906 if (!seq) { 2907 cb_fn(cb_arg, -ENOMEM); 2908 return; 2909 } 2910 2911 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2912 _spdk_blob_close_cpl(seq, blob, 0); 2913 return; 2914 } 2915 2916 /* Sync metadata */ 2917 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob); 2918 } 2919 2920 /* END spdk_blob_close */ 2921 2922 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2923 { 2924 return spdk_get_io_channel(bs); 2925 } 2926 2927 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2928 { 2929 spdk_put_io_channel(channel); 2930 } 2931 2932 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2933 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2934 { 2935 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2936 SPDK_BLOB_UNMAP); 2937 } 2938 2939 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2940 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2941 { 2942 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2943 SPDK_BLOB_WRITE_ZEROES); 2944 } 2945 2946 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2947 void *payload, uint64_t offset, uint64_t length, 2948 spdk_blob_op_complete cb_fn, void *cb_arg) 2949 { 2950 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2951 SPDK_BLOB_WRITE); 2952 } 2953 2954 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2955 void *payload, uint64_t offset, uint64_t length, 2956 spdk_blob_op_complete cb_fn, void *cb_arg) 2957 { 2958 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2959 SPDK_BLOB_READ); 2960 } 2961 2962 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2963 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2964 spdk_blob_op_complete cb_fn, void *cb_arg) 2965 { 2966 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 2967 } 2968 2969 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2970 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2971 spdk_blob_op_complete cb_fn, void *cb_arg) 2972 { 2973 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 2974 } 2975 2976 struct spdk_bs_iter_ctx { 2977 int64_t page_num; 2978 struct spdk_blob_store *bs; 2979 2980 spdk_blob_op_with_handle_complete cb_fn; 2981 void *cb_arg; 2982 }; 2983 2984 static void 2985 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno) 2986 { 2987 struct spdk_bs_iter_ctx *ctx = cb_arg; 2988 struct spdk_blob_store *bs = ctx->bs; 2989 spdk_blob_id id; 2990 2991 if (bserrno == 0) { 2992 ctx->cb_fn(ctx->cb_arg, _blob, bserrno); 2993 free(ctx); 2994 return; 2995 } 2996 2997 ctx->page_num++; 2998 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2999 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 3000 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 3001 free(ctx); 3002 return; 3003 } 3004 3005 id = _spdk_bs_page_to_blobid(ctx->page_num); 3006 3007 spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 3008 } 3009 3010 void 3011 spdk_bs_iter_first(struct spdk_blob_store *bs, 3012 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 3013 { 3014 struct spdk_bs_iter_ctx *ctx; 3015 3016 ctx = calloc(1, sizeof(*ctx)); 3017 if (!ctx) { 3018 cb_fn(cb_arg, NULL, -ENOMEM); 3019 return; 3020 } 3021 3022 ctx->page_num = -1; 3023 ctx->bs = bs; 3024 ctx->cb_fn = cb_fn; 3025 ctx->cb_arg = cb_arg; 3026 3027 _spdk_bs_iter_cpl(ctx, NULL, -1); 3028 } 3029 3030 static void 3031 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 3032 { 3033 struct spdk_bs_iter_ctx *ctx = cb_arg; 3034 3035 _spdk_bs_iter_cpl(ctx, NULL, -1); 3036 } 3037 3038 void 3039 spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob *b, 3040 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 3041 { 3042 struct spdk_bs_iter_ctx *ctx; 3043 struct spdk_blob_data *blob; 3044 3045 assert(b != NULL); 3046 blob = __blob_to_data(b); 3047 assert(blob != NULL); 3048 3049 ctx = calloc(1, sizeof(*ctx)); 3050 if (!ctx) { 3051 cb_fn(cb_arg, NULL, -ENOMEM); 3052 return; 3053 } 3054 3055 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 3056 ctx->bs = bs; 3057 ctx->cb_fn = cb_fn; 3058 ctx->cb_arg = cb_arg; 3059 3060 /* Close the existing blob */ 3061 spdk_blob_close(b, _spdk_bs_iter_close_cpl, ctx); 3062 } 3063 3064 int 3065 spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value, 3066 uint16_t value_len) 3067 { 3068 struct spdk_blob_data *blob = __blob_to_data(_blob); 3069 struct spdk_xattr *xattr; 3070 3071 assert(blob != NULL); 3072 3073 assert(blob->state != SPDK_BLOB_STATE_LOADING && 3074 blob->state != SPDK_BLOB_STATE_SYNCING); 3075 3076 if (blob->md_ro) { 3077 return -EPERM; 3078 } 3079 3080 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3081 if (!strcmp(name, xattr->name)) { 3082 free(xattr->value); 3083 xattr->value_len = value_len; 3084 xattr->value = malloc(value_len); 3085 memcpy(xattr->value, value, value_len); 3086 3087 blob->state = SPDK_BLOB_STATE_DIRTY; 3088 3089 return 0; 3090 } 3091 } 3092 3093 xattr = calloc(1, sizeof(*xattr)); 3094 if (!xattr) { 3095 return -1; 3096 } 3097 xattr->name = strdup(name); 3098 xattr->value_len = value_len; 3099 xattr->value = malloc(value_len); 3100 memcpy(xattr->value, value, value_len); 3101 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 3102 3103 blob->state = SPDK_BLOB_STATE_DIRTY; 3104 3105 return 0; 3106 } 3107 3108 int 3109 spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name) 3110 { 3111 struct spdk_blob_data *blob = __blob_to_data(_blob); 3112 struct spdk_xattr *xattr; 3113 3114 assert(blob != NULL); 3115 3116 assert(blob->state != SPDK_BLOB_STATE_LOADING && 3117 blob->state != SPDK_BLOB_STATE_SYNCING); 3118 3119 if (blob->md_ro) { 3120 return -EPERM; 3121 } 3122 3123 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3124 if (!strcmp(name, xattr->name)) { 3125 TAILQ_REMOVE(&blob->xattrs, xattr, link); 3126 free(xattr->value); 3127 free(xattr->name); 3128 free(xattr); 3129 3130 blob->state = SPDK_BLOB_STATE_DIRTY; 3131 3132 return 0; 3133 } 3134 } 3135 3136 return -ENOENT; 3137 } 3138 3139 int 3140 spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name, 3141 const void **value, size_t *value_len) 3142 { 3143 struct spdk_blob_data *blob = __blob_to_data(_blob); 3144 struct spdk_xattr *xattr; 3145 3146 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3147 if (!strcmp(name, xattr->name)) { 3148 *value = xattr->value; 3149 *value_len = xattr->value_len; 3150 return 0; 3151 } 3152 } 3153 3154 return -ENOENT; 3155 } 3156 3157 struct spdk_xattr_names { 3158 uint32_t count; 3159 const char *names[0]; 3160 }; 3161 3162 int 3163 spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names) 3164 { 3165 struct spdk_blob_data *blob = __blob_to_data(_blob); 3166 struct spdk_xattr *xattr; 3167 int count = 0; 3168 3169 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3170 count++; 3171 } 3172 3173 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 3174 if (*names == NULL) { 3175 return -ENOMEM; 3176 } 3177 3178 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3179 (*names)->names[(*names)->count++] = xattr->name; 3180 } 3181 3182 return 0; 3183 } 3184 3185 uint32_t 3186 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 3187 { 3188 assert(names != NULL); 3189 3190 return names->count; 3191 } 3192 3193 const char * 3194 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 3195 { 3196 if (index >= names->count) { 3197 return NULL; 3198 } 3199 3200 return names->names[index]; 3201 } 3202 3203 void 3204 spdk_xattr_names_free(struct spdk_xattr_names *names) 3205 { 3206 free(names); 3207 } 3208 3209 struct spdk_bs_type 3210 spdk_bs_get_bstype(struct spdk_blob_store *bs) 3211 { 3212 return bs->bstype; 3213 } 3214 3215 void 3216 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype) 3217 { 3218 memcpy(&bs->bstype, &bstype, sizeof(bstype)); 3219 } 3220 3221 SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB) 3222