/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blob.h"
#include "spdk/env.h"
#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/bit_array.h"
#include "spdk/likely.h"

#include "spdk_internal/log.h"

#include "blobstore.h"
#include "request.h"

static inline size_t
divide_round_up(size_t num, size_t divisor)
{
	return (num + divisor - 1) / divisor;
}

static void
_spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false);
	assert(bs->num_free_clusters > 0);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num);

	spdk_bit_array_set(bs->used_clusters, cluster_num);
	bs->num_free_clusters--;
}

static void
_spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num)
{
	assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters));
	assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true);
	assert(bs->num_free_clusters < bs->total_clusters);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num);

	spdk_bit_array_clear(bs->used_clusters, cluster_num);
	bs->num_free_clusters++;
}

static struct spdk_blob *
_spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id)
{
	struct spdk_blob *blob;

	blob = calloc(1, sizeof(*blob));
	if (!blob) {
		return NULL;
	}

	blob->id = id;
	blob->bs = bs;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 1;
	blob->active.pages = calloc(1, sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(blob);
		return NULL;
	}

	blob->active.pages[0] = _spdk_bs_blobid_to_page(id);

	TAILQ_INIT(&blob->xattrs);

	return blob;
}

static void
_spdk_blob_free(struct spdk_blob *blob)
{
	struct spdk_xattr *xattr, *xattr_tmp;

	assert(blob != NULL);

	free(blob->active.clusters);
	free(blob->clean.clusters);
	free(blob->active.pages);
	free(blob->clean.pages);

	TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) {
		TAILQ_REMOVE(&blob->xattrs, xattr, link);
		free(xattr->name);
		free(xattr->value);
		free(xattr);
	}

	free(blob);
}

static int
_spdk_blob_mark_clean(struct spdk_blob *blob)
{
	uint64_t *clusters = NULL;
	uint32_t *pages = NULL;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING ||
	       blob->state == SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters) {
		assert(blob->active.clusters);
		clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters));
		if (!clusters) {
			return -1;
		}
		memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters));
	}

	if (blob->active.num_pages) {
		assert(blob->active.pages);
		pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages));
		if (!pages) {
			free(clusters);
			return -1;
		}
		memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages));
	}

	free(blob->clean.clusters);
	free(blob->clean.pages);

	blob->clean.num_clusters = blob->active.num_clusters;
	blob->clean.clusters = blob->active.clusters;
	blob->clean.num_pages = blob->active.num_pages;
	blob->clean.pages = blob->active.pages;

	blob->active.clusters = clusters;
	blob->active.pages = pages;

	blob->state = SPDK_BLOB_STATE_CLEAN;

	return 0;
}

static void
_spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob)
{
	struct spdk_blob_md_descriptor *desc;
	size_t cur_desc = 0;
	void *tmp;

	desc = (struct spdk_blob_md_descriptor *)page->descriptors;
	while (cur_desc < sizeof(page->descriptors)) {
		if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) {
			if (desc->length == 0) {
				/* If padding and length are 0, this terminates the page */
				break;
			}
		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) {
			struct spdk_blob_md_descriptor_extent *desc_extent;
			unsigned int i, j;
			unsigned int cluster_count = blob->active.num_clusters;

			desc_extent = (struct spdk_blob_md_descriptor_extent *)desc;

			assert(desc_extent->length > 0);
			assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0);

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j));
					cluster_count++;
				}
			}

			assert(cluster_count > 0);
			tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t));
			assert(tmp != NULL);
			blob->active.clusters = tmp;
			blob->active.cluster_array_size = cluster_count;

			for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) {
				for (j = 0; j < desc_extent->extents[i].length; j++) {
					blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs,
							desc_extent->extents[i].cluster_idx + j);
				}
			}

		} else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) {
			struct spdk_blob_md_descriptor_xattr *desc_xattr;
			struct spdk_xattr *xattr;

			desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc;

			assert(desc_xattr->length == sizeof(desc_xattr->name_length) +
			       sizeof(desc_xattr->value_length) +
			       desc_xattr->name_length + desc_xattr->value_length);

			xattr = calloc(1, sizeof(*xattr));
			assert(xattr != NULL);

			xattr->name = malloc(desc_xattr->name_length + 1);
			assert(xattr->name);
			strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length);
			xattr->name[desc_xattr->name_length] = '\0';

			xattr->value = malloc(desc_xattr->value_length);
			assert(xattr->value != NULL);
			xattr->value_len = desc_xattr->value_length;
			memcpy(xattr->value,
			       (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length),
			       desc_xattr->value_length);

			TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);
		} else {
			/* Error */
			break;
		}

		/* Advance to the next descriptor */
		cur_desc += sizeof(*desc) + desc->length;
		if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) {
			break;
		}
		desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc);
	}
}

static int
_spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count,
		 struct spdk_blob *blob)
{
	const struct spdk_blob_md_page *page;
	uint32_t i;

	assert(page_count > 0);
	assert(pages[0].sequence_num == 0);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_LOADING);
	assert(blob->active.clusters == NULL);
	assert(blob->id == pages[0].id);

	for (i = 0; i < page_count; i++) {
		page = &pages[i];

		assert(page->id == blob->id);
		assert(page->sequence_num == i);

		_spdk_blob_parse_page(page, blob);
	}

	return 0;
}

static int
_spdk_blob_serialize_add_page(const struct spdk_blob *blob,
			      struct spdk_blob_md_page **pages,
			      uint32_t *page_count,
			      struct spdk_blob_md_page **last_page)
{
	struct spdk_blob_md_page *page;

	assert(pages != NULL);
	assert(page_count != NULL);

	if (*page_count == 0) {
		assert(*pages == NULL);
		*page_count = 1;
		*pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE,
					 SPDK_BS_PAGE_SIZE,
					 NULL);
	} else {
		assert(*pages != NULL);
		(*page_count)++;
		*pages = spdk_dma_realloc(*pages,
					  SPDK_BS_PAGE_SIZE * (*page_count),
					  SPDK_BS_PAGE_SIZE,
					  NULL);
	}

	if (*pages == NULL) {
		*page_count = 0;
		*last_page = NULL;
		return -ENOMEM;
	}

	page = &(*pages)[*page_count - 1];
	memset(page, 0, sizeof(*page));
	page->id = blob->id;
	page->sequence_num = *page_count - 1;
	page->next = SPDK_INVALID_MD_PAGE;
	*last_page = page;

	return 0;
}

/* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor.
 * Update required_sz on both success and failure.
 */
static int
_spdk_blob_serialize_xattr(const struct spdk_xattr *xattr,
			   uint8_t *buf, size_t buf_sz,
			   size_t *required_sz)
{
	struct spdk_blob_md_descriptor_xattr *desc;

	*required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) +
		       strlen(xattr->name) +
		       xattr->value_len;

	if (buf_sz < *required_sz) {
		return -1;
	}

	desc = (struct spdk_blob_md_descriptor_xattr *)buf;

	desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR;
	desc->length = sizeof(desc->name_length) +
		       sizeof(desc->value_length) +
		       strlen(xattr->name) +
		       xattr->value_len;
	desc->name_length = strlen(xattr->name);
	desc->value_length = xattr->value_len;

	memcpy(desc->name, xattr->name, desc->name_length);
	memcpy((void *)((uintptr_t)desc->name + desc->name_length),
	       xattr->value,
	       desc->value_length);

	return 0;
}

static void
_spdk_blob_serialize_extent(const struct spdk_blob *blob,
			    uint64_t start_cluster, uint64_t *next_cluster,
			    uint8_t *buf, size_t buf_sz)
{
	struct spdk_blob_md_descriptor_extent *desc;
	size_t cur_sz;
	uint64_t i, extent_idx;
	uint32_t lba, lba_per_cluster, lba_count;

	/* The buffer must have room for at least one extent */
	cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]);
	if (buf_sz < cur_sz) {
		*next_cluster = start_cluster;
		return;
	}

	desc = (struct spdk_blob_md_descriptor_extent *)buf;
	desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT;

	lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1);

	lba = blob->active.clusters[start_cluster];
	lba_count = lba_per_cluster;
	extent_idx = 0;
	for (i = start_cluster + 1; i < blob->active.num_clusters; i++) {
		if ((lba + lba_count) == blob->active.clusters[i]) {
			lba_count += lba_per_cluster;
			continue;
		}
		desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
		desc->extents[extent_idx].length = lba_count / lba_per_cluster;
		extent_idx++;

		cur_sz += sizeof(desc->extents[extent_idx]);

		if (buf_sz < cur_sz) {
			/* If we ran out of buffer space, return */
			desc->length = sizeof(desc->extents[0]) * extent_idx;
			*next_cluster = i;
			return;
		}

		lba = blob->active.clusters[i];
		lba_count = lba_per_cluster;
	}

	desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster;
	desc->extents[extent_idx].length = lba_count / lba_per_cluster;
	extent_idx++;

	desc->length = sizeof(desc->extents[0]) * extent_idx;
	*next_cluster = blob->active.num_clusters;

	return;
}
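
/* Illustration (not part of the on-disk format definition, just a worked example of the
 * run-length encoding above): with one cluster = 256 LBAs, a blob whose active.clusters
 * array holds LBAs { 256, 512, 768, 2048 } serializes to two extents:
 *
 *     extents[0] = { .cluster_idx = 1, .length = 3 }   // clusters 1, 2, 3 are contiguous
 *     extents[1] = { .cluster_idx = 8, .length = 1 }   // cluster 8 stands alone
 *
 * so desc->length becomes 2 * sizeof(desc->extents[0]).
 */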

static int
_spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages,
		     uint32_t *page_count)
{
	struct spdk_blob_md_page *cur_page;
	const struct spdk_xattr *xattr;
	int rc;
	uint8_t *buf;
	size_t remaining_sz;
	uint64_t last_cluster;

	assert(pages != NULL);
	assert(page_count != NULL);
	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_SYNCING);

	*pages = NULL;
	*page_count = 0;

	/* A blob always has at least 1 page, even if it has no descriptors */
	rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page);
	if (rc < 0) {
		return rc;
	}

	buf = (uint8_t *)cur_page->descriptors;
	remaining_sz = sizeof(cur_page->descriptors);

	/* Serialize xattrs */
	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		size_t required_sz = 0;
		rc = _spdk_blob_serialize_xattr(xattr,
						buf, remaining_sz,
						&required_sz);
		if (rc < 0) {
			/* Need to add a new page to the chain */
			rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
							   &cur_page);
			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return rc;
			}

			buf = (uint8_t *)cur_page->descriptors;
			remaining_sz = sizeof(cur_page->descriptors);

			/* Try again */
			required_sz = 0;
			rc = _spdk_blob_serialize_xattr(xattr,
							buf, remaining_sz,
							&required_sz);

			if (rc < 0) {
				spdk_dma_free(*pages);
				*pages = NULL;
				*page_count = 0;
				return -1;
			}
		}

		remaining_sz -= required_sz;
		buf += required_sz;
	}

	/* Serialize extents */
	last_cluster = 0;
	while (last_cluster < blob->active.num_clusters) {
		_spdk_blob_serialize_extent(blob, last_cluster, &last_cluster,
					    buf, remaining_sz);

		if (last_cluster == blob->active.num_clusters) {
			break;
		}

		rc = _spdk_blob_serialize_add_page(blob, pages, page_count,
						   &cur_page);
		if (rc < 0) {
			return rc;
		}

		buf = (uint8_t *)cur_page->descriptors;
		remaining_sz = sizeof(cur_page->descriptors);
	}

	return 0;
}

struct spdk_blob_load_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;
	uint32_t num_pages;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_load_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_md_page *page;
	int rc;

	page = &ctx->pages[ctx->num_pages - 1];

	if (page->next != SPDK_INVALID_MD_PAGE) {
		uint32_t next_page = page->next;
		uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page);

		assert(next_lba < (blob->bs->md_start + blob->bs->md_len));

		/* Read the next page */
		ctx->num_pages++;
		ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages),
					      sizeof(*page), NULL);
		if (ctx->pages == NULL) {
			ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM);
			free(ctx);
			return;
		}

		spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1],
				      next_lba,
				      _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)),
				      _spdk_blob_load_cpl, ctx);
		return;
	}

	/* Parse the pages */
	rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob);

	_spdk_blob_mark_clean(blob);

	ctx->cb_fn(seq, ctx->cb_arg, rc);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

/* Load a blob from disk given a blobid */
static void
_spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_load_ctx *ctx;
	struct spdk_blob_store *bs;
	uint32_t page_num;
	uint64_t lba;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	ctx->blob = blob;
	ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE,
				      SPDK_BS_PAGE_SIZE, NULL);
	if (!ctx->pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->num_pages = 1;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	page_num = _spdk_bs_blobid_to_page(blob->id);
	lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num);

	blob->state = SPDK_BLOB_STATE_LOADING;

	spdk_bs_sequence_read(seq, &ctx->pages[0], lba,
			      _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE),
			      _spdk_blob_load_cpl, ctx);
}

struct spdk_blob_persist_ctx {
	struct spdk_blob *blob;

	struct spdk_blob_md_page *pages;

	uint64_t idx;

	spdk_bs_sequence_cpl cb_fn;
	void *cb_arg;
};

static void
_spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;

	if (bserrno == 0) {
		_spdk_blob_mark_clean(blob);
	}

	/* Call user callback */
	ctx->cb_fn(seq, ctx->cb_arg, bserrno);

	/* Free the memory */
	spdk_dma_free(ctx->pages);
	free(ctx);
}

static void
_spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	void *tmp;
	size_t i;

	/* Release all clusters that were truncated */
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]);

		_spdk_bs_release_cluster(bs, cluster_num);
	}

	if (blob->active.num_clusters == 0) {
		free(blob->active.clusters);
		blob->active.clusters = NULL;
		blob->active.cluster_array_size = 0;
	} else {
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters);
		assert(tmp != NULL);
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = blob->active.num_clusters;
	}

	_spdk_blob_persist_complete(seq, ctx, bserrno);
}

static void
_spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	spdk_bs_batch_t *batch;
	size_t i;
	uint64_t lba;
	uint32_t lba_count;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx);

	/* Unmap all clusters that were truncated */
	lba = 0;
	lba_count = 0;
	for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) {
		uint64_t next_lba = blob->active.clusters[i];
		uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1);

		if ((lba + lba_count) == next_lba) {
			/* This cluster is contiguous with the previous one. */
			lba_count += next_lba_count;
			continue;
		}

		/* This cluster is not contiguous with the previous one. */

		/* If a run of LBAs previously existed, send it
		 * as an unmap.
		 */
		if (lba_count > 0) {
			spdk_bs_batch_unmap(batch, lba, lba_count);
		}

		/* Start building the next batch */
		lba = next_lba;
		lba_count = next_lba_count;
	}

	/* If we ended with a contiguous set of LBAs, send the unmap now */
	if (lba_count > 0) {
		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	size_t i;

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]);
	}

	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		page_num = _spdk_bs_blobid_to_page(blob->id);
		spdk_bit_array_clear(bs->used_md_pages, page_num);
	}

	/* Move on to unmapping clusters */
	_spdk_blob_persist_unmap_clusters(seq, ctx, 0);
}

static void
_spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	spdk_bs_batch_t *batch;
	size_t i;

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx);

	lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE);

	/* This loop starts at 1 because the first page is special and handled
	 * below. The pages (except the first) are never written in place,
	 * so any pages in the clean list must be unmapped.
	 */
	for (i = 1; i < blob->clean.num_pages; i++) {
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	/* The first page will only be unmapped if this is a delete. */
	if (blob->active.num_pages == 0) {
		uint32_t page_num;

		/* The first page in the metadata goes where the blobid indicates */
		page_num = _spdk_bs_blobid_to_page(blob->id);
		lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num);

		spdk_bs_batch_unmap(batch, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static void
_spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;

	if (blob->active.num_pages == 0) {
		/* Move on to the next step */
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	page = &ctx->pages[0];
	/* The first page in the metadata goes where the blobid indicates */
	lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id));

	spdk_bs_sequence_write(seq, page, lba, lba_count,
			       _spdk_blob_persist_unmap_pages, ctx);
}

static void
_spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob_persist_ctx *ctx = cb_arg;
	struct spdk_blob *blob = ctx->blob;
	struct spdk_blob_store *bs = blob->bs;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_blob_md_page *page;
	spdk_bs_batch_t *batch;
	size_t i;

	/* Clusters don't move around in blobs. The list shrinks or grows
	 * at the end, but no changes ever occur in the middle of the list.
	 */

	lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page));

	batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx);

	/* This starts at 1. The root page is not written until
	 * all of the others are finished
	 */
	for (i = 1; i < blob->active.num_pages; i++) {
		page = &ctx->pages[i];
		assert(page->sequence_num == i);

		lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]);

		spdk_bs_batch_write(batch, page, lba, lba_count);
	}

	spdk_bs_batch_close(batch);
}

static int
_spdk_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	uint64_t i;
	uint64_t *tmp;
	uint64_t lfc; /* lowest free cluster */
	struct spdk_blob_store *bs;

	bs = blob->bs;

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->active.num_clusters == sz) {
		return 0;
	}

	if (blob->active.num_clusters < blob->active.cluster_array_size) {
		/* If this blob was resized to be larger, then smaller, then
		 * larger without syncing, then the cluster array already
		 * contains spare assigned clusters we can use.
		 */
		blob->active.num_clusters = spdk_min(blob->active.cluster_array_size,
						     sz);
	}

	blob->state = SPDK_BLOB_STATE_DIRTY;

	/* Do two passes - one to verify that we can obtain enough clusters
	 * and another to actually claim them.
	 */

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		if (lfc >= bs->total_clusters) {
			/* No more free clusters. Cannot satisfy the request */
			assert(false);
			return -1;
		}
		lfc++;
	}

	if (sz > blob->active.num_clusters) {
		/* Expand the cluster array if necessary.
		 * We only shrink the array when persisting.
		 */
		tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz);
		if (sz > 0 && tmp == NULL) {
			assert(false);
			return -1;
		}
		blob->active.clusters = tmp;
		blob->active.cluster_array_size = sz;
	}

	lfc = 0;
	for (i = blob->active.num_clusters; i < sz; i++) {
		lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc);
		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id);
		_spdk_bs_claim_cluster(bs, lfc);
		blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc);
		lfc++;
	}

	blob->active.num_clusters = sz;

	return 0;
}

/* Write a blob to disk */
static void
_spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob,
		   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_blob_persist_ctx *ctx;
	int rc;
	uint64_t i;
	uint32_t page_num;
	struct spdk_blob_store *bs;

	assert(blob != NULL);
	assert(blob->state == SPDK_BLOB_STATE_CLEAN ||
	       blob->state == SPDK_BLOB_STATE_DIRTY);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(seq, cb_arg, 0);
		return;
	}

	bs = blob->bs;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}
	ctx->blob = blob;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	blob->state = SPDK_BLOB_STATE_SYNCING;

	if (blob->active.num_pages == 0) {
		/* This is the signal that the blob should be deleted.
		 * Immediately jump to the clean up routine. */
		assert(blob->clean.num_pages > 0);
		ctx->idx = blob->clean.num_pages - 1;
		_spdk_blob_persist_unmap_pages(seq, ctx, 0);
		return;
	}

	/* Generate the new metadata */
	rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages);
	if (rc < 0) {
		free(ctx);
		cb_fn(seq, cb_arg, rc);
		return;
	}

	assert(blob->active.num_pages >= 1);

	/* Resize the cache of page indices */
	blob->active.pages = realloc(blob->active.pages,
				     blob->active.num_pages * sizeof(*blob->active.pages));
	if (!blob->active.pages) {
		free(ctx);
		cb_fn(seq, cb_arg, -ENOMEM);
		return;
	}

	/* Assign this metadata to pages. This requires two passes -
	 * one to verify that there are enough pages and a second
	 * to actually claim them. */
	page_num = 0;
	/* Note that this loop starts at one. The first page location is fixed by the blobid. */
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
			spdk_dma_free(ctx->pages);
			free(ctx);
			blob->state = SPDK_BLOB_STATE_DIRTY;
			cb_fn(seq, cb_arg, -ENOMEM);
			return;
		}
		page_num++;
	}

	page_num = 0;
	blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id);
	for (i = 1; i < blob->active.num_pages; i++) {
		page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num);
		ctx->pages[i - 1].next = page_num;
		blob->active.pages[i] = page_num;
		spdk_bit_array_set(bs->used_md_pages, page_num);
		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id);
		page_num++;
	}

	/* Start writing the metadata from last page to first */
	ctx->idx = blob->active.num_pages - 1;
	_spdk_blob_persist_write_page_chain(seq, ctx, 0);
}

static void
_spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel,
			     void *payload, uint64_t offset, uint64_t length,
			     spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_batch_t *batch;
	struct spdk_bs_cpl cpl;
	uint64_t lba;
	uint32_t lba_count;
	uint8_t *buf;
	uint64_t page;

	assert(blob != NULL);

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	batch = spdk_bs_batch_open(_channel, &cpl);
	if (!batch) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	length = _spdk_bs_page_to_lba(blob->bs, length);
	page = offset;
	buf = payload;
	while (length > 0) {
		lba = _spdk_bs_blob_page_to_lba(blob, page);
		lba_count = spdk_min(length,
				     _spdk_bs_page_to_lba(blob->bs,
						     _spdk_bs_num_pages_to_cluster_boundary(blob, page)));

		if (read) {
			spdk_bs_batch_read(batch, buf, lba, lba_count);
		} else {
			spdk_bs_batch_write(batch, buf, lba, lba_count);
		}

		length -= lba_count;
		buf += _spdk_bs_lba_to_byte(blob->bs, lba_count);
		page += _spdk_bs_lba_to_page(blob->bs, lba_count);
	}

	spdk_bs_batch_close(batch);
}

struct rw_iov_ctx {
	struct spdk_blob *blob;
	bool read;
	int iovcnt;
	struct iovec *orig_iov;
	uint64_t page_offset;
	uint64_t pages_remaining;
	uint64_t pages_done;
	struct iovec iov[0];
};

static void
_spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	assert(cb_arg == NULL);
	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct rw_iov_ctx *ctx = cb_arg;
	struct iovec *iov, *orig_iov;
	int iovcnt;
	size_t orig_iovoff;
	uint64_t lba;
	uint64_t page_count, pages_to_boundary;
	uint32_t lba_count;
	uint64_t byte_count;

	if (bserrno != 0 || ctx->pages_remaining == 0) {
		free(ctx);
		spdk_bs_sequence_finish(seq, bserrno);
		return;
	}

	pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset);
	page_count = spdk_min(ctx->pages_remaining, pages_to_boundary);
	lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset);
	lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count);

	/*
	 * Get index and offset into the original iov array for our current position in the I/O sequence.
	 * byte_count will keep track of how many bytes remain until orig_iov and orig_iovoff
	 * point to the current position in the I/O sequence.
	 */
	byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page);
	orig_iov = &ctx->orig_iov[0];
	orig_iovoff = 0;
	while (byte_count > 0) {
		if (byte_count >= orig_iov->iov_len) {
			byte_count -= orig_iov->iov_len;
			orig_iov++;
		} else {
			orig_iovoff = byte_count;
			byte_count = 0;
		}
	}

	/*
	 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many
	 * bytes of this next I/O remain to be accounted for in the new iov array.
	 */
	byte_count = page_count * sizeof(struct spdk_blob_md_page);
	iov = &ctx->iov[0];
	iovcnt = 0;
	while (byte_count > 0) {
		iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff);
		iov->iov_base = orig_iov->iov_base + orig_iovoff;
		byte_count -= iov->iov_len;
		orig_iovoff = 0;
		orig_iov++;
		iov++;
		iovcnt++;
	}

	ctx->page_offset += page_count;
	ctx->pages_done += page_count;
	ctx->pages_remaining -= page_count;
	iov = &ctx->iov[0];

	if (ctx->read) {
		spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	} else {
		spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx);
	}
}

static void
_spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel,
				 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
				 spdk_blob_op_complete cb_fn, void *cb_arg, bool read)
{
	spdk_bs_sequence_t *seq;
	struct spdk_bs_cpl cpl;

	assert(blob != NULL);

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	/*
	 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having
	 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary,
	 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster
	 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need
	 * to allocate a separate iov array and split the I/O such that none of the resulting
	 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel)
	 * but since this case happens very infrequently, any performance impact will be negligible.
	 *
	 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs
	 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them
	 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called
	 * when the batch was completed, to allow for freeing the memory for the iov arrays.
	 */
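	/* For example (illustrative numbers only): with a 1 MiB cluster, i.e. 256 4 KiB pages
	 * per cluster, a 16 page writev at page offset 250 is split by _spdk_rw_iov_split_next
	 * into a 6 page I/O ending at the cluster boundary followed by a 10 page I/O starting
	 * in the next cluster, each with its own iov array carved out of the caller's iovs.
	 */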
	seq = spdk_bs_sequence_start(_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) {
		uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset);
		uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length);

		if (read) {
			spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		} else {
			spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL);
		}
	} else {
		struct rw_iov_ctx *ctx;

		ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec));
		if (ctx == NULL) {
			spdk_bs_sequence_finish(seq, -ENOMEM);
			return;
		}

		ctx->blob = blob;
		ctx->read = read;
		ctx->orig_iov = iov;
		ctx->iovcnt = iovcnt;
		ctx->page_offset = offset;
		ctx->pages_remaining = length;
		ctx->pages_done = 0;

		_spdk_rw_iov_split_next(seq, ctx, 0);
	}
}

static struct spdk_blob *
_spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	struct spdk_blob *blob;

	TAILQ_FOREACH(blob, &bs->blobs, link) {
		if (blob->id == blobid) {
			return blob;
		}
	}

	return NULL;
}

static int
_spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel,
			uint32_t max_ops)
{
	struct spdk_bs_dev *dev;
	uint32_t i;

	dev = bs->dev;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->bs = bs;
	channel->dev = dev;
	channel->dev_channel = dev->create_channel(dev);

	return 0;
}

static int
_spdk_bs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_channel *channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);

	return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops);
}

static int
_spdk_bs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_channel *channel = ctx_buf;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target);

	return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops);
}

static void
_spdk_bs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bs_channel *channel = ctx_buf;

	free(channel->req_mem);
	channel->dev->destroy_channel(channel->dev, channel->dev_channel);
}

static void
_spdk_bs_dev_destroy(void *io_device)
{
	struct spdk_blob_store *bs;
	struct spdk_blob *blob, *blob_tmp;

	bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target);
	bs->dev->destroy(bs->dev);

	TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) {
		TAILQ_REMOVE(&bs->blobs, blob, link);
		_spdk_blob_free(blob);
	}

	spdk_bit_array_free(&bs->used_md_pages);
	spdk_bit_array_free(&bs->used_clusters);
	free(bs);
}

static void
_spdk_bs_free(struct spdk_blob_store *bs)
{
	spdk_bs_unregister_md_thread(bs);
	spdk_io_device_unregister(&bs->io_target, NULL);
	spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy);
}

void
spdk_bs_opts_init(struct spdk_bs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ;
	opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES;
	opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS;
	opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS;
}
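
/*
 * Typical usage (illustrative sketch, not compiled as part of this file): callers either
 * pass NULL to spdk_bs_init() to accept these defaults, or initialize an opts structure
 * first and override selected fields, e.g.:
 *
 *     struct spdk_bs_opts opts;
 *
 *     spdk_bs_opts_init(&opts);
 *     opts.cluster_sz = 4 * 1024 * 1024;
 *     spdk_bs_init(dev, &opts, init_complete_cb, cb_arg);
 *
 * where init_complete_cb and cb_arg are the caller's completion callback and context.
 */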

static struct spdk_blob_store *
_spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts)
{
	struct spdk_blob_store *bs;

	bs = calloc(1, sizeof(struct spdk_blob_store));
	if (!bs) {
		return NULL;
	}

	TAILQ_INIT(&bs->blobs);
	bs->dev = dev;

	/*
	 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an
	 * even multiple of the cluster size.
	 */
	bs->cluster_sz = opts->cluster_sz;
	bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen);
	bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	bs->num_free_clusters = bs->total_clusters;
	bs->used_clusters = spdk_bit_array_create(bs->total_clusters);
	if (bs->used_clusters == NULL) {
		_spdk_bs_free(bs);
		return NULL;
	}

	bs->md_target.max_md_ops = opts->max_md_ops;
	bs->io_target.max_channel_ops = opts->max_channel_ops;
	bs->super_blob = SPDK_BLOBID_INVALID;

	/* The metadata is assumed to be at least 1 page */
	bs->used_md_pages = spdk_bit_array_create(1);

	spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));
	spdk_bs_register_md_thread(bs);

	spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy,
				sizeof(struct spdk_bs_channel));

	return bs;
}

/* START spdk_bs_load */

struct spdk_bs_load_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof(
					     struct spdk_blob_md_page) * 8));
	/* The length of the mask must be exactly equal to the total number of clusters */
	assert(ctx->mask->length == ctx->bs->total_clusters);

	rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->bs->num_free_clusters = ctx->bs->total_clusters;
	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j);
				assert(ctx->bs->num_free_clusters > 0);
				ctx->bs->num_free_clusters--;
			}
			segment >>= 1U;
		}
	}

	spdk_dma_free(ctx->super);
	spdk_dma_free(ctx->mask);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;
	uint32_t i, j;
	int rc;

	/* The type must be correct */
	assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES);
	/* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */
	assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 8));
	/* The length of the mask must be exactly equal to the size (in pages) of the metadata region */
	assert(ctx->mask->length == ctx->super->md_len);

	rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length);
	if (rc < 0) {
		spdk_dma_free(ctx->super);
		spdk_dma_free(ctx->mask);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	for (i = 0; i < ctx->mask->length / 8; i++) {
		uint8_t segment = ctx->mask->mask[i];
		for (j = 0; segment && (j < 8); j++) {
			if (segment & 1U) {
				spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j);
			}
			segment >>= 1U;
		}
	}
	spdk_dma_free(ctx->mask);

	/* Read the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_clusters_cpl, ctx);
}

static void
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint64_t lba, lba_count, mask_size;

	if (ctx->super->version != SPDK_BS_VERSION) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		   sizeof(ctx->super->signature)) != 0) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (ctx->super->clean != 1) {
		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
		 * All of the necessary data to recover is available
		 * on disk - the code just has not been written yet.
		 */
		assert(false);
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}
	ctx->super->clean = 0;

	/* Parse the super block */
	ctx->bs->cluster_sz = ctx->super->cluster_size;
	ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen);
	ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE;
	ctx->bs->md_start = ctx->super->md_start;
	ctx->bs->md_len = ctx->super->md_len;
	ctx->bs->super_blob = ctx->super->super_blob;

	/* Read the used pages mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}
	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count,
			      _spdk_bs_load_used_pages_cpl, ctx);
}

void
spdk_bs_load(struct spdk_bs_dev *dev,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_load_ctx *ctx;
	struct spdk_bs_opts opts = {};

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev);

	spdk_bs_opts_init(&opts);

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* Read the super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_load_super_cpl, ctx);
}

/* END spdk_bs_load */

/* START spdk_bs_init */

struct spdk_bs_init_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;
};

static void
_spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);
	free(ctx);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_init_ctx *ctx = cb_arg;

	/* Write super block */
	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_init_persist_super_cpl, ctx);
}

void
spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o,
	     spdk_bs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_init_ctx *ctx;
	struct spdk_blob_store *bs;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint64_t num_md_pages;
	uint32_t i;
	struct spdk_bs_opts opts = {};
	int rc;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev);

	if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) {
		SPDK_ERRLOG("unsupported dev block length of %d\n",
			    dev->blocklen);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (o) {
		opts = *o;
	} else {
		spdk_bs_opts_init(&opts);
	}

	bs = _spdk_bs_alloc(dev, &opts);
	if (!bs) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	if (opts.num_md_pages == UINT32_MAX) {
		/* By default, allocate 1 page per cluster.
		 * Technically, this over-allocates metadata
		 * because more metadata will reduce the number
		 * of usable clusters. This can be addressed with
		 * more complex math in the future.
		 */
		bs->md_len = bs->total_clusters;
	} else {
		bs->md_len = opts.num_md_pages;
	}

	rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len);
	if (rc < 0) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	/* Allocate memory for the super block */
	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}
	memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
	       sizeof(ctx->super->signature));
	ctx->super->version = SPDK_BS_VERSION;
	ctx->super->length = sizeof(*ctx->super);
	ctx->super->super_blob = bs->super_blob;
	ctx->super->clean = 0;
	ctx->super->cluster_size = bs->cluster_sz;

	/* Calculate how many pages the metadata consumes at the front
	 * of the disk.
	 */

	/* The super block uses 1 page */
	num_md_pages = 1;

	/* The used_md_pages mask requires 1 bit per metadata page, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_page_mask_start = num_md_pages;
	ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					 divide_round_up(bs->md_len, 8),
					 SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_page_mask_len;

	/* The used_clusters mask requires 1 bit per cluster, rounded
	 * up to the nearest page, plus a header.
	 */
	ctx->super->used_cluster_mask_start = num_md_pages;
	ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) +
					    divide_round_up(bs->total_clusters, 8),
					    SPDK_BS_PAGE_SIZE);
	num_md_pages += ctx->super->used_cluster_mask_len;

	/* The metadata region size was chosen above */
	ctx->super->md_start = bs->md_start = num_md_pages;
	ctx->super->md_len = bs->md_len;
	num_md_pages += bs->md_len;
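
	/* Worked example (illustrative numbers only): for a 1 GiB device with a 1 MiB
	 * cluster size, total_clusters = 1024 and md_len = 1024 pages. Each mask then
	 * needs divide_round_up(sizeof(struct spdk_bs_md_mask) + 128, 4096) = 1 page, so
	 * the layout is: page 0 = super block, page 1 = used_pages mask, page 2 =
	 * used_clusters mask, and pages 3..1026 = blob metadata region.
	 */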

	/* Claim all of the clusters used by the metadata */
	for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) {
		_spdk_bs_claim_cluster(bs, i);
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE;
	cpl.u.bs_handle.cb_fn = cb_fn;
	cpl.u.bs_handle.cb_arg = cb_arg;
	cpl.u.bs_handle.bs = bs;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		_spdk_bs_free(bs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	/* TRIM the entire device */
	spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx);
}

/* END spdk_bs_init */

/* START spdk_bs_unload */

struct spdk_bs_unload_ctx {
	struct spdk_blob_store *bs;
	struct spdk_bs_super_block *super;

	struct spdk_bs_md_mask *mask;
};

static void
_spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->super);

	spdk_bs_sequence_finish(seq, bserrno);

	_spdk_bs_free(ctx->bs);
	free(ctx);
}

static void
_spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;

	spdk_dma_free(ctx->mask);

	/* Update the values in the super block */
	ctx->super->super_blob = ctx->bs->super_blob;
	ctx->super->clean = 1;

	spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0),
			       _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)),
			       _spdk_bs_unload_write_super_cpl, ctx);
}

static void
_spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count, mask_size;

	spdk_dma_free(ctx->mask);

	/* Write out the used clusters mask */
	mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS;
	ctx->mask->length = ctx->bs->total_clusters;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_clusters_cpl, ctx);
}

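/* The on-disk masks written above and below pack the in-memory bit arrays bytewise,
 * least-significant bit first: bit i of the array lands in mask->mask[i / 8] at bit
 * position (i % 8). For example (illustrative only), if only metadata page 11 were in
 * use, byte 1 of the used_pages mask would hold 0x08 and every other byte would be 0.
 */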
static void
_spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_bs_unload_ctx *ctx = cb_arg;
	uint32_t i;
	uint64_t lba, lba_count, mask_size;

	/* Write out the used page mask */
	mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE;
	ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL);
	if (!ctx->mask) {
		spdk_dma_free(ctx->super);
		free(ctx);
		spdk_bs_sequence_finish(seq, -ENOMEM);
		return;
	}

	ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES;
	ctx->mask->length = ctx->super->md_len;
	assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages));

	i = 0;
	while (true) {
		i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i);
		if (i > ctx->mask->length) {
			break;
		}
		ctx->mask->mask[i / 8] |= 1U << (i % 8);
		i++;
	}

	lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start);
	lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len);
	spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count,
			       _spdk_bs_unload_write_used_pages_cpl, ctx);
}

void
spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	struct spdk_bs_unload_ctx *ctx;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blobstore\n");

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->bs = bs;

	ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL);
	if (!ctx->super) {
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC;
	cpl.u.bs_basic.cb_fn = cb_fn;
	cpl.u.bs_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		spdk_dma_free(ctx->super);
		free(ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	assert(TAILQ_EMPTY(&bs->blobs));

	/* Read super block */
	spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0),
			      _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)),
			      _spdk_bs_unload_read_super_cpl, ctx);
}

/* END spdk_bs_unload */

void
spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid,
		  spdk_bs_op_complete cb_fn, void *cb_arg)
{
	bs->super_blob = blobid;
	cb_fn(cb_arg, 0);
}

void
spdk_bs_get_super(struct spdk_blob_store *bs,
		  spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	if (bs->super_blob == SPDK_BLOBID_INVALID) {
		cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT);
	} else {
		cb_fn(cb_arg, bs->super_blob, 0);
	}
}

uint64_t
spdk_bs_get_cluster_size(struct spdk_blob_store *bs)
{
	return bs->cluster_sz;
}

uint64_t
spdk_bs_get_page_size(struct spdk_blob_store *bs)
{
	return SPDK_BS_PAGE_SIZE;
}

uint64_t
spdk_bs_free_cluster_count(struct spdk_blob_store *bs)
{
	return bs->num_free_clusters;
}

int spdk_bs_register_md_thread(struct spdk_blob_store *bs)
{
	bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target);

	return 0;
}

int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs)
{
	spdk_put_io_channel(bs->md_target.md_channel);

	return 0;
}

spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->id;
}

uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters);
}

uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob)
{
	assert(blob != NULL);

	return blob->active.num_clusters;
}

/* START spdk_bs_md_create_blob */

static void
_spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_create_blob(struct spdk_blob_store *bs,
			    spdk_blob_op_with_id_complete cb_fn, void *cb_arg)
{
	struct spdk_blob *blob;
	uint32_t page_idx;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	spdk_blob_id id;

	page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0);
	if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}
	spdk_bit_array_set(bs->used_md_pages, page_idx);

	/* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper
	 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the
	 * code assumes blob id == page_idx.
	 */
	id = (1ULL << 32) | page_idx;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx);

	blob = _spdk_blob_alloc(bs, id);
	if (!blob) {
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOBID;
	cpl.u.blobid.cb_fn = cb_fn;
	cpl.u.blobid.cb_arg = cb_arg;
	cpl.u.blobid.blobid = blob->id;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, 0, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob);
}

/* END spdk_bs_md_create_blob */

/* START spdk_bs_md_resize_blob */
int
spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz)
{
	int rc;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz);

	if (sz == blob->active.num_clusters) {
		return 0;
	}

	rc = _spdk_resize_blob(blob, sz);
	if (rc < 0) {
		return rc;
	}

	return 0;
}

/* END spdk_bs_md_resize_blob */
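/*
 * Illustrative sketch (hypothetical application code): creating a blob, then
 * opening it by the returned id, growing it to 10 clusters, and persisting
 * the new size. All example_* names are invented for this sketch; the
 * spdk_bs_md_* calls are the APIs defined in this file.
 */
#if 0
static void
example_blob_synced(void *cb_arg, int bserrno)
{
	if (bserrno != 0) {
		SPDK_ERRLOG("blob metadata sync failed: %d\n", bserrno);
	}
}

static void
example_blob_opened(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	if (bserrno != 0) {
		SPDK_ERRLOG("blob open failed: %d\n", bserrno);
		return;
	}

	/* Resizing is a synchronous, in-memory operation; the new cluster map
	 * is written out by spdk_bs_md_sync_blob().
	 */
	if (spdk_bs_md_resize_blob(blob, 10) == 0) {
		spdk_bs_md_sync_blob(blob, example_blob_synced, NULL);
	}
}

static void
example_blob_created(void *cb_arg, spdk_blob_id blobid, int bserrno)
{
	struct spdk_blob_store *bs = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("blob create failed: %d\n", bserrno);
		return;
	}

	/* The id encodes the blob's metadata page index in its lower 32 bits. */
	spdk_bs_md_open_blob(bs, blobid, example_blob_opened, bs);
}

static void
example_create_and_resize(struct spdk_blob_store *bs)
{
	spdk_bs_md_create_blob(bs, example_blob_created, bs);
}
#endif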
/* START spdk_bs_md_delete_blob */

static void
_spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	_spdk_blob_free(blob);

	spdk_bs_sequence_finish(seq, bserrno);
}

static void
_spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->state = SPDK_BLOB_STATE_DIRTY;
	blob->active.num_pages = 0;
	_spdk_resize_blob(blob, 0);

	_spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob);
}

void
spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
		       spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_blob *blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		assert(blob->open_ref > 0);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob);
}

/* END spdk_bs_md_delete_blob */

/* START spdk_bs_md_open_blob */

static void
_spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob *blob = cb_arg;

	blob->open_ref++;

	TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link);

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid,
			  spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_blob *blob;
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;
	uint32_t page_num;

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid);

	blob = _spdk_blob_lookup(bs, blobid);
	if (blob) {
		blob->open_ref++;
		cb_fn(cb_arg, blob, 0);
		return;
	}

	page_num = _spdk_bs_blobid_to_page(blobid);
	if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) {
		/* Invalid blobid */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	blob = _spdk_blob_alloc(bs, blobid);
	if (!blob) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE;
	cpl.u.blob_handle.cb_fn = cb_fn;
	cpl.u.blob_handle.cb_arg = cb_arg;
	cpl.u.blob_handle.blob = blob;

	seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl);
	if (!seq) {
		_spdk_blob_free(blob);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	_spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob);
}

/* END spdk_bs_md_open_blob */

/* START spdk_bs_md_sync_blob */
static void
_spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_sync_blob(struct spdk_blob *blob,
			  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	spdk_bs_sequence_t *seq;

	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		cb_fn(cb_arg, 0);
		return;
	}

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	_spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob);
}

/* END spdk_bs_md_sync_blob */
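/*
 * Illustrative sketch (hypothetical application code): deleting a blob by id.
 * The blob must not be open anywhere; spdk_bs_md_delete_blob() fails the
 * request with -EINVAL if a lookup finds an open handle. The delete path
 * loads the blob, resizes it to zero clusters, and persists the empty
 * metadata before the completion fires. example_* names are invented.
 */
#if 0
static void
example_blob_deleted(void *cb_arg, int bserrno)
{
	if (bserrno != 0) {
		SPDK_ERRLOG("blob delete failed: %d\n", bserrno);
	}
}

static void
example_delete(struct spdk_blob_store *bs, spdk_blob_id blobid)
{
	spdk_bs_md_delete_blob(bs, blobid, example_blob_deleted, NULL);
}
#endif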
/* START spdk_bs_md_close_blob */

static void
_spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	struct spdk_blob **blob = cb_arg;

	if ((*blob)->open_ref == 0) {
		TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link);
		_spdk_blob_free((*blob));
	}

	*blob = NULL;

	spdk_bs_sequence_finish(seq, bserrno);
}

void spdk_bs_md_close_blob(struct spdk_blob **b,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_cpl cpl;
	struct spdk_blob *blob;
	spdk_bs_sequence_t *seq;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	if (blob->open_ref == 0) {
		cb_fn(cb_arg, -EBADF);
		return;
	}

	blob->open_ref--;

	cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
	cpl.u.blob_basic.cb_fn = cb_fn;
	cpl.u.blob_basic.cb_arg = cb_arg;

	seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl);
	if (!seq) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	if (blob->state == SPDK_BLOB_STATE_CLEAN) {
		_spdk_blob_close_cpl(seq, b, 0);
		return;
	}

	/* Sync metadata */
	_spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b);
}

/* END spdk_bs_md_close_blob */

struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs)
{
	return spdk_get_io_channel(&bs->io_target);
}

void spdk_bs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void spdk_bs_io_flush_channel(struct spdk_io_channel *channel,
			      spdk_blob_op_complete cb_fn, void *cb_arg)
{
	/* Flush is synchronous right now */
	cb_fn(cb_arg, 0);
}

void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   void *payload, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false);
}

void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			  void *payload, uint64_t offset, uint64_t length,
			  spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true);
}

void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			    struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			    spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false);
}

void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel,
			   struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
			   spdk_blob_op_complete cb_fn, void *cb_arg)
{
	_spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true);
}
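/*
 * Illustrative sketch (hypothetical application code): performing I/O on an
 * open blob from a dedicated per-thread channel. This sketch assumes offset
 * and length are expressed in blobstore pages (see spdk_bs_get_page_size());
 * example_* names are invented. The channel would normally be kept for the
 * lifetime of the thread and released with spdk_bs_free_io_channel().
 */
#if 0
static void
example_io_done(void *cb_arg, int bserrno)
{
	void *payload = cb_arg;

	if (bserrno != 0) {
		SPDK_ERRLOG("blob I/O failed: %d\n", bserrno);
	}
	spdk_dma_free(payload);
}

static void
example_write_one_page(struct spdk_blob_store *bs, struct spdk_blob *blob)
{
	struct spdk_io_channel *channel;
	void *payload;

	/* Each thread doing I/O needs its own channel. */
	channel = spdk_bs_alloc_io_channel(bs);
	if (channel == NULL) {
		return;
	}

	payload = spdk_dma_zmalloc(spdk_bs_get_page_size(bs), 0x1000, NULL);
	if (payload == NULL) {
		spdk_bs_free_io_channel(channel);
		return;
	}

	memset(payload, 0x5a, spdk_bs_get_page_size(bs));

	/* Write one page at offset 0; the blob must already be large enough
	 * (see spdk_bs_md_resize_blob()).
	 */
	spdk_bs_io_write_blob(blob, channel, payload, 0, 1, example_io_done, payload);
}
#endif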
struct spdk_bs_iter_ctx {
	int64_t page_num;
	struct spdk_blob_store *bs;

	spdk_blob_op_with_handle_complete cb_fn;
	void *cb_arg;
};

static void
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;
	struct spdk_blob_store *bs = ctx->bs;
	spdk_blob_id id;

	if (bserrno == 0) {
		ctx->cb_fn(ctx->cb_arg, blob, bserrno);
		free(ctx);
		return;
	}

	ctx->page_num++;
	ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num);
	if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) {
		ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT);
		free(ctx);
		return;
	}

	id = (1ULL << 32) | ctx->page_num;

	blob = _spdk_blob_lookup(bs, id);
	if (blob) {
		blob->open_ref++;
		ctx->cb_fn(ctx->cb_arg, blob, 0);
		free(ctx);
		return;
	}

	spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx);
}

void
spdk_bs_md_iter_first(struct spdk_blob_store *bs,
		      spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = -1;
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

static void
_spdk_bs_iter_close_cpl(void *cb_arg, int bserrno)
{
	struct spdk_bs_iter_ctx *ctx = cb_arg;

	_spdk_bs_iter_cpl(ctx, NULL, -1);
}

void
spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b,
		     spdk_blob_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_bs_iter_ctx *ctx;
	struct spdk_blob *blob;

	assert(b != NULL);
	blob = *b;
	assert(blob != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	ctx->page_num = _spdk_bs_blobid_to_page(blob->id);
	ctx->bs = bs;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	/* Close the existing blob */
	spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx);
}

int
spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value,
		       uint16_t value_len)
{
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			free(xattr->value);
			xattr->value_len = value_len;
			xattr->value = malloc(value_len);
			memcpy(xattr->value, value, value_len);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	xattr = calloc(1, sizeof(*xattr));
	if (!xattr) {
		return -1;
	}
	xattr->name = strdup(name);
	xattr->value_len = value_len;
	xattr->value = malloc(value_len);
	memcpy(xattr->value, value, value_len);
	TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link);

	blob->state = SPDK_BLOB_STATE_DIRTY;

	return 0;
}

int
spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name)
{
	struct spdk_xattr *xattr;

	assert(blob != NULL);

	assert(blob->state != SPDK_BLOB_STATE_LOADING &&
	       blob->state != SPDK_BLOB_STATE_SYNCING);

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			TAILQ_REMOVE(&blob->xattrs, xattr, link);
			free(xattr->value);
			free(xattr->name);
			free(xattr);

			blob->state = SPDK_BLOB_STATE_DIRTY;

			return 0;
		}
	}

	return -ENOENT;
}
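/*
 * Illustrative sketch (hypothetical application code): walking every blob in
 * the store with the iterator. Each blob handed to the callback is open; the
 * iterator closes it again when spdk_bs_md_iter_next() is called, and the
 * walk ends with -ENOENT. example_* names are invented.
 */
#if 0
static void
example_iter_cb(void *cb_arg, struct spdk_blob *blob, int bserrno)
{
	struct spdk_blob_store *bs = cb_arg;

	if (bserrno == -ENOENT) {
		/* No more blobs. */
		return;
	} else if (bserrno != 0) {
		SPDK_ERRLOG("blob iteration failed: %d\n", bserrno);
		return;
	}

	SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "visited blob %lu with %lu clusters\n",
		      spdk_blob_get_id(blob), spdk_blob_get_num_clusters(blob));

	/* Closes the current blob and opens the next one. */
	spdk_bs_md_iter_next(bs, &blob, example_iter_cb, bs);
}

static void
example_iterate(struct spdk_blob_store *bs)
{
	spdk_bs_md_iter_first(bs, example_iter_cb, bs);
}
#endif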
int
spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name,
			   const void **value, size_t *value_len)
{
	struct spdk_xattr *xattr;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		if (!strcmp(name, xattr->name)) {
			*value = xattr->value;
			*value_len = xattr->value_len;
			return 0;
		}
	}

	return -ENOENT;
}

struct spdk_xattr_names {
	uint32_t count;
	const char *names[0];
};

int
spdk_bs_md_get_xattr_names(struct spdk_blob *blob,
			   struct spdk_xattr_names **names)
{
	struct spdk_xattr *xattr;
	int count = 0;

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		count++;
	}

	*names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *));
	if (*names == NULL) {
		return -ENOMEM;
	}

	TAILQ_FOREACH(xattr, &blob->xattrs, link) {
		(*names)->names[(*names)->count++] = xattr->name;
	}

	return 0;
}

uint32_t
spdk_xattr_names_get_count(struct spdk_xattr_names *names)
{
	assert(names != NULL);

	return names->count;
}

const char *
spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index)
{
	if (index >= names->count) {
		return NULL;
	}

	return names->names[index];
}

void
spdk_xattr_names_free(struct spdk_xattr_names *names)
{
	free(names);
}

SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB);
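/*
 * Illustrative sketch (hypothetical application code): attaching an xattr to
 * an open blob, reading it back, and enumerating all attribute names. Setting
 * or removing an xattr only marks the blob metadata dirty; it is persisted by
 * the next spdk_bs_md_sync_blob() or close. example_* names are invented.
 */
#if 0
static void
example_xattrs(struct spdk_blob *blob)
{
	const char *signature = "example";
	const void *value;
	size_t value_len;
	struct spdk_xattr_names *names;
	uint32_t i;

	spdk_blob_md_set_xattr(blob, "name", signature, strlen(signature) + 1);

	if (spdk_bs_md_get_xattr_value(blob, "name", &value, &value_len) == 0) {
		SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "xattr 'name' is %zu bytes\n", value_len);
	}

	if (spdk_bs_md_get_xattr_names(blob, &names) == 0) {
		for (i = 0; i < spdk_xattr_names_get_count(names); i++) {
			SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "xattr: %s\n",
				      spdk_xattr_names_get_name(names, i));
		}
		spdk_xattr_names_free(names);
	}
}
#endif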