/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"
#include "request.h"

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}
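/* Free a blob's in-memory representation: the active and clean cluster and
 * page arrays plus any xattrs. On-disk metadata is not touched.
 */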
static void
_spdk_blob_free(struct spdk_blob *blob)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

static void
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			assert(desc_extent->length > 0);
			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
					cluster_count++;
				}
			}

			assert(cluster_count > 0);
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			assert(tmp != NULL);
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr *desc_xattr;
			struct spdk_xattr *xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			assert(desc_xattr->length == sizeof(desc_xattr->name_length) +
			       sizeof(desc_xattr->value_length) +
			       desc_xattr->name_length + desc_xattr->value_length);

			xattr = calloc(1, sizeof(*xattr));
			assert(xattr != NULL);

			xattr->name = malloc(desc_xattr->name_length + 1);
			assert(xattr->name);
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			assert(xattr->value != NULL);
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Error */
			break;
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);
	assert(blob->id == pages[0].id);

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		_spdk_blob_parse_page(page, blob);
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint32_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	const struct spdk_xattr *xattr;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;
	uint64_t last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;

	page = &ctx->pages[ctx->num_pages - 1];

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					      sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
				      SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;

	uint64_t idx;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	void *tmp;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it as an unmap.
		 */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	/* The first page will only be unmapped if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_unmap_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished.
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}

	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_batch_t *batch;
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;
	uint8_t *buf;
	uint64_t page;

	assert(blob != NULL);

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		if (read) {
			spdk_bs_batch_read(batch, buf, lba, lba_count);
		} else {
			spdk_bs_batch_write(batch, buf, lba, lba_count);
		}

		length -= lba_count;
		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
	}

	spdk_bs_batch_close(batch);
}

struct rw_iov_ctx {
	struct spdk_blob *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 * byte_count will keep track of how many bytes remain until orig_iov and orig_iovoff point to
	 * the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
	 * bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary,
	 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster
	 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 * to allocate a separate iov array and split the I/O such that none of the resulting
	 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel)
	 * but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
	 * when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel,
			uint32_t max_ops)
{
	struct spdk_bs_dev *dev;
	uint32_t i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	return 0;
}

static int
_spdk_bs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_channel *channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);

	return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops);
}

static int
_spdk_bs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_channel *channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target);

	return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops);
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	struct spdk_blob *blob, *blob_tmp;

	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(&bs->io_target, NULL);
	spdk_io_device_unregister(&bs->md_target, NULL);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);

	bs->dev->destroy(bs->dev);
	free(bs);
}

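/* Fill an opts structure with the default blobstore parameters. Callers may
 * override individual fields before passing the structure to spdk_bs_init().
 */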
void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
}

static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store *bs;

	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 * even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}

	bs->md_target.max_md_ops = opts->max_md_ops;
	bs->io_target.max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);

	spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	spdk_bs_register_md_thread(bs);

	spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));

	return bs;
}

/* START spdk_bs_load */

struct spdk_bs_load_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

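/* Completion for reading the used metadata page mask during load: populate the
 * in-memory used_md_pages bit array, then issue the read for the used clusters mask.
 */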
static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	if (ctx->super->version != SPDK_BS_VERSION) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (ctx->super->clean != 1) {
		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
		 * All of the necessary data to recover is available
		 * on disk - the code just has not been written yet.
		 */
		assert(false);
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}
	ctx->super->clean = 0;

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->super_blob = ctx->super->super_blob;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

void
spdk_bs_load(struct spdk_bs_dev *dev,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts opts = {};

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);

	spdk_bs_opts_init(&opts);

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

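/* Initialize a new blobstore on dev: lay out the super block, the used page and
 * used cluster masks, and the metadata region, claim the clusters that hold this
 * metadata, then unmap the entire device before persisting the super block.
 */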
void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint64_t num_md_pages;
	uint32_t i;
	struct spdk_bs_opts opts = {};
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);

	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %d\n",
			    dev->blocklen);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == UINT32_MAX) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;

	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* TRIM the entire device */
	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
}

/* END spdk_bs_init */

/* START spdk_bs_unload */

struct spdk_bs_unload_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	/* Update the values in the super block */
	ctx->super->super_blob = ctx->bs->super_blob;
	ctx->super->clean = 1;

	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count, mask_size;

	spdk_dma_free(ctx->mask);

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
}

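/* Completion for re-reading the super block during unload: write out the used
 * metadata page mask, which then chains to writing the used cluster mask and
 * finally the super block with the clean flag set.
 */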
static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count, mask_size;

	/* Write out the used page mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_pages_cpl, ctx);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_unload_ctx *ctx;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	assert(TAILQ_EMPTY(&bs->blobs));

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return SPDK_BS_PAGE_SIZE;
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target);

	return 0;
}

int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_target.md_channel);

	return 0;
}

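/* Accessors for blob identity and size. A blob's page count is derived directly
 * from its cluster count, so both values change only on resize.
 */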
spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->id;
}

uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->active.num_clusters;
}

/* START spdk_bs_md_create_blob */

static void
_spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob	*blob;
	uint32_t		page_idx;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	spdk_blob_id		id;

	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
	 * code assumes blob id == page_idx.
	 */
	id = (1ULL << 32) | page_idx;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);

	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		/* Release the metadata page claimed above before failing */
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_bit_array_clear(bs->used_md_pages, page_idx);
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
}

/* END spdk_bs_md_create_blob */
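/*
 * Worked example of the id encoding used in spdk_bs_md_create_blob() above:
 * for page_idx = 7,
 *
 *	id = (1ULL << 32) | 7 = 0x100000007
 *
 * The metadata page is recovered from the low 32 bits of the id (which is
 * what _spdk_bs_blobid_to_page(), defined elsewhere in the blobstore, is
 * assumed to do), so the 1 in the upper half exists only to trip up code
 * that wrongly treats the blob id itself as a page index.
 */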
/* START spdk_bs_md_resize_blob */
int
spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	int			rc;

	assert(blob != NULL);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (sz == blob->active.num_clusters) {
		return 0;
	}

	rc = _spdk_resize_blob(blob, sz);
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* END spdk_bs_md_resize_blob */

/* START spdk_bs_md_delete_blob */

static void
_spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
}

void
spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob	*blob;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		/* A blob that is currently open cannot be deleted */
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
}

/* END spdk_bs_md_delete_blob */

/* START spdk_bs_md_open_blob */

static void
_spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob	*blob;
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;
	uint32_t		page_num;

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, blob, 0);
		return;
	}

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
}

/* END spdk_bs_md_open_blob */

/* START spdk_bs_md_sync_blob */
static void
_spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_sync_blob(struct spdk_blob *blob,
			  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	spdk_bs_sequence_t	*seq;

	assert(blob != NULL);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(cb_arg, 0);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
}

/* END spdk_bs_md_sync_blob */
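/*
 * Illustrative metadata lifecycle stitched together from the calls above:
 * open an existing blob, grow it, persist the change, then close it.  The
 * callback and variable names, and the target size of 10 clusters, are
 * hypothetical; a real caller would check every return code and bserrno.
 *
 *	static struct spdk_blob *g_blob;
 *
 *	static void
 *	example_open_done(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		g_blob = blob;
 *		spdk_bs_md_resize_blob(blob, 10);
 *		spdk_bs_md_sync_blob(blob, example_sync_done, NULL);
 *	}
 *
 *	spdk_bs_md_open_blob(bs, blobid, example_open_done, NULL);
 *
 * where example_sync_done() would finish with
 * spdk_bs_md_close_blob(&g_blob, example_close_done, NULL).
 */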
/* START spdk_bs_md_close_blob */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob **blob = cb_arg;

	if ((*blob)->open_ref == 0) {
		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
		_spdk_blob_free((*blob));
	}

	*blob = NULL;

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_close_blob(struct spdk_blob **b,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl	cpl;
	struct spdk_blob	*blob;
	spdk_bs_sequence_t	*seq;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	blob->open_ref--;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		_spdk_blob_close_cpl(seq, b, 0);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
}

/* END spdk_bs_md_close_blob */

struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(&bs->io_target);
}

void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
			      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	/* Flush is synchronous right now */
	cb_fn(cb_arg, 0);
}

void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   void *payload, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
}

void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  void *payload, uint64_t offset, uint64_t length,
			  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
}

void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}
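/*
 * Illustrative data-path usage of the channel helpers and read/write calls
 * above.  The payload must be allocated from DMA-safe memory (as with
 * spdk_dma_zmalloc() used elsewhere in this file), and offset/length are
 * assumed here to be counted in pages (see spdk_bs_get_page_size()); the
 * callback name is hypothetical.
 *
 *	struct spdk_io_channel *channel = spdk_bs_alloc_io_channel(bs);
 *	uint64_t page_size = spdk_bs_get_page_size(bs);
 *	void *payload = spdk_dma_zmalloc(page_size, 0x1000, NULL);
 *
 *	memset(payload, 0x5a, page_size);
 *	spdk_bs_io_write_blob(blob, channel, payload, 0, 1, example_write_done, payload);
 *
 * The completion would typically issue a matching spdk_bs_io_read_blob() and
 * eventually release the buffer with spdk_dma_free() and the channel with
 * spdk_bs_free_io_channel().
 */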
struct spdk_bs_iter_ctx {
	int64_t				page_num;
	struct spdk_blob_store		*bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void				*cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store	*bs = ctx->bs;
	spdk_blob_id		id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = (1ULL << 32) | ctx->page_num;

	blob = _spdk_blob_lookup(bs, id);
	if (blob) {
		blob->open_ref++;
		ctx->cb_fn(ctx->cb_arg, blob, 0);
		free(ctx);
		return;
	}

	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_md_iter_first(struct spdk_blob_store *bs,
		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;
	struct spdk_blob	*blob;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
}
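/*
 * Illustrative walk over every blob in the store with the iterator above:
 * spdk_bs_md_iter_first() opens the first blob, each subsequent
 * spdk_bs_md_iter_next() closes the current blob and opens the following
 * one, and -ENOENT signals the end.  Callback and variable names are
 * hypothetical.
 *
 *	static struct spdk_blob *g_iter_blob;
 *
 *	static void
 *	example_iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
 *	{
 *		if (bserrno != 0) {
 *			return;
 *		}
 *
 *		g_iter_blob = blob;
 *		SPDK_TRACELOG(SPDK_TRACE_BLOB, "found blob %lu\n", spdk_blob_get_id(blob));
 *		spdk_bs_md_iter_next(bs, &g_iter_blob, example_iter_cb, NULL);
 *	}
 *
 *	spdk_bs_md_iter_first(bs, example_iter_cb, NULL);
 */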
int
spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		       uint16_t value_len)
{
	struct spdk_xattr	*xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			void *tmp = malloc(value_len);

			/* Only replace the old value once the new copy has been allocated */
			if (!tmp) {
				return -1;
			}

			free(xattr->value);
			xattr->value = tmp;
			xattr->value_len = value_len;
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -1;
	}

	xattr->name = strdup(name);
	xattr->value = malloc(value_len);
	if (!xattr->name || !xattr->value) {
		free(xattr->name);
		free(xattr->value);
		free(xattr);
		return -1;
	}

	xattr->value_len = value_len;
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
{
	struct spdk_xattr	*xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(&blob->xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}

int
spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len)
{
	struct spdk_xattr	*xattr;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}

	return -ENOENT;
}

struct spdk_xattr_names {
	uint32_t	count;
	const char	*names[0];
};

int
spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
			   struct spdk_xattr_names **names)
{
	struct spdk_xattr	*xattr;
	int			count = 0;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
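/*
 * Illustrative xattr round trip using the helpers above; the attribute name
 * and value are made up.  spdk_blob_md_set_xattr() copies the value into the
 * blob (so the caller's buffer need not outlive the call) and only marks the
 * metadata dirty; a subsequent spdk_bs_md_sync_blob() is what persists it.
 * The pointer returned by spdk_bs_md_get_xattr_value() references the blob's
 * internal copy and stays valid only while the blob is open and the
 * attribute is not removed or overwritten.
 *
 *	uint64_t length = 16;
 *	const void *value;
 *	size_t value_len;
 *
 *	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));
 *	spdk_bs_md_sync_blob(blob, example_sync_done, NULL);
 *
 *	if (spdk_bs_md_get_xattr_value(blob, "length", &value, &value_len) == 0) {
 *		assert(value_len == sizeof(length));
 *	}
 */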