/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"
#include "request.h"

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

static void
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			assert(desc_extent->length > 0);
			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
					cluster_count++;
				}
			}

			assert(cluster_count > 0);
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			assert(tmp != NULL);
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr *desc_xattr;
			struct spdk_xattr *xattr;

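			/* An xattr descriptor stores name_length bytes of name data
			 * immediately followed by value_length bytes of value data. */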
			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			assert(desc_xattr->length == sizeof(desc_xattr->name_length) +
			       sizeof(desc_xattr->value_length) +
			       desc_xattr->name_length + desc_xattr->value_length);

			xattr = calloc(1, sizeof(*xattr));
			assert(xattr != NULL);

			xattr->name = malloc(desc_xattr->name_length + 1);
			assert(xattr->name);
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			assert(xattr->value != NULL);
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Error */
			break;
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);
	assert(blob->id == pages[0].id);

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		_spdk_blob_parse_page(page, blob);
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 *
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

/* Serialize the blob's cluster list as extent descriptors, run-length encoding
 * runs of clusters that are contiguous on disk. If the buffer fills up, stop
 * early and report the cluster to resume from via next_cluster.
 */
static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint32_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	const struct spdk_xattr *xattr;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;
	uint64_t last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;

	page = &ctx->pages[ctx->num_pages - 1];

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					      sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
				      SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

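	/* The first metadata page for a blob lives at the page index encoded in
	 * its blob id, offset from the start of the metadata region. */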
	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;

	uint64_t idx;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	void *tmp;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send them
		 * as an unmap.
		 */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	/* The first page will only be unmapped if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_unmap_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}

	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_batch_t *batch;
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;
	uint8_t *buf;
	uint64_t page;

	assert(blob != NULL);

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		if (read) {
			spdk_bs_batch_read(batch, buf, lba, lba_count);
		} else {
			spdk_bs_batch_write(batch, buf, lba, lba_count);
		}

		length -= lba_count;
		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
	}

	spdk_bs_batch_close(batch);
}

struct rw_iov_ctx {
	struct spdk_blob *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

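	/* Issue the next child I/O, stopping at the next cluster boundary since
	 * a blob's clusters are not necessarily contiguous on disk. */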
	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 * byte_count will keep track of how many bytes remain until orig_iov and orig_iovoff
	 * point to the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
	 * bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary,
	 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster
	 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 * to allocate a separate iov array and split the I/O such that none of the resulting
	 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel)
	 * but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
	 * when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel,
			uint32_t max_ops)
{
	struct spdk_bs_dev *dev;
	uint32_t i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	return 0;
}

static int
_spdk_bs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_channel *channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);

	return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops);
}

static int
_spdk_bs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_channel *channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target);

	return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops);
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_dev_destroy(void *io_device)
{
	struct spdk_blob_store *bs;
	struct spdk_blob *blob, *blob_tmp;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);
	bs->dev->destroy(bs->dev);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);
	free(bs);
}

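/* Tear down the blobstore: release the metadata channel and unregister both
 * I/O devices. The blobstore itself is freed by _spdk_bs_dev_destroy once the
 * metadata target has been unregistered. */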
static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(&bs->io_target, NULL);
	spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
}

static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store *bs;

	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 * even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}

	bs->md_target.max_md_ops = opts->max_md_ops;
	bs->io_target.max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);

	spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	spdk_bs_register_md_thread(bs);

	spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));

	return bs;
}

/* START spdk_bs_load */

struct spdk_bs_load_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->super_blob = ctx->super->super_blob;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;

	if (ctx->super->version != SPDK_BS_VERSION) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature,
		   SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (ctx->super->clean != 1) {
		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
		 * All of the necessary data to recover is available
		 * on disk - the code just has not been written yet.
		 */
		assert(false);
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	ctx->super->clean = 0;

	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_load_write_super_cpl, ctx);
}

void
spdk_bs_load(struct spdk_bs_dev *dev,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts opts = {};

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);

	spdk_bs_opts_init(&opts);

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint64_t num_md_pages;
	uint32_t i;
	struct spdk_bs_opts opts = {};
	int rc;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);

	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %d\n",
			    dev->blocklen);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == UINT32_MAX) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;

	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* TRIM the entire device */
	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
}

/* END spdk_bs_init */

/* START spdk_bs_unload */

struct spdk_bs_unload_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	/* Update the values in the super block */
	ctx->super->super_blob = ctx->bs->super_blob;
	ctx->super->clean = 1;

	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count, mask_size;

	spdk_dma_free(ctx->mask);

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
}

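/* Called once the super block has been read back during unload. Builds the
 * used metadata page mask from the in-memory bit array and writes it to disk;
 * the used cluster mask and the updated super block are written by the
 * completions that follow. */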
static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count, mask_size;

	/* Write out the used page mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_pages_cpl, ctx);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_unload_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	assert(TAILQ_EMPTY(&bs->blobs));

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return SPDK_BS_PAGE_SIZE;
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target);

	return 0;
}

int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_target.md_channel);

	return 0;
}

spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->id;
}

uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->active.num_clusters;
}

/* START spdk_bs_md_create_blob */

static void
_spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob	*blob;
	uint32_t		page_idx;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	spdk_blob_id		id;

	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
	 * code assumes blob id == page_idx.
	 */
	id = (1ULL << 32) | page_idx;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);

	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
}

/* END spdk_bs_md_create_blob */

/* START spdk_bs_md_resize_blob */
int
spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	int rc;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (sz == blob->active.num_clusters) {
		return 0;
	}

	rc = _spdk_resize_blob(blob, sz);
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* END spdk_bs_md_resize_blob */

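/*
 * Illustrative caller-side sketch, not part of the implementation (the names
 * below are assumptions for the example): creating a blob and growing it.
 * The id handed to the completion callback carries the metadata page index in
 * its lower 32 bits, as described in spdk_bs_md_create_blob() above, so
 * _spdk_bs_blobid_to_page() recovers the page that holds the blob's metadata.
 *
 *	static void
 *	example_create_cpl(void *cb_arg, spdk_blob_id id, int bserrno)
 *	{
 *		... on success, open the blob by id, call
 *		    spdk_bs_md_resize_blob(blob, 10) to allocate 10 clusters,
 *		    then spdk_bs_md_sync_blob() to persist the new size ...
 *	}
 *
 *	spdk_bs_md_create_blob(bs, example_create_cpl, NULL);
 */
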
/* START spdk_bs_md_delete_blob */

static void
_spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
}

void
spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob	*blob;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);

	/* A blob found on the open list still has references and cannot be deleted. */
	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Load the metadata, then truncate the blob to zero pages and persist it. */
	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
}

/* END spdk_bs_md_delete_blob */

/* START spdk_bs_md_open_blob */

static void
_spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob	*blob;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	uint32_t		page_num;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, blob, 0);
		return;
	}

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
}

/* END spdk_bs_md_open_blob */

/* START spdk_bs_md_sync_blob */
static void
_spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_sync_blob(struct spdk_blob *blob,
			  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(cb_arg, 0);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
}

/* END spdk_bs_md_sync_blob */

/* START spdk_bs_md_close_blob */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob **blob = cb_arg;

	if ((*blob)->open_ref == 0) {
		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
		_spdk_blob_free((*blob));
	}

	*blob = NULL;

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_close_blob(struct spdk_blob **b,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	struct spdk_blob	*blob;
	spdk_bs_sequence_t	*seq;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	blob->open_ref--;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		_spdk_blob_close_cpl(seq, b, 0);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
}

/* END spdk_bs_md_close_blob */

struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(&bs->io_target);
}

void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
			      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	/* Flush is synchronous right now */
	cb_fn(cb_arg, 0);
}

void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   void *payload, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
}

void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  void *payload, uint64_t offset, uint64_t length,
			  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
}

void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}

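/*
 * Illustrative caller-side sketch, not part of the implementation (buffer and
 * callback names are assumptions): issuing blob I/O.  Each I/O thread takes its
 * own channel from spdk_bs_alloc_io_channel() and releases it with
 * spdk_bs_free_io_channel() only after all of its outstanding completions have
 * run; in practice the read below would be issued from the write completion.
 *
 *	struct spdk_io_channel *channel = spdk_bs_alloc_io_channel(bs);
 *
 *	spdk_bs_io_write_blob(blob, channel, write_buf, 0, 1, example_write_cpl, NULL);
 *	spdk_bs_io_read_blob(blob, channel, read_buf, 0, 1, example_read_cpl, NULL);
 *
 *	... once both completions have fired ...
 *	spdk_bs_free_io_channel(channel);
 */
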
struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
		free(ctx);
		return;
	}

	/* Advance to the next allocated metadata page and open the blob stored there */
	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = (1ULL << 32) | ctx->page_num;

	blob = _spdk_blob_lookup(bs, id);
	if (blob) {
		blob->open_ref++;
		ctx->cb_fn(ctx->cb_arg, blob, 0);
		free(ctx);
		return;
	}

	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_md_iter_first(struct spdk_blob_store *bs,
		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;
	struct spdk_blob *blob;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
}

int
spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		       uint16_t value_len)
{
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp;

			/* Allocate the new value first so the old one is kept on failure */
			tmp = malloc(value_len);
			if (value_len > 0 && tmp == NULL) {
				return -1;
			}
			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = tmp;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -1;
	}
	xattr->name = strdup(name);
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	if (!xattr->name || (value_len > 0 && !xattr->value)) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -1;
	}
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
{
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(&blob->xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

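/*
 * Illustrative sketch, not part of the implementation (the attribute name and
 * value are assumptions): attaching a named attribute to an open blob.  The
 * value is copied into the blob's in-memory xattr list and only reaches the
 * on-disk metadata pages on the next sync or close.
 *
 *	uint64_t length = 16;
 *
 *	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
 *	spdk_bs_md_sync_blob(blob, example_sync_cpl, NULL);
 *
 *	... later, the attribute can be dropped again ...
 *	spdk_blob_md_remove_xattr(blob, "length");
 */
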
int
spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len)
{
	struct spdk_xattr *xattr;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}

	return -ENOENT;
}

struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[0];
};

int
spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
			   struct spdk_xattr_names **names)
{
	struct spdk_xattr *xattr;
	int count = 0;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
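
/*
 * Closing illustrative sketch, not part of the implementation (the callback
 * name is an assumption): enumerating every blob and listing its attribute
 * names.  Each callback invocation hands back the next open blob, or -ENOENT
 * once the allocated metadata pages are exhausted; passing the handle back to
 * spdk_bs_md_iter_next() closes it and opens the following one.
 *
 *	static void
 *	example_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		struct spdk_blob_store *bs = cb_arg;
 *		struct spdk_xattr_names *names;
 *		uint32_t i;
 *
 *		if (bserrno != 0) {
 *			return;
 *		}
 *
 *		if (spdk_bs_md_get_xattr_names(blob, &names) == 0) {
 *			for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
 *				... use spdk_xattr_names_get_name(names, i) ...
 *			}
 *			spdk_xattr_names_free(names);
 *		}
 *
 *		spdk_bs_md_iter_next(bs, &blob, example_iter_cpl, bs);
 *	}
 *
 *	spdk_bs_md_iter_first(bs, example_iter_cpl, bs);
 */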