/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/crc32.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"

#define BLOB_CRC32C_INITIAL	0xffffffffUL

static int spdk_bs_register_md_thread(struct spdk_blob_store *bs);
static int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs);

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

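/*
 * Note: a blob's first metadata page is fixed; it is derived directly from
 * the blobid via _spdk_bs_blobid_to_page(). Any additional metadata pages
 * are claimed dynamically at persist time and linked through each page's
 * 'next' field.
 */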
static struct spdk_blob_data *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob_data *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob_data *blob)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob_data *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

static int
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_data *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_FLAGS) {
			struct spdk_blob_md_descriptor_flags *desc_flags;

			desc_flags = (struct spdk_blob_md_descriptor_flags *)desc;

			if (desc_flags->length != sizeof(*desc_flags) - sizeof(*desc)) {
				return -EINVAL;
			}

			if ((desc_flags->invalid_flags | SPDK_BLOB_INVALID_FLAGS_MASK) !=
			    SPDK_BLOB_INVALID_FLAGS_MASK) {
				return -EINVAL;
			}

			if ((desc_flags->data_ro_flags | SPDK_BLOB_DATA_RO_FLAGS_MASK) !=
			    SPDK_BLOB_DATA_RO_FLAGS_MASK) {
				blob->data_ro = true;
				blob->md_ro = true;
			}

			if ((desc_flags->md_ro_flags | SPDK_BLOB_MD_RO_FLAGS_MASK) !=
			    SPDK_BLOB_MD_RO_FLAGS_MASK) {
				blob->md_ro = true;
			}

			blob->invalid_flags = desc_flags->invalid_flags;
			blob->data_ro_flags = desc_flags->data_ro_flags;
			blob->md_ro_flags = desc_flags->md_ro_flags;

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
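			/*
			 * Each extent entry is a (cluster_idx, length) pair that
			 * run-length encodes a range of contiguous clusters. The
			 * runs are expanded below into one LBA per cluster in
			 * blob->active.clusters.
			 */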
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			if (desc_extent->length == 0 ||
			    (desc_extent->length % sizeof(desc_extent->extents[0]) != 0)) {
				return -EINVAL;
			}

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					if (!spdk_bit_array_get(blob->bs->used_clusters,
								desc_extent->extents[i].cluster_idx + j)) {
						return -EINVAL;
					}
					cluster_count++;
				}
			}

			if (cluster_count == 0) {
				return -EINVAL;
			}
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			if (tmp == NULL) {
				return -ENOMEM;
			}
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr *desc_xattr;
			struct spdk_xattr *xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			if (desc_xattr->length != sizeof(desc_xattr->name_length) +
			    sizeof(desc_xattr->value_length) +
			    desc_xattr->name_length + desc_xattr->value_length) {
				return -EINVAL;
			}

			xattr = calloc(1, sizeof(*xattr));
			if (xattr == NULL) {
				return -ENOMEM;
			}

			xattr->name = malloc(desc_xattr->name_length + 1);
			if (xattr->name == NULL) {
				free(xattr);
				return -ENOMEM;
			}
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			if (xattr->value == NULL) {
				free(xattr->name);
				free(xattr);
				return -ENOMEM;
			}
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Unrecognized descriptor type. Do not fail - just continue to the
			 * next descriptor. If this descriptor is associated with some feature
			 * defined in a newer version of blobstore, that version of blobstore
			 * should create and set an associated feature flag to specify if this
			 * blob can be loaded or not.
			 */
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}

	return 0;
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob_data *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;
	int rc;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);

	/* The blobid provided doesn't match what's in the MD. This can
	 * happen, for example, if a bogus blobid is passed in through open.
	 */
	if (blob->id != pages[0].id) {
		SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n",
			    blob->id, pages[0].id);
		return -ENOENT;
	}

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		rc = _spdk_blob_parse_page(page, blob);
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

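/*
 * The pages array built here is written to disk as-is, so it is allocated
 * with spdk_dma_malloc()/spdk_dma_realloc() to keep the buffer DMA-safe
 * and page-aligned.
 */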
static int
_spdk_blob_serialize_add_page(const struct spdk_blob_data *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

static void
_spdk_blob_serialize_extent(const struct spdk_blob_data *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint32_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}

static void
_spdk_blob_serialize_flags(const struct spdk_blob_data *blob,
			   uint8_t *buf, size_t *buf_sz)
{
	struct spdk_blob_md_descriptor_flags *desc;

	/*
	 * Flags get serialized first, so we should always have room for the flags
	 * descriptor.
	 */
	assert(*buf_sz >= sizeof(*desc));

	desc = (struct spdk_blob_md_descriptor_flags *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_FLAGS;
	desc->length = sizeof(*desc) - sizeof(struct spdk_blob_md_descriptor);
	desc->invalid_flags = blob->invalid_flags;
	desc->data_ro_flags = blob->data_ro_flags;
	desc->md_ro_flags = blob->md_ro_flags;

	*buf_sz -= sizeof(*desc);
}

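/*
 * Serialize a blob's metadata into a chain of pages: the flags descriptor
 * always comes first on page 0, followed by the xattr descriptors and then
 * the extent descriptors. Whenever the current page's descriptor area fills
 * up, a new page is appended to the chain and serialization continues there.
 */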
static int
_spdk_blob_serialize(const struct spdk_blob_data *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	const struct spdk_xattr *xattr;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;
	uint64_t last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize flags */
	_spdk_blob_serialize_flags(blob, buf, &remaining_sz);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob_data *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

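/*
 * The CRC covers the entire metadata page except its final 4 bytes, which
 * hold the crc field itself. The same helper is also used for the super
 * block.
 */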
static uint32_t
_spdk_blob_md_page_calc_crc(void *page)
{
	uint32_t crc;

	crc = BLOB_CRC32C_INITIAL;
	crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc);
	crc ^= BLOB_CRC32C_INITIAL;

	return crc;
}

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;
	uint32_t crc;

	page = &ctx->pages[ctx->num_pages - 1];
	crc = _spdk_blob_md_page_calc_crc(page);
	if (crc != page->crc) {
		SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages);
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, -EINVAL);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					      sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);
	if (rc) {
		_spdk_blob_free(blob);
		ctx->cb_fn(seq, NULL, rc);
		spdk_dma_free(ctx->pages);
		free(ctx);
		return;
	}

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
				      SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob_data *blob;

	struct spdk_blob_md_page *pages;

	uint64_t idx;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

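/*
 * Blob persist pipeline. The stages below are defined in reverse call order:
 * _spdk_blob_persist() serializes the metadata and starts the chain, which
 * writes pages 1..N, then the root page, then zeroes any metadata pages that
 * are no longer in use, then unmaps clusters freed by a shrink, and finally
 * completes back to the caller.
 */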
static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	void *tmp;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it as an unmap. */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be zeroed.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	/* The first page will only be zeroed if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_write_zeroes(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_zero_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob_data *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished.
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_resize_blob(struct spdk_blob_data *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob_data *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine.
		 */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_zero_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		/* Now that previous metadata page is complete, calculate the crc for it. */
		ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}
	ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]);
	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

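/*
 * Generic I/O submission. Because a blob's clusters are not necessarily
 * contiguous on disk, the request is split at cluster boundaries and each
 * piece is issued against the LBA that its cluster maps to.
 */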
static void
_spdk_blob_request_submit_op(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	spdk_bs_batch_t *batch;
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;
	uint8_t *buf;
	uint64_t page;

	assert(blob != NULL);

	if (blob->data_ro && op_type != SPDK_BLOB_READ) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		switch (op_type) {
		case SPDK_BLOB_READ:
			spdk_bs_batch_read(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE:
			spdk_bs_batch_write(batch, buf, lba, lba_count);
			break;
		case SPDK_BLOB_UNMAP:
			spdk_bs_batch_unmap(batch, lba, lba_count);
			break;
		case SPDK_BLOB_WRITE_ZEROES:
			spdk_bs_batch_write_zeroes(batch, lba, lba_count);
			break;
		}

		length -= lba_count;
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
		if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) {
			buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		}
	}

	spdk_bs_batch_close(batch);
}

struct rw_iov_ctx {
	struct spdk_blob_data *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will
	 * point to the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
	 * bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *_blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	if (!read && blob->data_ro) {
		cb_fn(cb_arg, -EPERM);
		return;
	}

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary,
	 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster
	 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 * to allocate a separate iov array and split the I/O such that none of the resulting
	 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel)
	 * but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
	 * when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob_data *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob_data *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store *bs = io_device;
	struct spdk_bs_channel *channel = ctx_buf;
	struct spdk_bs_dev *dev;
	uint32_t max_ops = bs->max_channel_ops;
	uint32_t i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	if (!channel->dev_channel) {
		SPDK_ERRLOG("Failed to create device channel.\n");
		free(channel->req_mem);
		return -1;
	}

	return 0;
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_dev_destroy(void *io_device)
{
	struct spdk_blob_store *bs = io_device;
	struct spdk_blob_data *blob, *blob_tmp;

	bs->dev->destroy(bs->dev);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);
	/*
	 * If this function is called for any reason except a successful unload,
	 * the unload_cpl type will be NONE and this will be a nop.
	 */
	spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err);

	free(bs);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(bs, _spdk_bs_dev_destroy);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
	memset(&opts->bstype, 0, sizeof(opts->bstype));
}

static int
_spdk_bs_opts_verify(struct spdk_bs_opts *opts)
{
	if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 ||
	    opts->max_channel_ops == 0) {
		SPDK_ERRLOG("Blobstore options cannot be set to 0\n");
		return -1;
	}

	return 0;
}

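/*
 * Allocate and initialize the in-memory blobstore structure. The cluster
 * count is computed directly from the device size; the used_md_pages bit
 * array starts at a single page and is resized once the real size of the
 * metadata region is known (at init or load time).
 */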
static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store *bs;
	uint64_t dev_size;
	int rc;

	dev_size = dev->blocklen * dev->blockcnt;
	if (dev_size < opts->cluster_sz) {
		/* Device size cannot be smaller than cluster size of blobstore */
		SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size,
			    opts->cluster_sz);
		return NULL;
	}
	if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) {
		/* Cluster size cannot be smaller than page size */
		SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n",
			    opts->cluster_sz, SPDK_BS_PAGE_SIZE);
		return NULL;
	}
	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 * even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		free(bs);
		return NULL;
	}

	bs->max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;
	memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype));

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);

	spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	rc = spdk_bs_register_md_thread(bs);
	if (rc == -1) {
		spdk_io_device_unregister(bs, NULL);
		spdk_bit_array_free(&bs->used_md_pages);
		spdk_bit_array_free(&bs->used_clusters);
		free(bs);
		return NULL;
	}

	return bs;
}

/* START spdk_bs_load, spdk_bs_load_ctx will be used for both load and unload. */

struct spdk_bs_load_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
	bool in_page_chain;
	uint32_t page_index;
	uint32_t cur_page;
	struct spdk_blob_md_page *page;
};

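/*
 * Serialize an in-memory bit array into the on-disk mask format: bit i of
 * the array becomes bit (i % 8) of byte (i / 8) in mask->mask.
 */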
static void
_spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask)
{
	uint32_t i = 0;

	while (true) {
		i = spdk_bit_array_find_first_set(array, i);
		if (i >= mask->length) {
			break;
		}
		mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}
}

static void
_spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs,
		     struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	/* Update the values in the super block */
	super->super_blob = bs->super_blob;
	memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype));
	super->crc = _spdk_blob_md_page_calc_crc(super);
	spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0),
			       _spdk_bs_byte_to_lba(bs, sizeof(*super)),
			       cb_fn, cb_arg);
}

static void
_spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx *ctx = arg;
	uint64_t mask_size, lba, lba_count;

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	_spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn)
{
	struct spdk_bs_load_ctx *ctx = arg;
	uint64_t mask_size, lba, lba_count;

	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	_spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg);
}

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE *
				     8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

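/*
 * Metadata replay path. If the blobstore was not cleanly unloaded, the
 * on-disk masks cannot be trusted. Instead, every metadata page is read
 * back and parsed to rebuild used_md_pages and used_clusters, and fresh
 * masks are then written out.
 */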
static int
_spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = 0;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j);
					if (bs->num_free_clusters == 0) {
						return -1;
					}
					bs->num_free_clusters--;
					cluster_count++;
				}
			}
			if (cluster_count == 0) {
				return -1;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			/* Skip this item */
		} else {
			/* Error */
			return -1;
		}
		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
	return 0;
}

static bool
_spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx)
{
	uint32_t crc;

	crc = _spdk_blob_md_page_calc_crc(ctx->page);
	if (crc != ctx->page->crc) {
		return false;
	}

	if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) {
		return false;
	}
	return true;
}

static void
_spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg);

static void
_spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);
	spdk_dma_free(ctx->super);
	spdk_bs_sequence_finish(seq, bserrno);
	free(ctx);
}

static void
_spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl);
}

static void
_spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl);
}

static void
_spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t page_num;

	if (bserrno != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	page_num = ctx->cur_page;
	if (_spdk_bs_load_cur_md_page_valid(ctx) == true) {
		if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) {
			spdk_bit_array_set(ctx->bs->used_md_pages, page_num);
			if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) {
				spdk_dma_free(ctx->super);
				_spdk_bs_free(ctx->bs);
				free(ctx);
				spdk_bs_sequence_finish(seq, -EILSEQ);
				return;
			}
			if (ctx->page->next != SPDK_INVALID_MD_PAGE) {
				ctx->in_page_chain = true;
				ctx->cur_page = ctx->page->next;
				_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
				return;
			}
		}
	}

	ctx->in_page_chain = false;

	do {
		ctx->page_index++;
	} while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true);

	if (ctx->page_index < ctx->super->md_len) {
		ctx->cur_page = ctx->page_index;
		_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
	} else {
		spdk_dma_free(ctx->page);
		_spdk_bs_load_write_used_md(seq, ctx, bserrno);
	}
}

static void
_spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba;

	assert(ctx->cur_page < ctx->super->md_len);
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page);
	spdk_bs_sequence_read(seq, ctx->page, lba,
			      _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE),
			      _spdk_bs_load_replay_md_cpl, ctx);
}

static void
_spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	ctx->page_index = 0;
	ctx->cur_page = 0;
	ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE,
				     SPDK_BS_PAGE_SIZE,
				     NULL);
	if (!ctx->page) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	_spdk_bs_load_replay_cur_md_page(seq, cb_arg);
}

static void
_spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	int rc;

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	_spdk_bs_load_replay_md(seq, cb_arg);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t crc;
	static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH];

	if (ctx->super->version > SPDK_BS_VERSION ||
	    ctx->super->version < SPDK_BS_INITIAL_VERSION) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	crc = _spdk_blob_md_page_calc_crc(ctx->super);
	if (crc != ctx->super->crc) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENXIO);
		return;
	}

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
	ctx->bs->super_blob = ctx->super->super_blob;
	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));

	if (ctx->super->clean == 1) {
		ctx->super->clean = 0;
		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
	} else {
		_spdk_bs_recover(seq, ctx);
	}
}

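/*
 * Load sequence: read the super block, validate its version, signature,
 * CRC, and bstype, then either read the saved masks (clean shutdown) or
 * replay the metadata region (recovery). The super block is rewritten with
 * clean = 0 so that a crash before the next unload is detected.
 */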
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype matched - loading blobstore\n");
	} else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Bstype wildcard used - loading blobstore regardless of bstype\n");
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Unexpected bstype\n");
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		SPDK_TRACEDUMP(SPDK_LOG_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH);
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENXIO);
		return;
	}

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up(
					       ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster);
	ctx->bs->super_blob = ctx->super->super_blob;
	memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype));

	if (ctx->super->clean == 1) {
		ctx->super->clean = 0;
		_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx);
	} else {
		_spdk_bs_recover(seq, ctx);
	}
}

void
spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store	*bs;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts	opts = {};

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Loading blobstore from dev %p\n", dev);

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) {
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store		*bs;
	struct spdk_bs_super_block	*super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store	*bs;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	spdk_bs_batch_t		*batch;
	uint64_t		num_md_lba;
	uint64_t		num_md_pages;
	uint64_t		num_md_clusters;
	uint32_t		i;
	struct spdk_bs_opts	opts = {};
	int			rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Initializing blobstore on dev %p\n", dev);

	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %d\n",
			    dev->blocklen);
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	if (_spdk_bs_opts_verify(&opts) != 0) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		dev->destroy(dev);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;
	memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype));

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
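	 *
	 * Illustrative example (assuming the usual 4 KiB SPDK_BS_PAGE_SIZE):
	 * for md_len = 32768 metadata pages the bitmap alone needs
	 * 32768 / 8 = 4096 bytes, and the spdk_bs_md_mask header pushes the
	 * total just past one page, so divide_round_up() below yields a
	 * 2-page mask region.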
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;
	num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages);

	ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super);

	num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster);
	if (num_md_clusters > bs->total_clusters) {
		SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, "
			    "please decrease number of pages reserved for metadata "
			    "or increase cluster size.\n");
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < num_md_clusters; i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	bs->total_data_clusters = bs->num_free_clusters;

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx);

	/* Clear metadata space */
	spdk_bs_batch_write_zeroes(batch, 0, num_md_lba);
	/* Trim data clusters */
	spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba);

	spdk_bs_batch_close(batch);
}

/* END spdk_bs_init */

/* START spdk_bs_destroy */

static void
_spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
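	 * _spdk_bs_free() below triggers the dev teardown; the completion
	 * saved in unload_cpl/unload_err is issued once that has finished.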
	 */
	bs->unload_err = bserrno;
	memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(bs);
	free(ctx);
}

void
spdk_bs_destroy(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn,
		void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	struct spdk_bs_init_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Destroying blobstore\n");

	if (!TAILQ_EMPTY(&bs->blobs)) {
		SPDK_ERRLOG("Blobstore still has open blobs\n");
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Write zeroes to the super block */
	spdk_bs_sequence_write_zeroes(seq,
				      _spdk_bs_page_to_lba(bs, 0),
				      _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_bs_super_block)),
				      _spdk_bs_destroy_trim_cpl, ctx);
}

/* END spdk_bs_destroy */

/* START spdk_bs_unload */

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx	*ctx = cb_arg;

	spdk_dma_free(ctx->super);

	/*
	 * We need to defer calling spdk_bs_call_cpl() until after
	 * dev destruction, so tuck these away for later use.
	 */
	ctx->bs->unload_err = bserrno;
	memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl));
	seq->cpl.type = SPDK_BS_CPL_TYPE_NONE;

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx	*ctx = cb_arg;

	spdk_dma_free(ctx->mask);
	ctx->super->clean = 1;

	_spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx	*ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	_spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl);
}

static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	_spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	struct spdk_bs_load_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blobstore\n");

	if (!TAILQ_EMPTY(&bs->blobs)) {
		SPDK_ERRLOG("Blobstore still has open blobs\n");
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return SPDK_BS_PAGE_SIZE;
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

uint64_t
spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs)
{
	return bs->total_data_clusters;
}

static int
spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_channel = spdk_get_io_channel(bs);
	if (!bs->md_channel) {
		SPDK_ERRLOG("Failed to get IO channel.\n");
		return -1;
	}

	return 0;
}

static int
spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_channel);

	return 0;
}

spdk_blob_id spdk_blob_get_id(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return blob->id;
}

uint64_t spdk_blob_get_num_pages(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t spdk_blob_get_num_clusters(struct spdk_blob *_blob)
{
	struct spdk_blob_data *blob = __blob_to_data(_blob);

	assert(blob != NULL);

	return blob->active.num_clusters;
}

/* START spdk_bs_create_blob */

static void
_spdk_bs_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_create_blob(struct spdk_blob_store *bs,
			 spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data	*blob;
	uint32_t		page_idx;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	spdk_blob_id		id;

	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	id = _spdk_bs_page_to_blobid(page_idx);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);
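
	/* A blob id is simply an encoding of the index of the blob's first
	 * metadata page, so no separate id-to-page lookup table is needed
	 * (see _spdk_bs_page_to_blobid() and _spdk_bs_blobid_to_page()).
	 */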
	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_create_blob_cpl, blob);
}

/* END spdk_bs_create_blob */

/* START spdk_blob_resize */
int
spdk_blob_resize(struct spdk_blob *_blob, uint64_t sz)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	int			rc;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (blob->md_ro) {
		return -EPERM;
	}

	if (sz == blob->active.num_clusters) {
		return 0;
	}

	rc = _spdk_resize_blob(blob, sz);
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* END spdk_blob_resize */


/* START spdk_bs_delete_blob */

static void
_spdk_bs_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	/* If the blob has a CRC error, we just return NULL. */
	if (blob == NULL) {
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}
	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_delete_blob_cpl, blob);
}

void
spdk_bs_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data	*blob;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Deleting blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		/* The blob is still open; it must be closed before it can be deleted. */
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_delete_open_cpl, blob);
}

/* END spdk_bs_delete_blob */

/* START spdk_bs_open_blob */

static void
_spdk_bs_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	/* If the blob has a CRC error, we just return NULL.
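	 * (The metadata load path invokes this completion with a NULL
	 * cb_arg in that case; bserrno carries the load error.)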
	 */
	if (blob == NULL) {
		seq->cpl.u.blob_handle.blob = NULL;
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data	*blob;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	uint32_t		page_num;

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Opening blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, __data_to_blob(blob), 0);
		return;
	}

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = __data_to_blob(blob);

	seq = spdk_bs_sequence_start(bs->md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_open_blob_cpl, blob);
}

/* END spdk_bs_open_blob */

/* START spdk_blob_sync_md */

static void
_spdk_blob_sync_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	spdk_bs_sequence_finish(seq, bserrno);
}

void
spdk_blob_sync_md(struct spdk_blob *_blob, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Syncing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		assert(blob->state == SPDK_BLOB_STATE_CLEAN);
		/* Read-only metadata is always clean; complete immediately
		 * instead of silently dropping the callback. */
		cb_fn(cb_arg, 0);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(cb_arg, 0);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_md_cpl, blob);
}

/* END spdk_blob_sync_md */

/* START spdk_blob_close */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_data *blob = cb_arg;

	if (blob->open_ref == 0) {
		TAILQ_REMOVE(&blob->bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_blob_close(struct spdk_blob *b, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	struct spdk_blob_data	*blob;
	spdk_bs_sequence_t	*seq;

	assert(b != NULL);
	blob = __blob_to_data(b);
	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "Closing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	blob->open_ref--;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		_spdk_blob_close_cpl(seq, blob, 0);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, blob);
}

/* END spdk_blob_close */

struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(bs);
}

void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_UNMAP);
}

void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
				  uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE_ZEROES);
}

void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   void *payload, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_WRITE);
}

void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  void *payload, uint64_t offset, uint64_t length,
			  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg,
				     SPDK_BLOB_READ);
}

void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}

struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *_blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, _blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = _spdk_bs_page_to_blobid(ctx->page_num);

	spdk_bs_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_iter_first(struct spdk_blob_store *bs,
		   spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
		  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;
	struct spdk_blob_data *blob;

	assert(b != NULL);
	blob = __blob_to_data(*b);
	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_blob_close(*b, _spdk_bs_iter_close_cpl, ctx);
}

int
spdk_blob_set_xattr(struct spdk_blob *_blob, const char *name, const void *value,
		    uint16_t value_len)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	struct spdk_xattr	*xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		return -EPERM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp;

			/* Allocate the new value before releasing the old one so
			 * the xattr stays intact if allocation fails. */
			tmp = malloc(value_len);
			if (!tmp) {
				return -ENOMEM;
			}
			memcpy(tmp, value, value_len);
			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -ENOMEM;
	}
	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -ENOMEM;
	}
	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_remove_xattr(struct spdk_blob *_blob, const char *name)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	struct spdk_xattr	*xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->md_ro) {
		return -EPERM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(&blob->xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_blob_get_xattr_value(struct spdk_blob *_blob, const char *name,
			  const void **value, size_t *value_len)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	struct spdk_xattr	*xattr;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
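			/* The returned pointer aliases the stored value: it stays
			 * valid only until the xattr is overwritten or removed and
			 * must not be freed by the caller.
			 */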
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}

	return -ENOENT;
}

struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[0];
};

int
spdk_blob_get_xattr_names(struct spdk_blob *_blob, struct spdk_xattr_names **names)
{
	struct spdk_blob_data	*blob = __blob_to_data(_blob);
	struct spdk_xattr	*xattr;
	int			count = 0;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

struct spdk_bs_type
spdk_bs_get_bstype(struct spdk_blob_store *bs)
{
	return bs->bstype;
}

void
spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype)
{
	memcpy(&bs->bstype, &bstype, sizeof(bstype));
}

SPDK_LOG_REGISTER_COMPONENT("blob", SPDK_LOG_BLOB)
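
/*
 * Illustrative usage sketch (not part of the library). It assumes a
 * struct spdk_bs_dev obtained elsewhere (e.g. from a bdev wrapper) and
 * abbreviates error handling; it is only compiled when
 * BLOBSTORE_USAGE_EXAMPLE is defined.
 */
#ifdef BLOBSTORE_USAGE_EXAMPLE
static void
example_bs_init_done(void *cb_arg, struct spdk_blob_store *bs, int bserrno)
{
	if (bserrno != 0) {
		SPDK_ERRLOG("blobstore init failed: %d\n", bserrno);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_BLOB, "blobstore ready: cluster size %" PRIu64 "\n",
		      spdk_bs_get_cluster_size(bs));
}

static void
example_init_blobstore(struct spdk_bs_dev *dev)
{
	struct spdk_bs_opts opts;

	spdk_bs_opts_init(&opts);
	/* Fields such as opts.cluster_sz or opts.num_md_pages may be
	 * overridden here before initialization. */
	spdk_bs_init(dev, &opts, example_bs_init_done, NULL);
}
#endif /* BLOBSTORE_USAGE_EXAMPLE */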