1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 44 #include "spdk_internal/log.h" 45 46 #include "blobstore.h" 47 #include "request.h" 48 49 #define BLOB_CRC32C_INITIAL 0xffffffffUL 50 51 static inline size_t 52 divide_round_up(size_t num, size_t divisor) 53 { 54 return (num + divisor - 1) / divisor; 55 } 56 57 static void 58 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 59 { 60 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 61 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 62 assert(bs->num_free_clusters > 0); 63 64 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 65 66 spdk_bit_array_set(bs->used_clusters, cluster_num); 67 bs->num_free_clusters--; 68 } 69 70 static void 71 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 72 { 73 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 74 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 75 assert(bs->num_free_clusters < bs->total_clusters); 76 77 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 78 79 spdk_bit_array_clear(bs->used_clusters, cluster_num); 80 bs->num_free_clusters++; 81 } 82 83 static struct spdk_blob * 84 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 85 { 86 struct spdk_blob *blob; 87 88 blob = calloc(1, sizeof(*blob)); 89 if (!blob) { 90 return NULL; 91 } 92 93 blob->id = id; 94 blob->bs = bs; 95 96 blob->state = SPDK_BLOB_STATE_DIRTY; 97 blob->active.num_pages = 1; 98 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 99 if (!blob->active.pages) { 100 free(blob); 101 return NULL; 102 } 103 104 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 105 106 TAILQ_INIT(&blob->xattrs); 107 108 return blob; 109 } 110 111 static void 112 _spdk_blob_free(struct spdk_blob *blob) 113 { 114 struct spdk_xattr *xattr, *xattr_tmp; 115 116 assert(blob != NULL); 117 118 free(blob->active.clusters); 119 free(blob->clean.clusters); 120 free(blob->active.pages); 121 free(blob->clean.pages); 122 123 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 124 TAILQ_REMOVE(&blob->xattrs, xattr, link); 125 free(xattr->name); 126 free(xattr->value); 127 free(xattr); 128 } 129 130 free(blob); 131 } 132 133 static int 134 _spdk_blob_mark_clean(struct spdk_blob *blob) 135 { 136 uint64_t *clusters = NULL; 137 uint32_t *pages = NULL; 138 139 assert(blob != NULL); 140 assert(blob->state == SPDK_BLOB_STATE_LOADING || 141 blob->state == SPDK_BLOB_STATE_SYNCING); 142 143 if (blob->active.num_clusters) { 144 assert(blob->active.clusters); 145 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 146 if (!clusters) { 147 return -1; 148 } 149 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 150 } 151 152 if (blob->active.num_pages) { 153 assert(blob->active.pages); 154 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 155 if (!pages) { 156 free(clusters); 157 return -1; 158 } 159 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 160 } 161 162 free(blob->clean.clusters); 163 free(blob->clean.pages); 164 165 blob->clean.num_clusters = blob->active.num_clusters; 166 blob->clean.clusters = blob->active.clusters; 167 blob->clean.num_pages = blob->active.num_pages; 168 blob->clean.pages = blob->active.pages; 169 170 blob->active.clusters = clusters; 171 blob->active.pages = pages; 172 173 blob->state = SPDK_BLOB_STATE_CLEAN; 174 175 return 0; 176 } 177 178 static void 179 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 180 { 181 struct spdk_blob_md_descriptor *desc; 182 size_t cur_desc = 0; 183 void *tmp; 184 185 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 186 while (cur_desc < sizeof(page->descriptors)) { 187 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 188 if (desc->length == 0) { 189 /* If padding and length are 0, this terminates the page */ 190 break; 191 } 192 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 193 struct spdk_blob_md_descriptor_extent *desc_extent; 194 unsigned int i, j; 195 unsigned int cluster_count = blob->active.num_clusters; 196 197 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 198 199 assert(desc_extent->length > 0); 200 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 201 202 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 203 for (j = 0; j < desc_extent->extents[i].length; j++) { 204 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 205 cluster_count++; 206 } 207 } 208 209 assert(cluster_count > 0); 210 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 211 assert(tmp != NULL); 212 blob->active.clusters = tmp; 213 blob->active.cluster_array_size = cluster_count; 214 215 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 216 for (j = 0; j < desc_extent->extents[i].length; j++) { 217 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 218 desc_extent->extents[i].cluster_idx + j); 219 } 220 } 221 222 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 223 struct spdk_blob_md_descriptor_xattr *desc_xattr; 224 struct spdk_xattr *xattr; 225 226 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 227 228 assert(desc_xattr->length == sizeof(desc_xattr->name_length) + 229 sizeof(desc_xattr->value_length) + 230 desc_xattr->name_length + desc_xattr->value_length); 231 232 xattr = calloc(1, sizeof(*xattr)); 233 assert(xattr != NULL); 234 235 xattr->name = malloc(desc_xattr->name_length + 1); 236 assert(xattr->name); 237 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 238 xattr->name[desc_xattr->name_length] = '\0'; 239 240 xattr->value = malloc(desc_xattr->value_length); 241 assert(xattr->value != NULL); 242 xattr->value_len = desc_xattr->value_length; 243 memcpy(xattr->value, 244 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 245 desc_xattr->value_length); 246 247 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 248 } else { 249 /* Error */ 250 break; 251 } 252 253 /* Advance to the next descriptor */ 254 cur_desc += sizeof(*desc) + desc->length; 255 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 256 break; 257 } 258 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 259 } 260 } 261 262 static int 263 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 264 struct spdk_blob *blob) 265 { 266 const struct spdk_blob_md_page *page; 267 uint32_t i; 268 269 assert(page_count > 0); 270 assert(pages[0].sequence_num == 0); 271 assert(blob != NULL); 272 assert(blob->state == SPDK_BLOB_STATE_LOADING); 273 assert(blob->active.clusters == NULL); 274 assert(blob->id == pages[0].id); 275 assert(blob->state == SPDK_BLOB_STATE_LOADING); 276 277 for (i = 0; i < page_count; i++) { 278 page = &pages[i]; 279 280 assert(page->id == blob->id); 281 assert(page->sequence_num == i); 282 283 _spdk_blob_parse_page(page, blob); 284 } 285 286 return 0; 287 } 288 289 static int 290 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 291 struct spdk_blob_md_page **pages, 292 uint32_t *page_count, 293 struct spdk_blob_md_page **last_page) 294 { 295 struct spdk_blob_md_page *page; 296 297 assert(pages != NULL); 298 assert(page_count != NULL); 299 300 if (*page_count == 0) { 301 assert(*pages == NULL); 302 *page_count = 1; 303 *pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE, 304 SPDK_BS_PAGE_SIZE, 305 NULL); 306 } else { 307 assert(*pages != NULL); 308 (*page_count)++; 309 *pages = spdk_dma_realloc(*pages, 310 SPDK_BS_PAGE_SIZE * (*page_count), 311 SPDK_BS_PAGE_SIZE, 312 NULL); 313 } 314 315 if (*pages == NULL) { 316 *page_count = 0; 317 *last_page = NULL; 318 return -ENOMEM; 319 } 320 321 page = &(*pages)[*page_count - 1]; 322 memset(page, 0, sizeof(*page)); 323 page->id = blob->id; 324 page->sequence_num = *page_count - 1; 325 page->next = SPDK_INVALID_MD_PAGE; 326 *last_page = page; 327 328 return 0; 329 } 330 331 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 332 * Update required_sz on both success and failure. 333 * 334 */ 335 static int 336 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 337 uint8_t *buf, size_t buf_sz, 338 size_t *required_sz) 339 { 340 struct spdk_blob_md_descriptor_xattr *desc; 341 342 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 343 strlen(xattr->name) + 344 xattr->value_len; 345 346 if (buf_sz < *required_sz) { 347 return -1; 348 } 349 350 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 351 352 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 353 desc->length = sizeof(desc->name_length) + 354 sizeof(desc->value_length) + 355 strlen(xattr->name) + 356 xattr->value_len; 357 desc->name_length = strlen(xattr->name); 358 desc->value_length = xattr->value_len; 359 360 memcpy(desc->name, xattr->name, desc->name_length); 361 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 362 xattr->value, 363 desc->value_length); 364 365 return 0; 366 } 367 368 static void 369 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 370 uint64_t start_cluster, uint64_t *next_cluster, 371 uint8_t *buf, size_t buf_sz) 372 { 373 struct spdk_blob_md_descriptor_extent *desc; 374 size_t cur_sz; 375 uint64_t i, extent_idx; 376 uint32_t lba, lba_per_cluster, lba_count; 377 378 /* The buffer must have room for at least one extent */ 379 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 380 if (buf_sz < cur_sz) { 381 *next_cluster = start_cluster; 382 return; 383 } 384 385 desc = (struct spdk_blob_md_descriptor_extent *)buf; 386 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 387 388 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 389 390 lba = blob->active.clusters[start_cluster]; 391 lba_count = lba_per_cluster; 392 extent_idx = 0; 393 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 394 if ((lba + lba_count) == blob->active.clusters[i]) { 395 lba_count += lba_per_cluster; 396 continue; 397 } 398 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 399 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 400 extent_idx++; 401 402 cur_sz += sizeof(desc->extents[extent_idx]); 403 404 if (buf_sz < cur_sz) { 405 /* If we ran out of buffer space, return */ 406 desc->length = sizeof(desc->extents[0]) * extent_idx; 407 *next_cluster = i; 408 return; 409 } 410 411 lba = blob->active.clusters[i]; 412 lba_count = lba_per_cluster; 413 } 414 415 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 416 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 417 extent_idx++; 418 419 desc->length = sizeof(desc->extents[0]) * extent_idx; 420 *next_cluster = blob->active.num_clusters; 421 422 return; 423 } 424 425 static int 426 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 427 uint32_t *page_count) 428 { 429 struct spdk_blob_md_page *cur_page; 430 const struct spdk_xattr *xattr; 431 int rc; 432 uint8_t *buf; 433 size_t remaining_sz; 434 uint64_t last_cluster; 435 436 assert(pages != NULL); 437 assert(page_count != NULL); 438 assert(blob != NULL); 439 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 440 441 *pages = NULL; 442 *page_count = 0; 443 444 /* A blob always has at least 1 page, even if it has no descriptors */ 445 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 446 if (rc < 0) { 447 return rc; 448 } 449 450 buf = (uint8_t *)cur_page->descriptors; 451 remaining_sz = sizeof(cur_page->descriptors); 452 453 /* Serialize xattrs */ 454 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 455 size_t required_sz = 0; 456 rc = _spdk_blob_serialize_xattr(xattr, 457 buf, remaining_sz, 458 &required_sz); 459 if (rc < 0) { 460 /* Need to add a new page to the chain */ 461 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 462 &cur_page); 463 if (rc < 0) { 464 spdk_dma_free(*pages); 465 *pages = NULL; 466 *page_count = 0; 467 return rc; 468 } 469 470 buf = (uint8_t *)cur_page->descriptors; 471 remaining_sz = sizeof(cur_page->descriptors); 472 473 /* Try again */ 474 required_sz = 0; 475 rc = _spdk_blob_serialize_xattr(xattr, 476 buf, remaining_sz, 477 &required_sz); 478 479 if (rc < 0) { 480 spdk_dma_free(*pages); 481 *pages = NULL; 482 *page_count = 0; 483 return -1; 484 } 485 } 486 487 remaining_sz -= required_sz; 488 buf += required_sz; 489 } 490 491 /* Serialize extents */ 492 last_cluster = 0; 493 while (last_cluster < blob->active.num_clusters) { 494 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 495 buf, remaining_sz); 496 497 if (last_cluster == blob->active.num_clusters) { 498 break; 499 } 500 501 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 502 &cur_page); 503 if (rc < 0) { 504 return rc; 505 } 506 507 buf = (uint8_t *)cur_page->descriptors; 508 remaining_sz = sizeof(cur_page->descriptors); 509 } 510 511 return 0; 512 } 513 514 struct spdk_blob_load_ctx { 515 struct spdk_blob *blob; 516 517 struct spdk_blob_md_page *pages; 518 uint32_t num_pages; 519 520 spdk_bs_sequence_cpl cb_fn; 521 void *cb_arg; 522 }; 523 524 static uint32_t 525 _spdk_blob_md_page_calc_crc(void *page) 526 { 527 uint32_t crc; 528 529 crc = BLOB_CRC32C_INITIAL; 530 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 531 crc ^= BLOB_CRC32C_INITIAL; 532 533 return crc; 534 535 } 536 537 static void 538 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 539 { 540 struct spdk_blob_load_ctx *ctx = cb_arg; 541 struct spdk_blob *blob = ctx->blob; 542 struct spdk_blob_md_page *page; 543 int rc; 544 uint32_t crc; 545 546 page = &ctx->pages[ctx->num_pages - 1]; 547 crc = _spdk_blob_md_page_calc_crc(page); 548 if (crc != page->crc) { 549 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 550 _spdk_blob_free(blob); 551 ctx->cb_fn(seq, NULL, -EINVAL); 552 spdk_dma_free(ctx->pages); 553 free(ctx); 554 return; 555 } 556 557 if (page->next != SPDK_INVALID_MD_PAGE) { 558 uint32_t next_page = page->next; 559 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 560 561 562 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 563 564 /* Read the next page */ 565 ctx->num_pages++; 566 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 567 sizeof(*page), NULL); 568 if (ctx->pages == NULL) { 569 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 570 free(ctx); 571 return; 572 } 573 574 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 575 next_lba, 576 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 577 _spdk_blob_load_cpl, ctx); 578 return; 579 } 580 581 /* Parse the pages */ 582 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 583 584 _spdk_blob_mark_clean(blob); 585 586 ctx->cb_fn(seq, ctx->cb_arg, rc); 587 588 /* Free the memory */ 589 spdk_dma_free(ctx->pages); 590 free(ctx); 591 } 592 593 /* Load a blob from disk given a blobid */ 594 static void 595 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 596 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 597 { 598 struct spdk_blob_load_ctx *ctx; 599 struct spdk_blob_store *bs; 600 uint32_t page_num; 601 uint64_t lba; 602 603 assert(blob != NULL); 604 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 605 blob->state == SPDK_BLOB_STATE_DIRTY); 606 607 bs = blob->bs; 608 609 ctx = calloc(1, sizeof(*ctx)); 610 if (!ctx) { 611 cb_fn(seq, cb_arg, -ENOMEM); 612 return; 613 } 614 615 ctx->blob = blob; 616 ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, 617 SPDK_BS_PAGE_SIZE, NULL); 618 if (!ctx->pages) { 619 free(ctx); 620 cb_fn(seq, cb_arg, -ENOMEM); 621 return; 622 } 623 ctx->num_pages = 1; 624 ctx->cb_fn = cb_fn; 625 ctx->cb_arg = cb_arg; 626 627 page_num = _spdk_bs_blobid_to_page(blob->id); 628 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 629 630 blob->state = SPDK_BLOB_STATE_LOADING; 631 632 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 633 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 634 _spdk_blob_load_cpl, ctx); 635 } 636 637 struct spdk_blob_persist_ctx { 638 struct spdk_blob *blob; 639 640 struct spdk_blob_md_page *pages; 641 642 uint64_t idx; 643 644 spdk_bs_sequence_cpl cb_fn; 645 void *cb_arg; 646 }; 647 648 static void 649 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 650 { 651 struct spdk_blob_persist_ctx *ctx = cb_arg; 652 struct spdk_blob *blob = ctx->blob; 653 654 if (bserrno == 0) { 655 _spdk_blob_mark_clean(blob); 656 } 657 658 /* Call user callback */ 659 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 660 661 /* Free the memory */ 662 spdk_dma_free(ctx->pages); 663 free(ctx); 664 } 665 666 static void 667 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 668 { 669 struct spdk_blob_persist_ctx *ctx = cb_arg; 670 struct spdk_blob *blob = ctx->blob; 671 struct spdk_blob_store *bs = blob->bs; 672 void *tmp; 673 size_t i; 674 675 /* Release all clusters that were truncated */ 676 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 677 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 678 679 _spdk_bs_release_cluster(bs, cluster_num); 680 } 681 682 if (blob->active.num_clusters == 0) { 683 free(blob->active.clusters); 684 blob->active.clusters = NULL; 685 blob->active.cluster_array_size = 0; 686 } else { 687 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 688 assert(tmp != NULL); 689 blob->active.clusters = tmp; 690 blob->active.cluster_array_size = blob->active.num_clusters; 691 } 692 693 _spdk_blob_persist_complete(seq, ctx, bserrno); 694 } 695 696 static void 697 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 698 { 699 struct spdk_blob_persist_ctx *ctx = cb_arg; 700 struct spdk_blob *blob = ctx->blob; 701 struct spdk_blob_store *bs = blob->bs; 702 spdk_bs_batch_t *batch; 703 size_t i; 704 uint64_t lba; 705 uint32_t lba_count; 706 707 /* Clusters don't move around in blobs. The list shrinks or grows 708 * at the end, but no changes ever occur in the middle of the list. 709 */ 710 711 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 712 713 /* Unmap all clusters that were truncated */ 714 lba = 0; 715 lba_count = 0; 716 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 717 uint64_t next_lba = blob->active.clusters[i]; 718 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 719 720 if ((lba + lba_count) == next_lba) { 721 /* This cluster is contiguous with the previous one. */ 722 lba_count += next_lba_count; 723 continue; 724 } 725 726 /* This cluster is not contiguous with the previous one. */ 727 728 /* If a run of LBAs previously existing, send them 729 * as an unmap. 730 */ 731 if (lba_count > 0) { 732 spdk_bs_batch_unmap(batch, lba, lba_count); 733 } 734 735 /* Start building the next batch */ 736 lba = next_lba; 737 lba_count = next_lba_count; 738 } 739 740 /* If we ended with a contiguous set of LBAs, send the unmap now */ 741 if (lba_count > 0) { 742 spdk_bs_batch_unmap(batch, lba, lba_count); 743 } 744 745 spdk_bs_batch_close(batch); 746 } 747 748 static void 749 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 750 { 751 struct spdk_blob_persist_ctx *ctx = cb_arg; 752 struct spdk_blob *blob = ctx->blob; 753 struct spdk_blob_store *bs = blob->bs; 754 size_t i; 755 756 /* This loop starts at 1 because the first page is special and handled 757 * below. The pages (except the first) are never written in place, 758 * so any pages in the clean list must be unmapped. 759 */ 760 for (i = 1; i < blob->clean.num_pages; i++) { 761 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 762 } 763 764 if (blob->active.num_pages == 0) { 765 uint32_t page_num; 766 767 page_num = _spdk_bs_blobid_to_page(blob->id); 768 spdk_bit_array_clear(bs->used_md_pages, page_num); 769 } 770 771 /* Move on to unmapping clusters */ 772 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 773 } 774 775 static void 776 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 777 { 778 struct spdk_blob_persist_ctx *ctx = cb_arg; 779 struct spdk_blob *blob = ctx->blob; 780 struct spdk_blob_store *bs = blob->bs; 781 uint64_t lba; 782 uint32_t lba_count; 783 spdk_bs_batch_t *batch; 784 size_t i; 785 786 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx); 787 788 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 789 790 /* This loop starts at 1 because the first page is special and handled 791 * below. The pages (except the first) are never written in place, 792 * so any pages in the clean list must be unmapped. 793 */ 794 for (i = 1; i < blob->clean.num_pages; i++) { 795 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 796 797 spdk_bs_batch_unmap(batch, lba, lba_count); 798 } 799 800 /* The first page will only be unmapped if this is a delete. */ 801 if (blob->active.num_pages == 0) { 802 uint32_t page_num; 803 804 /* The first page in the metadata goes where the blobid indicates */ 805 page_num = _spdk_bs_blobid_to_page(blob->id); 806 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 807 808 spdk_bs_batch_unmap(batch, lba, lba_count); 809 } 810 811 spdk_bs_batch_close(batch); 812 } 813 814 static void 815 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 816 { 817 struct spdk_blob_persist_ctx *ctx = cb_arg; 818 struct spdk_blob *blob = ctx->blob; 819 struct spdk_blob_store *bs = blob->bs; 820 uint64_t lba; 821 uint32_t lba_count; 822 struct spdk_blob_md_page *page; 823 824 if (blob->active.num_pages == 0) { 825 /* Move on to the next step */ 826 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 827 return; 828 } 829 830 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 831 832 page = &ctx->pages[0]; 833 /* The first page in the metadata goes where the blobid indicates */ 834 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 835 836 spdk_bs_sequence_write(seq, page, lba, lba_count, 837 _spdk_blob_persist_unmap_pages, ctx); 838 } 839 840 static void 841 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 842 { 843 struct spdk_blob_persist_ctx *ctx = cb_arg; 844 struct spdk_blob *blob = ctx->blob; 845 struct spdk_blob_store *bs = blob->bs; 846 uint64_t lba; 847 uint32_t lba_count; 848 struct spdk_blob_md_page *page; 849 spdk_bs_batch_t *batch; 850 size_t i; 851 852 /* Clusters don't move around in blobs. The list shrinks or grows 853 * at the end, but no changes ever occur in the middle of the list. 854 */ 855 856 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 857 858 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 859 860 /* This starts at 1. The root page is not written until 861 * all of the others are finished 862 */ 863 for (i = 1; i < blob->active.num_pages; i++) { 864 page = &ctx->pages[i]; 865 assert(page->sequence_num == i); 866 867 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 868 869 spdk_bs_batch_write(batch, page, lba, lba_count); 870 } 871 872 spdk_bs_batch_close(batch); 873 } 874 875 static int 876 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 877 { 878 uint64_t i; 879 uint64_t *tmp; 880 uint64_t lfc; /* lowest free cluster */ 881 struct spdk_blob_store *bs; 882 883 bs = blob->bs; 884 885 assert(blob->state != SPDK_BLOB_STATE_LOADING && 886 blob->state != SPDK_BLOB_STATE_SYNCING); 887 888 if (blob->active.num_clusters == sz) { 889 return 0; 890 } 891 892 if (blob->active.num_clusters < blob->active.cluster_array_size) { 893 /* If this blob was resized to be larger, then smaller, then 894 * larger without syncing, then the cluster array already 895 * contains spare assigned clusters we can use. 896 */ 897 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 898 sz); 899 } 900 901 blob->state = SPDK_BLOB_STATE_DIRTY; 902 903 /* Do two passes - one to verify that we can obtain enough clusters 904 * and another to actually claim them. 905 */ 906 907 lfc = 0; 908 for (i = blob->active.num_clusters; i < sz; i++) { 909 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 910 if (lfc >= bs->total_clusters) { 911 /* No more free clusters. Cannot satisfy the request */ 912 assert(false); 913 return -1; 914 } 915 lfc++; 916 } 917 918 if (sz > blob->active.num_clusters) { 919 /* Expand the cluster array if necessary. 920 * We only shrink the array when persisting. 921 */ 922 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 923 if (sz > 0 && tmp == NULL) { 924 assert(false); 925 return -1; 926 } 927 blob->active.clusters = tmp; 928 blob->active.cluster_array_size = sz; 929 } 930 931 lfc = 0; 932 for (i = blob->active.num_clusters; i < sz; i++) { 933 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 934 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 935 _spdk_bs_claim_cluster(bs, lfc); 936 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 937 lfc++; 938 } 939 940 blob->active.num_clusters = sz; 941 942 return 0; 943 } 944 945 /* Write a blob to disk */ 946 static void 947 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 948 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 949 { 950 struct spdk_blob_persist_ctx *ctx; 951 int rc; 952 uint64_t i; 953 uint32_t page_num; 954 struct spdk_blob_store *bs; 955 956 assert(blob != NULL); 957 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 958 blob->state == SPDK_BLOB_STATE_DIRTY); 959 960 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 961 cb_fn(seq, cb_arg, 0); 962 return; 963 } 964 965 bs = blob->bs; 966 967 ctx = calloc(1, sizeof(*ctx)); 968 if (!ctx) { 969 cb_fn(seq, cb_arg, -ENOMEM); 970 return; 971 } 972 ctx->blob = blob; 973 ctx->cb_fn = cb_fn; 974 ctx->cb_arg = cb_arg; 975 976 blob->state = SPDK_BLOB_STATE_SYNCING; 977 978 if (blob->active.num_pages == 0) { 979 /* This is the signal that the blob should be deleted. 980 * Immediately jump to the clean up routine. */ 981 assert(blob->clean.num_pages > 0); 982 ctx->idx = blob->clean.num_pages - 1; 983 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 984 return; 985 986 } 987 988 /* Generate the new metadata */ 989 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 990 if (rc < 0) { 991 free(ctx); 992 cb_fn(seq, cb_arg, rc); 993 return; 994 } 995 996 assert(blob->active.num_pages >= 1); 997 998 /* Resize the cache of page indices */ 999 blob->active.pages = realloc(blob->active.pages, 1000 blob->active.num_pages * sizeof(*blob->active.pages)); 1001 if (!blob->active.pages) { 1002 free(ctx); 1003 cb_fn(seq, cb_arg, -ENOMEM); 1004 return; 1005 } 1006 1007 /* Assign this metadata to pages. This requires two passes - 1008 * one to verify that there are enough pages and a second 1009 * to actually claim them. */ 1010 page_num = 0; 1011 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1012 for (i = 1; i < blob->active.num_pages; i++) { 1013 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1014 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 1015 spdk_dma_free(ctx->pages); 1016 free(ctx); 1017 blob->state = SPDK_BLOB_STATE_DIRTY; 1018 cb_fn(seq, cb_arg, -ENOMEM); 1019 return; 1020 } 1021 page_num++; 1022 } 1023 1024 page_num = 0; 1025 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1026 for (i = 1; i < blob->active.num_pages; i++) { 1027 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1028 ctx->pages[i - 1].next = page_num; 1029 /* Now that previous metadata page is complete, calculate the crc for it. */ 1030 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1031 blob->active.pages[i] = page_num; 1032 spdk_bit_array_set(bs->used_md_pages, page_num); 1033 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1034 page_num++; 1035 } 1036 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1037 /* Start writing the metadata from last page to first */ 1038 ctx->idx = blob->active.num_pages - 1; 1039 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1040 } 1041 1042 static void 1043 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1044 void *payload, uint64_t offset, uint64_t length, 1045 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1046 { 1047 spdk_bs_batch_t *batch; 1048 struct spdk_bs_cpl cpl; 1049 uint64_t lba; 1050 uint32_t lba_count; 1051 uint8_t *buf; 1052 uint64_t page; 1053 1054 assert(blob != NULL); 1055 1056 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1057 cb_fn(cb_arg, -EINVAL); 1058 return; 1059 } 1060 1061 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1062 cpl.u.blob_basic.cb_fn = cb_fn; 1063 cpl.u.blob_basic.cb_arg = cb_arg; 1064 1065 batch = spdk_bs_batch_open(_channel, &cpl); 1066 if (!batch) { 1067 cb_fn(cb_arg, -ENOMEM); 1068 return; 1069 } 1070 1071 length = _spdk_bs_page_to_lba(blob->bs, length); 1072 page = offset; 1073 buf = payload; 1074 while (length > 0) { 1075 lba = _spdk_bs_blob_page_to_lba(blob, page); 1076 lba_count = spdk_min(length, 1077 _spdk_bs_page_to_lba(blob->bs, 1078 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1079 1080 if (read) { 1081 spdk_bs_batch_read(batch, buf, lba, lba_count); 1082 } else { 1083 spdk_bs_batch_write(batch, buf, lba, lba_count); 1084 } 1085 1086 length -= lba_count; 1087 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1088 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1089 } 1090 1091 spdk_bs_batch_close(batch); 1092 } 1093 1094 struct rw_iov_ctx { 1095 struct spdk_blob *blob; 1096 bool read; 1097 int iovcnt; 1098 struct iovec *orig_iov; 1099 uint64_t page_offset; 1100 uint64_t pages_remaining; 1101 uint64_t pages_done; 1102 struct iovec iov[0]; 1103 }; 1104 1105 static void 1106 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1107 { 1108 assert(cb_arg == NULL); 1109 spdk_bs_sequence_finish(seq, bserrno); 1110 } 1111 1112 static void 1113 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1114 { 1115 struct rw_iov_ctx *ctx = cb_arg; 1116 struct iovec *iov, *orig_iov; 1117 int iovcnt; 1118 size_t orig_iovoff; 1119 uint64_t lba; 1120 uint64_t page_count, pages_to_boundary; 1121 uint32_t lba_count; 1122 uint64_t byte_count; 1123 1124 if (bserrno != 0 || ctx->pages_remaining == 0) { 1125 free(ctx); 1126 spdk_bs_sequence_finish(seq, bserrno); 1127 return; 1128 } 1129 1130 pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset); 1131 page_count = spdk_min(ctx->pages_remaining, pages_to_boundary); 1132 lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset); 1133 lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count); 1134 1135 /* 1136 * Get index and offset into the original iov array for our current position in the I/O sequence. 1137 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 1138 * point to the current position in the I/O sequence. 1139 */ 1140 byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page); 1141 orig_iov = &ctx->orig_iov[0]; 1142 orig_iovoff = 0; 1143 while (byte_count > 0) { 1144 if (byte_count >= orig_iov->iov_len) { 1145 byte_count -= orig_iov->iov_len; 1146 orig_iov++; 1147 } else { 1148 orig_iovoff = byte_count; 1149 byte_count = 0; 1150 } 1151 } 1152 1153 /* 1154 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 1155 * bytes of this next I/O remain to be accounted for in the new iov array. 1156 */ 1157 byte_count = page_count * sizeof(struct spdk_blob_md_page); 1158 iov = &ctx->iov[0]; 1159 iovcnt = 0; 1160 while (byte_count > 0) { 1161 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 1162 iov->iov_base = orig_iov->iov_base + orig_iovoff; 1163 byte_count -= iov->iov_len; 1164 orig_iovoff = 0; 1165 orig_iov++; 1166 iov++; 1167 iovcnt++; 1168 } 1169 1170 ctx->page_offset += page_count; 1171 ctx->pages_done += page_count; 1172 ctx->pages_remaining -= page_count; 1173 iov = &ctx->iov[0]; 1174 1175 if (ctx->read) { 1176 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1177 } else { 1178 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1179 } 1180 } 1181 1182 static void 1183 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1184 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1185 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1186 { 1187 spdk_bs_sequence_t *seq; 1188 struct spdk_bs_cpl cpl; 1189 1190 assert(blob != NULL); 1191 1192 if (length == 0) { 1193 cb_fn(cb_arg, 0); 1194 return; 1195 } 1196 1197 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1198 cb_fn(cb_arg, -EINVAL); 1199 return; 1200 } 1201 1202 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1203 cpl.u.blob_basic.cb_fn = cb_fn; 1204 cpl.u.blob_basic.cb_arg = cb_arg; 1205 1206 /* 1207 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 1208 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 1209 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 1210 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 1211 * to allocate a separate iov array and split the I/O such that none of the resulting 1212 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 1213 * but since this case happens very infrequently, any performance impact will be negligible. 1214 * 1215 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 1216 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 1217 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 1218 * when the batch was completed, to allow for freeing the memory for the iov arrays. 1219 */ 1220 seq = spdk_bs_sequence_start(_channel, &cpl); 1221 if (!seq) { 1222 cb_fn(cb_arg, -ENOMEM); 1223 return; 1224 } 1225 1226 if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) { 1227 uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset); 1228 uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length); 1229 1230 if (read) { 1231 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1232 } else { 1233 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1234 } 1235 } else { 1236 struct rw_iov_ctx *ctx; 1237 1238 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 1239 if (ctx == NULL) { 1240 spdk_bs_sequence_finish(seq, -ENOMEM); 1241 return; 1242 } 1243 1244 ctx->blob = blob; 1245 ctx->read = read; 1246 ctx->orig_iov = iov; 1247 ctx->iovcnt = iovcnt; 1248 ctx->page_offset = offset; 1249 ctx->pages_remaining = length; 1250 ctx->pages_done = 0; 1251 1252 _spdk_rw_iov_split_next(seq, ctx, 0); 1253 } 1254 } 1255 1256 static struct spdk_blob * 1257 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1258 { 1259 struct spdk_blob *blob; 1260 1261 TAILQ_FOREACH(blob, &bs->blobs, link) { 1262 if (blob->id == blobid) { 1263 return blob; 1264 } 1265 } 1266 1267 return NULL; 1268 } 1269 1270 static int 1271 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1272 uint32_t max_ops) 1273 { 1274 struct spdk_bs_dev *dev; 1275 uint32_t i; 1276 1277 dev = bs->dev; 1278 1279 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1280 if (!channel->req_mem) { 1281 return -1; 1282 } 1283 1284 TAILQ_INIT(&channel->reqs); 1285 1286 for (i = 0; i < max_ops; i++) { 1287 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1288 } 1289 1290 channel->bs = bs; 1291 channel->dev = dev; 1292 channel->dev_channel = dev->create_channel(dev); 1293 1294 return 0; 1295 } 1296 1297 static int 1298 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1299 { 1300 struct spdk_blob_store *bs; 1301 struct spdk_bs_channel *channel = ctx_buf; 1302 1303 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1304 1305 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1306 } 1307 1308 static int 1309 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1310 { 1311 struct spdk_blob_store *bs; 1312 struct spdk_bs_channel *channel = ctx_buf; 1313 1314 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1315 1316 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1317 } 1318 1319 1320 static void 1321 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1322 { 1323 struct spdk_bs_channel *channel = ctx_buf; 1324 1325 free(channel->req_mem); 1326 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1327 } 1328 1329 static void 1330 _spdk_bs_dev_destroy(void *io_device) 1331 { 1332 struct spdk_blob_store *bs; 1333 struct spdk_blob *blob, *blob_tmp; 1334 1335 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1336 bs->dev->destroy(bs->dev); 1337 1338 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1339 TAILQ_REMOVE(&bs->blobs, blob, link); 1340 _spdk_blob_free(blob); 1341 } 1342 1343 spdk_bit_array_free(&bs->used_md_pages); 1344 spdk_bit_array_free(&bs->used_clusters); 1345 free(bs); 1346 } 1347 1348 static void 1349 _spdk_bs_free(struct spdk_blob_store *bs) 1350 { 1351 spdk_bs_unregister_md_thread(bs); 1352 spdk_io_device_unregister(&bs->io_target, NULL); 1353 spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy); 1354 } 1355 1356 void 1357 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1358 { 1359 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1360 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1361 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1362 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1363 } 1364 1365 static struct spdk_blob_store * 1366 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1367 { 1368 struct spdk_blob_store *bs; 1369 1370 bs = calloc(1, sizeof(struct spdk_blob_store)); 1371 if (!bs) { 1372 return NULL; 1373 } 1374 1375 TAILQ_INIT(&bs->blobs); 1376 bs->dev = dev; 1377 1378 /* 1379 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1380 * even multiple of the cluster size. 1381 */ 1382 bs->cluster_sz = opts->cluster_sz; 1383 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1384 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1385 bs->num_free_clusters = bs->total_clusters; 1386 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1387 if (bs->used_clusters == NULL) { 1388 free(bs); 1389 return NULL; 1390 } 1391 1392 bs->md_target.max_md_ops = opts->max_md_ops; 1393 bs->io_target.max_channel_ops = opts->max_channel_ops; 1394 bs->super_blob = SPDK_BLOBID_INVALID; 1395 1396 /* The metadata is assumed to be at least 1 page */ 1397 bs->used_md_pages = spdk_bit_array_create(1); 1398 1399 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1400 sizeof(struct spdk_bs_channel)); 1401 spdk_bs_register_md_thread(bs); 1402 1403 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1404 sizeof(struct spdk_bs_channel)); 1405 1406 return bs; 1407 } 1408 1409 /* START spdk_bs_load */ 1410 1411 struct spdk_bs_load_ctx { 1412 struct spdk_blob_store *bs; 1413 struct spdk_bs_super_block *super; 1414 1415 struct spdk_bs_md_mask *mask; 1416 }; 1417 1418 static void 1419 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1420 { 1421 struct spdk_bs_load_ctx *ctx = cb_arg; 1422 uint32_t i, j; 1423 int rc; 1424 1425 /* The type must be correct */ 1426 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1427 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1428 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1429 struct spdk_blob_md_page) * 8)); 1430 /* The length of the mask must be exactly equal to the total number of clusters */ 1431 assert(ctx->mask->length == ctx->bs->total_clusters); 1432 1433 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1434 if (rc < 0) { 1435 spdk_dma_free(ctx->super); 1436 spdk_dma_free(ctx->mask); 1437 _spdk_bs_free(ctx->bs); 1438 free(ctx); 1439 spdk_bs_sequence_finish(seq, -ENOMEM); 1440 return; 1441 } 1442 1443 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1444 for (i = 0; i < ctx->mask->length / 8; i++) { 1445 uint8_t segment = ctx->mask->mask[i]; 1446 for (j = 0; segment && (j < 8); j++) { 1447 if (segment & 1U) { 1448 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1449 assert(ctx->bs->num_free_clusters > 0); 1450 ctx->bs->num_free_clusters--; 1451 } 1452 segment >>= 1U; 1453 } 1454 } 1455 1456 spdk_dma_free(ctx->super); 1457 spdk_dma_free(ctx->mask); 1458 free(ctx); 1459 1460 spdk_bs_sequence_finish(seq, bserrno); 1461 } 1462 1463 static void 1464 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1465 { 1466 struct spdk_bs_load_ctx *ctx = cb_arg; 1467 uint64_t lba, lba_count, mask_size; 1468 uint32_t i, j; 1469 int rc; 1470 1471 /* The type must be correct */ 1472 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1473 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1474 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 1475 8)); 1476 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1477 assert(ctx->mask->length == ctx->super->md_len); 1478 1479 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1480 if (rc < 0) { 1481 spdk_dma_free(ctx->super); 1482 spdk_dma_free(ctx->mask); 1483 _spdk_bs_free(ctx->bs); 1484 free(ctx); 1485 spdk_bs_sequence_finish(seq, -ENOMEM); 1486 return; 1487 } 1488 1489 for (i = 0; i < ctx->mask->length / 8; i++) { 1490 uint8_t segment = ctx->mask->mask[i]; 1491 for (j = 0; segment && (j < 8); j++) { 1492 if (segment & 1U) { 1493 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1494 } 1495 segment >>= 1U; 1496 } 1497 } 1498 spdk_dma_free(ctx->mask); 1499 1500 /* Read the used clusters mask */ 1501 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1502 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1503 if (!ctx->mask) { 1504 spdk_dma_free(ctx->super); 1505 _spdk_bs_free(ctx->bs); 1506 free(ctx); 1507 spdk_bs_sequence_finish(seq, -ENOMEM); 1508 return; 1509 } 1510 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1511 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1512 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1513 _spdk_bs_load_used_clusters_cpl, ctx); 1514 } 1515 1516 static void 1517 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1518 { 1519 struct spdk_bs_load_ctx *ctx = cb_arg; 1520 uint64_t lba, lba_count, mask_size; 1521 1522 /* Parse the super block */ 1523 ctx->bs->cluster_sz = ctx->super->cluster_size; 1524 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1525 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1526 ctx->bs->md_start = ctx->super->md_start; 1527 ctx->bs->md_len = ctx->super->md_len; 1528 ctx->bs->super_blob = ctx->super->super_blob; 1529 1530 /* Read the used pages mask */ 1531 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1532 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1533 if (!ctx->mask) { 1534 spdk_dma_free(ctx->super); 1535 _spdk_bs_free(ctx->bs); 1536 free(ctx); 1537 spdk_bs_sequence_finish(seq, -ENOMEM); 1538 return; 1539 } 1540 1541 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1542 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1543 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1544 _spdk_bs_load_used_pages_cpl, ctx); 1545 } 1546 1547 static void 1548 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1549 { 1550 struct spdk_bs_load_ctx *ctx = cb_arg; 1551 uint32_t crc; 1552 1553 if (ctx->super->version != SPDK_BS_VERSION) { 1554 spdk_dma_free(ctx->super); 1555 _spdk_bs_free(ctx->bs); 1556 free(ctx); 1557 spdk_bs_sequence_finish(seq, -EILSEQ); 1558 return; 1559 } 1560 1561 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1562 sizeof(ctx->super->signature)) != 0) { 1563 spdk_dma_free(ctx->super); 1564 _spdk_bs_free(ctx->bs); 1565 free(ctx); 1566 spdk_bs_sequence_finish(seq, -EILSEQ); 1567 return; 1568 } 1569 1570 crc = _spdk_blob_md_page_calc_crc(ctx->super); 1571 if (crc != ctx->super->crc) { 1572 spdk_dma_free(ctx->super); 1573 _spdk_bs_free(ctx->bs); 1574 free(ctx); 1575 spdk_bs_sequence_finish(seq, -EILSEQ); 1576 return; 1577 } 1578 1579 if (ctx->super->clean != 1) { 1580 /* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED. 1581 * All of the necessary data to recover is available 1582 * on disk - the code just has not been written yet. 1583 */ 1584 assert(false); 1585 spdk_dma_free(ctx->super); 1586 _spdk_bs_free(ctx->bs); 1587 free(ctx); 1588 spdk_bs_sequence_finish(seq, -EILSEQ); 1589 return; 1590 } 1591 1592 ctx->super->clean = 0; 1593 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 1594 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1595 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1596 _spdk_bs_load_write_super_cpl, ctx); 1597 } 1598 1599 void 1600 spdk_bs_load(struct spdk_bs_dev *dev, 1601 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1602 { 1603 struct spdk_blob_store *bs; 1604 struct spdk_bs_cpl cpl; 1605 spdk_bs_sequence_t *seq; 1606 struct spdk_bs_load_ctx *ctx; 1607 struct spdk_bs_opts opts = {}; 1608 1609 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1610 1611 spdk_bs_opts_init(&opts); 1612 1613 bs = _spdk_bs_alloc(dev, &opts); 1614 if (!bs) { 1615 cb_fn(cb_arg, NULL, -ENOMEM); 1616 return; 1617 } 1618 1619 ctx = calloc(1, sizeof(*ctx)); 1620 if (!ctx) { 1621 _spdk_bs_free(bs); 1622 cb_fn(cb_arg, NULL, -ENOMEM); 1623 return; 1624 } 1625 1626 ctx->bs = bs; 1627 1628 /* Allocate memory for the super block */ 1629 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1630 if (!ctx->super) { 1631 free(ctx); 1632 _spdk_bs_free(bs); 1633 return; 1634 } 1635 1636 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1637 cpl.u.bs_handle.cb_fn = cb_fn; 1638 cpl.u.bs_handle.cb_arg = cb_arg; 1639 cpl.u.bs_handle.bs = bs; 1640 1641 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1642 if (!seq) { 1643 spdk_dma_free(ctx->super); 1644 free(ctx); 1645 _spdk_bs_free(bs); 1646 cb_fn(cb_arg, NULL, -ENOMEM); 1647 return; 1648 } 1649 1650 /* Read the super block */ 1651 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1652 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1653 _spdk_bs_load_super_cpl, ctx); 1654 } 1655 1656 /* END spdk_bs_load */ 1657 1658 /* START spdk_bs_init */ 1659 1660 struct spdk_bs_init_ctx { 1661 struct spdk_blob_store *bs; 1662 struct spdk_bs_super_block *super; 1663 }; 1664 1665 static void 1666 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1667 { 1668 struct spdk_bs_init_ctx *ctx = cb_arg; 1669 1670 spdk_dma_free(ctx->super); 1671 free(ctx); 1672 1673 spdk_bs_sequence_finish(seq, bserrno); 1674 } 1675 1676 static void 1677 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1678 { 1679 struct spdk_bs_init_ctx *ctx = cb_arg; 1680 1681 /* Write super block */ 1682 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1683 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1684 _spdk_bs_init_persist_super_cpl, ctx); 1685 } 1686 1687 void 1688 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1689 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1690 { 1691 struct spdk_bs_init_ctx *ctx; 1692 struct spdk_blob_store *bs; 1693 struct spdk_bs_cpl cpl; 1694 spdk_bs_sequence_t *seq; 1695 uint64_t num_md_pages; 1696 uint32_t i; 1697 struct spdk_bs_opts opts = {}; 1698 int rc; 1699 1700 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 1701 1702 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 1703 SPDK_ERRLOG("unsupported dev block length of %d\n", 1704 dev->blocklen); 1705 cb_fn(cb_arg, NULL, -EINVAL); 1706 return; 1707 } 1708 1709 if (o) { 1710 opts = *o; 1711 } else { 1712 spdk_bs_opts_init(&opts); 1713 } 1714 1715 bs = _spdk_bs_alloc(dev, &opts); 1716 if (!bs) { 1717 cb_fn(cb_arg, NULL, -ENOMEM); 1718 return; 1719 } 1720 1721 if (opts.num_md_pages == UINT32_MAX) { 1722 /* By default, allocate 1 page per cluster. 1723 * Technically, this over-allocates metadata 1724 * because more metadata will reduce the number 1725 * of usable clusters. This can be addressed with 1726 * more complex math in the future. 1727 */ 1728 bs->md_len = bs->total_clusters; 1729 } else { 1730 bs->md_len = opts.num_md_pages; 1731 } 1732 1733 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 1734 if (rc < 0) { 1735 _spdk_bs_free(bs); 1736 cb_fn(cb_arg, NULL, -ENOMEM); 1737 return; 1738 } 1739 1740 ctx = calloc(1, sizeof(*ctx)); 1741 if (!ctx) { 1742 _spdk_bs_free(bs); 1743 cb_fn(cb_arg, NULL, -ENOMEM); 1744 return; 1745 } 1746 1747 ctx->bs = bs; 1748 1749 /* Allocate memory for the super block */ 1750 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1751 if (!ctx->super) { 1752 free(ctx); 1753 _spdk_bs_free(bs); 1754 return; 1755 } 1756 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1757 sizeof(ctx->super->signature)); 1758 ctx->super->version = SPDK_BS_VERSION; 1759 ctx->super->length = sizeof(*ctx->super); 1760 ctx->super->super_blob = bs->super_blob; 1761 ctx->super->clean = 0; 1762 ctx->super->cluster_size = bs->cluster_sz; 1763 1764 /* Calculate how many pages the metadata consumes at the front 1765 * of the disk. 1766 */ 1767 1768 /* The super block uses 1 page */ 1769 num_md_pages = 1; 1770 1771 /* The used_md_pages mask requires 1 bit per metadata page, rounded 1772 * up to the nearest page, plus a header. 1773 */ 1774 ctx->super->used_page_mask_start = num_md_pages; 1775 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1776 divide_round_up(bs->md_len, 8), 1777 SPDK_BS_PAGE_SIZE); 1778 num_md_pages += ctx->super->used_page_mask_len; 1779 1780 /* The used_clusters mask requires 1 bit per cluster, rounded 1781 * up to the nearest page, plus a header. 1782 */ 1783 ctx->super->used_cluster_mask_start = num_md_pages; 1784 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1785 divide_round_up(bs->total_clusters, 8), 1786 SPDK_BS_PAGE_SIZE); 1787 num_md_pages += ctx->super->used_cluster_mask_len; 1788 1789 /* The metadata region size was chosen above */ 1790 ctx->super->md_start = bs->md_start = num_md_pages; 1791 ctx->super->md_len = bs->md_len; 1792 num_md_pages += bs->md_len; 1793 1794 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 1795 /* Claim all of the clusters used by the metadata */ 1796 for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) { 1797 _spdk_bs_claim_cluster(bs, i); 1798 } 1799 1800 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1801 cpl.u.bs_handle.cb_fn = cb_fn; 1802 cpl.u.bs_handle.cb_arg = cb_arg; 1803 cpl.u.bs_handle.bs = bs; 1804 1805 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1806 if (!seq) { 1807 spdk_dma_free(ctx->super); 1808 free(ctx); 1809 _spdk_bs_free(bs); 1810 cb_fn(cb_arg, NULL, -ENOMEM); 1811 return; 1812 } 1813 1814 /* TRIM the entire device */ 1815 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx); 1816 } 1817 1818 /* END spdk_bs_init */ 1819 1820 /* START spdk_bs_unload */ 1821 1822 struct spdk_bs_unload_ctx { 1823 struct spdk_blob_store *bs; 1824 struct spdk_bs_super_block *super; 1825 1826 struct spdk_bs_md_mask *mask; 1827 }; 1828 1829 static void 1830 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1831 { 1832 struct spdk_bs_unload_ctx *ctx = cb_arg; 1833 1834 spdk_dma_free(ctx->super); 1835 1836 spdk_bs_sequence_finish(seq, bserrno); 1837 1838 _spdk_bs_free(ctx->bs); 1839 free(ctx); 1840 } 1841 1842 static void 1843 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1844 { 1845 struct spdk_bs_unload_ctx *ctx = cb_arg; 1846 1847 spdk_dma_free(ctx->mask); 1848 1849 /* Update the values in the super block */ 1850 ctx->super->super_blob = ctx->bs->super_blob; 1851 ctx->super->clean = 1; 1852 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 1853 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1854 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1855 _spdk_bs_unload_write_super_cpl, ctx); 1856 } 1857 1858 static void 1859 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1860 { 1861 struct spdk_bs_unload_ctx *ctx = cb_arg; 1862 uint32_t i; 1863 uint64_t lba, lba_count, mask_size; 1864 1865 spdk_dma_free(ctx->mask); 1866 1867 /* Write out the used clusters mask */ 1868 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1869 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1870 if (!ctx->mask) { 1871 spdk_dma_free(ctx->super); 1872 free(ctx); 1873 spdk_bs_sequence_finish(seq, -ENOMEM); 1874 return; 1875 } 1876 1877 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1878 ctx->mask->length = ctx->bs->total_clusters; 1879 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1880 1881 i = 0; 1882 while (true) { 1883 i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i); 1884 if (i > ctx->mask->length) { 1885 break; 1886 } 1887 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1888 i++; 1889 } 1890 1891 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1892 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1893 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1894 _spdk_bs_unload_write_used_clusters_cpl, ctx); 1895 } 1896 1897 static void 1898 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1899 { 1900 struct spdk_bs_unload_ctx *ctx = cb_arg; 1901 uint32_t i; 1902 uint64_t lba, lba_count, mask_size; 1903 1904 /* Write out the used page mask */ 1905 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1906 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1907 if (!ctx->mask) { 1908 spdk_dma_free(ctx->super); 1909 free(ctx); 1910 spdk_bs_sequence_finish(seq, -ENOMEM); 1911 return; 1912 } 1913 1914 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1915 ctx->mask->length = ctx->super->md_len; 1916 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1917 1918 i = 0; 1919 while (true) { 1920 i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i); 1921 if (i > ctx->mask->length) { 1922 break; 1923 } 1924 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1925 i++; 1926 } 1927 1928 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1929 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1930 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1931 _spdk_bs_unload_write_used_pages_cpl, ctx); 1932 } 1933 1934 void 1935 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 1936 { 1937 struct spdk_bs_cpl cpl; 1938 spdk_bs_sequence_t *seq; 1939 struct spdk_bs_unload_ctx *ctx; 1940 1941 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 1942 1943 ctx = calloc(1, sizeof(*ctx)); 1944 if (!ctx) { 1945 cb_fn(cb_arg, -ENOMEM); 1946 return; 1947 } 1948 1949 ctx->bs = bs; 1950 1951 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1952 if (!ctx->super) { 1953 free(ctx); 1954 cb_fn(cb_arg, -ENOMEM); 1955 return; 1956 } 1957 1958 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 1959 cpl.u.bs_basic.cb_fn = cb_fn; 1960 cpl.u.bs_basic.cb_arg = cb_arg; 1961 1962 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1963 if (!seq) { 1964 spdk_dma_free(ctx->super); 1965 free(ctx); 1966 cb_fn(cb_arg, -ENOMEM); 1967 return; 1968 } 1969 1970 assert(TAILQ_EMPTY(&bs->blobs)); 1971 1972 /* Read super block */ 1973 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1974 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1975 _spdk_bs_unload_read_super_cpl, ctx); 1976 } 1977 1978 /* END spdk_bs_unload */ 1979 1980 void 1981 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 1982 spdk_bs_op_complete cb_fn, void *cb_arg) 1983 { 1984 bs->super_blob = blobid; 1985 cb_fn(cb_arg, 0); 1986 } 1987 1988 void 1989 spdk_bs_get_super(struct spdk_blob_store *bs, 1990 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1991 { 1992 if (bs->super_blob == SPDK_BLOBID_INVALID) { 1993 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 1994 } else { 1995 cb_fn(cb_arg, bs->super_blob, 0); 1996 } 1997 } 1998 1999 uint64_t 2000 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 2001 { 2002 return bs->cluster_sz; 2003 } 2004 2005 uint64_t 2006 spdk_bs_get_page_size(struct spdk_blob_store *bs) 2007 { 2008 return SPDK_BS_PAGE_SIZE; 2009 } 2010 2011 uint64_t 2012 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 2013 { 2014 return bs->num_free_clusters; 2015 } 2016 2017 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 2018 { 2019 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target); 2020 2021 return 0; 2022 } 2023 2024 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 2025 { 2026 spdk_put_io_channel(bs->md_target.md_channel); 2027 2028 return 0; 2029 } 2030 2031 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 2032 { 2033 assert(blob != NULL); 2034 2035 return blob->id; 2036 } 2037 2038 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 2039 { 2040 assert(blob != NULL); 2041 2042 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 2043 } 2044 2045 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 2046 { 2047 assert(blob != NULL); 2048 2049 return blob->active.num_clusters; 2050 } 2051 2052 /* START spdk_bs_md_create_blob */ 2053 2054 static void 2055 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2056 { 2057 struct spdk_blob *blob = cb_arg; 2058 2059 _spdk_blob_free(blob); 2060 2061 spdk_bs_sequence_finish(seq, bserrno); 2062 } 2063 2064 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 2065 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2066 { 2067 struct spdk_blob *blob; 2068 uint32_t page_idx; 2069 struct spdk_bs_cpl cpl; 2070 spdk_bs_sequence_t *seq; 2071 spdk_blob_id id; 2072 2073 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 2074 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 2075 cb_fn(cb_arg, 0, -ENOMEM); 2076 return; 2077 } 2078 spdk_bit_array_set(bs->used_md_pages, page_idx); 2079 2080 /* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper 2081 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the 2082 * code assumes blob id == page_idx. 2083 */ 2084 id = (1ULL << 32) | page_idx; 2085 2086 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 2087 2088 blob = _spdk_blob_alloc(bs, id); 2089 if (!blob) { 2090 cb_fn(cb_arg, 0, -ENOMEM); 2091 return; 2092 } 2093 2094 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 2095 cpl.u.blobid.cb_fn = cb_fn; 2096 cpl.u.blobid.cb_arg = cb_arg; 2097 cpl.u.blobid.blobid = blob->id; 2098 2099 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2100 if (!seq) { 2101 _spdk_blob_free(blob); 2102 cb_fn(cb_arg, 0, -ENOMEM); 2103 return; 2104 } 2105 2106 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 2107 } 2108 2109 /* END spdk_bs_md_create_blob */ 2110 2111 /* START spdk_bs_md_resize_blob */ 2112 int 2113 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 2114 { 2115 int rc; 2116 2117 assert(blob != NULL); 2118 2119 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 2120 2121 if (sz == blob->active.num_clusters) { 2122 return 0; 2123 } 2124 2125 rc = _spdk_resize_blob(blob, sz); 2126 if (rc < 0) { 2127 return rc; 2128 } 2129 2130 return 0; 2131 } 2132 2133 /* END spdk_bs_md_resize_blob */ 2134 2135 2136 /* START spdk_bs_md_delete_blob */ 2137 2138 static void 2139 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2140 { 2141 struct spdk_blob *blob = cb_arg; 2142 2143 _spdk_blob_free(blob); 2144 2145 spdk_bs_sequence_finish(seq, bserrno); 2146 } 2147 2148 static void 2149 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2150 { 2151 struct spdk_blob *blob = cb_arg; 2152 2153 /* If the blob have crc error, we just return NULL. */ 2154 if (blob == NULL) { 2155 spdk_bs_sequence_finish(seq, bserrno); 2156 return; 2157 } 2158 blob->state = SPDK_BLOB_STATE_DIRTY; 2159 blob->active.num_pages = 0; 2160 _spdk_resize_blob(blob, 0); 2161 2162 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 2163 } 2164 2165 void 2166 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2167 spdk_blob_op_complete cb_fn, void *cb_arg) 2168 { 2169 struct spdk_blob *blob; 2170 struct spdk_bs_cpl cpl; 2171 spdk_bs_sequence_t *seq; 2172 2173 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 2174 2175 blob = _spdk_blob_lookup(bs, blobid); 2176 if (blob) { 2177 assert(blob->open_ref > 0); 2178 cb_fn(cb_arg, -EINVAL); 2179 return; 2180 } 2181 2182 blob = _spdk_blob_alloc(bs, blobid); 2183 if (!blob) { 2184 cb_fn(cb_arg, -ENOMEM); 2185 return; 2186 } 2187 2188 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2189 cpl.u.blob_basic.cb_fn = cb_fn; 2190 cpl.u.blob_basic.cb_arg = cb_arg; 2191 2192 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2193 if (!seq) { 2194 _spdk_blob_free(blob); 2195 cb_fn(cb_arg, -ENOMEM); 2196 return; 2197 } 2198 2199 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 2200 } 2201 2202 /* END spdk_bs_md_delete_blob */ 2203 2204 /* START spdk_bs_md_open_blob */ 2205 2206 static void 2207 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2208 { 2209 struct spdk_blob *blob = cb_arg; 2210 2211 /* If the blob have crc error, we just return NULL. */ 2212 if (blob == NULL) { 2213 seq->cpl.u.blob_handle.blob = NULL; 2214 spdk_bs_sequence_finish(seq, bserrno); 2215 return; 2216 } 2217 2218 blob->open_ref++; 2219 2220 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 2221 2222 spdk_bs_sequence_finish(seq, bserrno); 2223 } 2224 2225 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2226 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2227 { 2228 struct spdk_blob *blob; 2229 struct spdk_bs_cpl cpl; 2230 spdk_bs_sequence_t *seq; 2231 uint32_t page_num; 2232 2233 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 2234 2235 blob = _spdk_blob_lookup(bs, blobid); 2236 if (blob) { 2237 blob->open_ref++; 2238 cb_fn(cb_arg, blob, 0); 2239 return; 2240 } 2241 2242 page_num = _spdk_bs_blobid_to_page(blobid); 2243 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2244 /* Invalid blobid */ 2245 cb_fn(cb_arg, NULL, -ENOENT); 2246 return; 2247 } 2248 2249 blob = _spdk_blob_alloc(bs, blobid); 2250 if (!blob) { 2251 cb_fn(cb_arg, NULL, -ENOMEM); 2252 return; 2253 } 2254 2255 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2256 cpl.u.blob_handle.cb_fn = cb_fn; 2257 cpl.u.blob_handle.cb_arg = cb_arg; 2258 cpl.u.blob_handle.blob = blob; 2259 2260 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2261 if (!seq) { 2262 _spdk_blob_free(blob); 2263 cb_fn(cb_arg, NULL, -ENOMEM); 2264 return; 2265 } 2266 2267 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 2268 } 2269 2270 /* START spdk_bs_md_sync_blob */ 2271 static void 2272 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2273 { 2274 spdk_bs_sequence_finish(seq, bserrno); 2275 } 2276 2277 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 2278 spdk_blob_op_complete cb_fn, void *cb_arg) 2279 { 2280 struct spdk_bs_cpl cpl; 2281 spdk_bs_sequence_t *seq; 2282 2283 assert(blob != NULL); 2284 2285 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 2286 2287 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2288 blob->state != SPDK_BLOB_STATE_SYNCING); 2289 2290 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2291 cb_fn(cb_arg, 0); 2292 return; 2293 } 2294 2295 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2296 cpl.u.blob_basic.cb_fn = cb_fn; 2297 cpl.u.blob_basic.cb_arg = cb_arg; 2298 2299 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2300 if (!seq) { 2301 cb_fn(cb_arg, -ENOMEM); 2302 return; 2303 } 2304 2305 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2306 } 2307 2308 /* END spdk_bs_md_sync_blob */ 2309 2310 /* START spdk_bs_md_close_blob */ 2311 2312 static void 2313 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2314 { 2315 struct spdk_blob **blob = cb_arg; 2316 2317 if ((*blob)->open_ref == 0) { 2318 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2319 _spdk_blob_free((*blob)); 2320 } 2321 2322 *blob = NULL; 2323 2324 spdk_bs_sequence_finish(seq, bserrno); 2325 } 2326 2327 void spdk_bs_md_close_blob(struct spdk_blob **b, 2328 spdk_blob_op_complete cb_fn, void *cb_arg) 2329 { 2330 struct spdk_bs_cpl cpl; 2331 struct spdk_blob *blob; 2332 spdk_bs_sequence_t *seq; 2333 2334 assert(b != NULL); 2335 blob = *b; 2336 assert(blob != NULL); 2337 2338 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2339 2340 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2341 blob->state != SPDK_BLOB_STATE_SYNCING); 2342 2343 if (blob->open_ref == 0) { 2344 cb_fn(cb_arg, -EBADF); 2345 return; 2346 } 2347 2348 blob->open_ref--; 2349 2350 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2351 cpl.u.blob_basic.cb_fn = cb_fn; 2352 cpl.u.blob_basic.cb_arg = cb_arg; 2353 2354 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2355 if (!seq) { 2356 cb_fn(cb_arg, -ENOMEM); 2357 return; 2358 } 2359 2360 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2361 _spdk_blob_close_cpl(seq, b, 0); 2362 return; 2363 } 2364 2365 /* Sync metadata */ 2366 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2367 } 2368 2369 /* END spdk_bs_md_close_blob */ 2370 2371 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2372 { 2373 return spdk_get_io_channel(&bs->io_target); 2374 } 2375 2376 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2377 { 2378 spdk_put_io_channel(channel); 2379 } 2380 2381 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2382 spdk_blob_op_complete cb_fn, void *cb_arg) 2383 { 2384 /* Flush is synchronous right now */ 2385 cb_fn(cb_arg, 0); 2386 } 2387 2388 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2389 void *payload, uint64_t offset, uint64_t length, 2390 spdk_blob_op_complete cb_fn, void *cb_arg) 2391 { 2392 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false); 2393 } 2394 2395 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2396 void *payload, uint64_t offset, uint64_t length, 2397 spdk_blob_op_complete cb_fn, void *cb_arg) 2398 { 2399 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true); 2400 } 2401 2402 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2403 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2404 spdk_blob_op_complete cb_fn, void *cb_arg) 2405 { 2406 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 2407 } 2408 2409 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2410 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2411 spdk_blob_op_complete cb_fn, void *cb_arg) 2412 { 2413 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 2414 } 2415 2416 struct spdk_bs_iter_ctx { 2417 int64_t page_num; 2418 struct spdk_blob_store *bs; 2419 2420 spdk_blob_op_with_handle_complete cb_fn; 2421 void *cb_arg; 2422 }; 2423 2424 static void 2425 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2426 { 2427 struct spdk_bs_iter_ctx *ctx = cb_arg; 2428 struct spdk_blob_store *bs = ctx->bs; 2429 spdk_blob_id id; 2430 2431 if (bserrno == 0) { 2432 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2433 free(ctx); 2434 return; 2435 } 2436 2437 ctx->page_num++; 2438 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2439 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2440 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2441 free(ctx); 2442 return; 2443 } 2444 2445 id = (1ULL << 32) | ctx->page_num; 2446 2447 blob = _spdk_blob_lookup(bs, id); 2448 if (blob) { 2449 blob->open_ref++; 2450 ctx->cb_fn(ctx->cb_arg, blob, 0); 2451 free(ctx); 2452 return; 2453 } 2454 2455 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2456 } 2457 2458 void 2459 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2460 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2461 { 2462 struct spdk_bs_iter_ctx *ctx; 2463 2464 ctx = calloc(1, sizeof(*ctx)); 2465 if (!ctx) { 2466 cb_fn(cb_arg, NULL, -ENOMEM); 2467 return; 2468 } 2469 2470 ctx->page_num = -1; 2471 ctx->bs = bs; 2472 ctx->cb_fn = cb_fn; 2473 ctx->cb_arg = cb_arg; 2474 2475 _spdk_bs_iter_cpl(ctx, NULL, -1); 2476 } 2477 2478 static void 2479 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2480 { 2481 struct spdk_bs_iter_ctx *ctx = cb_arg; 2482 2483 _spdk_bs_iter_cpl(ctx, NULL, -1); 2484 } 2485 2486 void 2487 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2488 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2489 { 2490 struct spdk_bs_iter_ctx *ctx; 2491 struct spdk_blob *blob; 2492 2493 assert(b != NULL); 2494 blob = *b; 2495 assert(blob != NULL); 2496 2497 ctx = calloc(1, sizeof(*ctx)); 2498 if (!ctx) { 2499 cb_fn(cb_arg, NULL, -ENOMEM); 2500 return; 2501 } 2502 2503 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2504 ctx->bs = bs; 2505 ctx->cb_fn = cb_fn; 2506 ctx->cb_arg = cb_arg; 2507 2508 /* Close the existing blob */ 2509 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2510 } 2511 2512 int 2513 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2514 uint16_t value_len) 2515 { 2516 struct spdk_xattr *xattr; 2517 2518 assert(blob != NULL); 2519 2520 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2521 blob->state != SPDK_BLOB_STATE_SYNCING); 2522 2523 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2524 if (!strcmp(name, xattr->name)) { 2525 free(xattr->value); 2526 xattr->value_len = value_len; 2527 xattr->value = malloc(value_len); 2528 memcpy(xattr->value, value, value_len); 2529 2530 blob->state = SPDK_BLOB_STATE_DIRTY; 2531 2532 return 0; 2533 } 2534 } 2535 2536 xattr = calloc(1, sizeof(*xattr)); 2537 if (!xattr) { 2538 return -1; 2539 } 2540 xattr->name = strdup(name); 2541 xattr->value_len = value_len; 2542 xattr->value = malloc(value_len); 2543 memcpy(xattr->value, value, value_len); 2544 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2545 2546 blob->state = SPDK_BLOB_STATE_DIRTY; 2547 2548 return 0; 2549 } 2550 2551 int 2552 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 2553 { 2554 struct spdk_xattr *xattr; 2555 2556 assert(blob != NULL); 2557 2558 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2559 blob->state != SPDK_BLOB_STATE_SYNCING); 2560 2561 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2562 if (!strcmp(name, xattr->name)) { 2563 TAILQ_REMOVE(&blob->xattrs, xattr, link); 2564 free(xattr->value); 2565 free(xattr->name); 2566 free(xattr); 2567 2568 blob->state = SPDK_BLOB_STATE_DIRTY; 2569 2570 return 0; 2571 } 2572 } 2573 2574 return -ENOENT; 2575 } 2576 2577 int 2578 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 2579 const void **value, size_t *value_len) 2580 { 2581 struct spdk_xattr *xattr; 2582 2583 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2584 if (!strcmp(name, xattr->name)) { 2585 *value = xattr->value; 2586 *value_len = xattr->value_len; 2587 return 0; 2588 } 2589 } 2590 2591 return -ENOENT; 2592 } 2593 2594 struct spdk_xattr_names { 2595 uint32_t count; 2596 const char *names[0]; 2597 }; 2598 2599 int 2600 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 2601 struct spdk_xattr_names **names) 2602 { 2603 struct spdk_xattr *xattr; 2604 int count = 0; 2605 2606 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2607 count++; 2608 } 2609 2610 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 2611 if (*names == NULL) { 2612 return -ENOMEM; 2613 } 2614 2615 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2616 (*names)->names[(*names)->count++] = xattr->name; 2617 } 2618 2619 return 0; 2620 } 2621 2622 uint32_t 2623 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 2624 { 2625 assert(names != NULL); 2626 2627 return names->count; 2628 } 2629 2630 const char * 2631 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 2632 { 2633 if (index >= names->count) { 2634 return NULL; 2635 } 2636 2637 return names->names[index]; 2638 } 2639 2640 void 2641 spdk_xattr_names_free(struct spdk_xattr_names *names) 2642 { 2643 free(names); 2644 } 2645 2646 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 2647