1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 44 #include "spdk_internal/log.h" 45 46 #include "blobstore.h" 47 48 #define BLOB_CRC32C_INITIAL 0xffffffffUL 49 50 static inline size_t 51 divide_round_up(size_t num, size_t divisor) 52 { 53 return (num + divisor - 1) / divisor; 54 } 55 56 static void 57 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 58 { 59 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 60 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 61 assert(bs->num_free_clusters > 0); 62 63 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 64 65 spdk_bit_array_set(bs->used_clusters, cluster_num); 66 bs->num_free_clusters--; 67 } 68 69 static void 70 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 71 { 72 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 73 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 74 assert(bs->num_free_clusters < bs->total_clusters); 75 76 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 77 78 spdk_bit_array_clear(bs->used_clusters, cluster_num); 79 bs->num_free_clusters++; 80 } 81 82 static struct spdk_blob * 83 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 84 { 85 struct spdk_blob *blob; 86 87 blob = calloc(1, sizeof(*blob)); 88 if (!blob) { 89 return NULL; 90 } 91 92 blob->id = id; 93 blob->bs = bs; 94 95 blob->state = SPDK_BLOB_STATE_DIRTY; 96 blob->active.num_pages = 1; 97 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 98 if (!blob->active.pages) { 99 free(blob); 100 return NULL; 101 } 102 103 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 104 105 TAILQ_INIT(&blob->xattrs); 106 107 return blob; 108 } 109 110 static void 111 
_spdk_blob_free(struct spdk_blob *blob) 112 { 113 struct spdk_xattr *xattr, *xattr_tmp; 114 115 assert(blob != NULL); 116 117 free(blob->active.clusters); 118 free(blob->clean.clusters); 119 free(blob->active.pages); 120 free(blob->clean.pages); 121 122 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 123 TAILQ_REMOVE(&blob->xattrs, xattr, link); 124 free(xattr->name); 125 free(xattr->value); 126 free(xattr); 127 } 128 129 free(blob); 130 } 131 132 static int 133 _spdk_blob_mark_clean(struct spdk_blob *blob) 134 { 135 uint64_t *clusters = NULL; 136 uint32_t *pages = NULL; 137 138 assert(blob != NULL); 139 assert(blob->state == SPDK_BLOB_STATE_LOADING || 140 blob->state == SPDK_BLOB_STATE_SYNCING); 141 142 if (blob->active.num_clusters) { 143 assert(blob->active.clusters); 144 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 145 if (!clusters) { 146 return -1; 147 } 148 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 149 } 150 151 if (blob->active.num_pages) { 152 assert(blob->active.pages); 153 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 154 if (!pages) { 155 free(clusters); 156 return -1; 157 } 158 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 159 } 160 161 free(blob->clean.clusters); 162 free(blob->clean.pages); 163 164 blob->clean.num_clusters = blob->active.num_clusters; 165 blob->clean.clusters = blob->active.clusters; 166 blob->clean.num_pages = blob->active.num_pages; 167 blob->clean.pages = blob->active.pages; 168 169 blob->active.clusters = clusters; 170 blob->active.pages = pages; 171 172 blob->state = SPDK_BLOB_STATE_CLEAN; 173 174 return 0; 175 } 176 177 static void 178 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 179 { 180 struct spdk_blob_md_descriptor *desc; 181 size_t cur_desc = 0; 182 void *tmp; 183 184 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 185 while 
(cur_desc < sizeof(page->descriptors)) { 186 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 187 if (desc->length == 0) { 188 /* If padding and length are 0, this terminates the page */ 189 break; 190 } 191 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 192 struct spdk_blob_md_descriptor_extent *desc_extent; 193 unsigned int i, j; 194 unsigned int cluster_count = blob->active.num_clusters; 195 196 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 197 198 assert(desc_extent->length > 0); 199 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 200 201 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 202 for (j = 0; j < desc_extent->extents[i].length; j++) { 203 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 204 cluster_count++; 205 } 206 } 207 208 assert(cluster_count > 0); 209 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 210 assert(tmp != NULL); 211 blob->active.clusters = tmp; 212 blob->active.cluster_array_size = cluster_count; 213 214 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 215 for (j = 0; j < desc_extent->extents[i].length; j++) { 216 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 217 desc_extent->extents[i].cluster_idx + j); 218 } 219 } 220 221 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 222 struct spdk_blob_md_descriptor_xattr *desc_xattr; 223 struct spdk_xattr *xattr; 224 225 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 226 227 assert(desc_xattr->length == sizeof(desc_xattr->name_length) + 228 sizeof(desc_xattr->value_length) + 229 desc_xattr->name_length + desc_xattr->value_length); 230 231 xattr = calloc(1, sizeof(*xattr)); 232 assert(xattr != NULL); 233 234 xattr->name = malloc(desc_xattr->name_length + 1); 235 assert(xattr->name); 236 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 237 
xattr->name[desc_xattr->name_length] = '\0'; 238 239 xattr->value = malloc(desc_xattr->value_length); 240 assert(xattr->value != NULL); 241 xattr->value_len = desc_xattr->value_length; 242 memcpy(xattr->value, 243 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 244 desc_xattr->value_length); 245 246 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 247 } else { 248 /* Error */ 249 break; 250 } 251 252 /* Advance to the next descriptor */ 253 cur_desc += sizeof(*desc) + desc->length; 254 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 255 break; 256 } 257 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 258 } 259 } 260 261 static int 262 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 263 struct spdk_blob *blob) 264 { 265 const struct spdk_blob_md_page *page; 266 uint32_t i; 267 268 assert(page_count > 0); 269 assert(pages[0].sequence_num == 0); 270 assert(blob != NULL); 271 assert(blob->state == SPDK_BLOB_STATE_LOADING); 272 assert(blob->active.clusters == NULL); 273 assert(blob->id == pages[0].id); 274 assert(blob->state == SPDK_BLOB_STATE_LOADING); 275 276 for (i = 0; i < page_count; i++) { 277 page = &pages[i]; 278 279 assert(page->id == blob->id); 280 assert(page->sequence_num == i); 281 282 _spdk_blob_parse_page(page, blob); 283 } 284 285 return 0; 286 } 287 288 static int 289 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 290 struct spdk_blob_md_page **pages, 291 uint32_t *page_count, 292 struct spdk_blob_md_page **last_page) 293 { 294 struct spdk_blob_md_page *page; 295 296 assert(pages != NULL); 297 assert(page_count != NULL); 298 299 if (*page_count == 0) { 300 assert(*pages == NULL); 301 *page_count = 1; 302 *pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE, 303 SPDK_BS_PAGE_SIZE, 304 NULL); 305 } else { 306 assert(*pages != NULL); 307 (*page_count)++; 308 *pages = spdk_dma_realloc(*pages, 309 SPDK_BS_PAGE_SIZE * (*page_count), 310 SPDK_BS_PAGE_SIZE, 311 NULL); 
312 } 313 314 if (*pages == NULL) { 315 *page_count = 0; 316 *last_page = NULL; 317 return -ENOMEM; 318 } 319 320 page = &(*pages)[*page_count - 1]; 321 memset(page, 0, sizeof(*page)); 322 page->id = blob->id; 323 page->sequence_num = *page_count - 1; 324 page->next = SPDK_INVALID_MD_PAGE; 325 *last_page = page; 326 327 return 0; 328 } 329 330 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 331 * Update required_sz on both success and failure. 332 * 333 */ 334 static int 335 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 336 uint8_t *buf, size_t buf_sz, 337 size_t *required_sz) 338 { 339 struct spdk_blob_md_descriptor_xattr *desc; 340 341 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 342 strlen(xattr->name) + 343 xattr->value_len; 344 345 if (buf_sz < *required_sz) { 346 return -1; 347 } 348 349 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 350 351 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 352 desc->length = sizeof(desc->name_length) + 353 sizeof(desc->value_length) + 354 strlen(xattr->name) + 355 xattr->value_len; 356 desc->name_length = strlen(xattr->name); 357 desc->value_length = xattr->value_len; 358 359 memcpy(desc->name, xattr->name, desc->name_length); 360 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 361 xattr->value, 362 desc->value_length); 363 364 return 0; 365 } 366 367 static void 368 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 369 uint64_t start_cluster, uint64_t *next_cluster, 370 uint8_t *buf, size_t buf_sz) 371 { 372 struct spdk_blob_md_descriptor_extent *desc; 373 size_t cur_sz; 374 uint64_t i, extent_idx; 375 uint32_t lba, lba_per_cluster, lba_count; 376 377 /* The buffer must have room for at least one extent */ 378 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 379 if (buf_sz < cur_sz) { 380 *next_cluster = start_cluster; 381 return; 382 } 383 384 desc = (struct spdk_blob_md_descriptor_extent *)buf; 385 desc->type = 
SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 386 387 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 388 389 lba = blob->active.clusters[start_cluster]; 390 lba_count = lba_per_cluster; 391 extent_idx = 0; 392 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 393 if ((lba + lba_count) == blob->active.clusters[i]) { 394 lba_count += lba_per_cluster; 395 continue; 396 } 397 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 398 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 399 extent_idx++; 400 401 cur_sz += sizeof(desc->extents[extent_idx]); 402 403 if (buf_sz < cur_sz) { 404 /* If we ran out of buffer space, return */ 405 desc->length = sizeof(desc->extents[0]) * extent_idx; 406 *next_cluster = i; 407 return; 408 } 409 410 lba = blob->active.clusters[i]; 411 lba_count = lba_per_cluster; 412 } 413 414 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 415 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 416 extent_idx++; 417 418 desc->length = sizeof(desc->extents[0]) * extent_idx; 419 *next_cluster = blob->active.num_clusters; 420 421 return; 422 } 423 424 static int 425 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 426 uint32_t *page_count) 427 { 428 struct spdk_blob_md_page *cur_page; 429 const struct spdk_xattr *xattr; 430 int rc; 431 uint8_t *buf; 432 size_t remaining_sz; 433 uint64_t last_cluster; 434 435 assert(pages != NULL); 436 assert(page_count != NULL); 437 assert(blob != NULL); 438 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 439 440 *pages = NULL; 441 *page_count = 0; 442 443 /* A blob always has at least 1 page, even if it has no descriptors */ 444 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 445 if (rc < 0) { 446 return rc; 447 } 448 449 buf = (uint8_t *)cur_page->descriptors; 450 remaining_sz = sizeof(cur_page->descriptors); 451 452 /* Serialize xattrs */ 453 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 454 size_t 
required_sz = 0; 455 rc = _spdk_blob_serialize_xattr(xattr, 456 buf, remaining_sz, 457 &required_sz); 458 if (rc < 0) { 459 /* Need to add a new page to the chain */ 460 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 461 &cur_page); 462 if (rc < 0) { 463 spdk_dma_free(*pages); 464 *pages = NULL; 465 *page_count = 0; 466 return rc; 467 } 468 469 buf = (uint8_t *)cur_page->descriptors; 470 remaining_sz = sizeof(cur_page->descriptors); 471 472 /* Try again */ 473 required_sz = 0; 474 rc = _spdk_blob_serialize_xattr(xattr, 475 buf, remaining_sz, 476 &required_sz); 477 478 if (rc < 0) { 479 spdk_dma_free(*pages); 480 *pages = NULL; 481 *page_count = 0; 482 return -1; 483 } 484 } 485 486 remaining_sz -= required_sz; 487 buf += required_sz; 488 } 489 490 /* Serialize extents */ 491 last_cluster = 0; 492 while (last_cluster < blob->active.num_clusters) { 493 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 494 buf, remaining_sz); 495 496 if (last_cluster == blob->active.num_clusters) { 497 break; 498 } 499 500 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 501 &cur_page); 502 if (rc < 0) { 503 return rc; 504 } 505 506 buf = (uint8_t *)cur_page->descriptors; 507 remaining_sz = sizeof(cur_page->descriptors); 508 } 509 510 return 0; 511 } 512 513 struct spdk_blob_load_ctx { 514 struct spdk_blob *blob; 515 516 struct spdk_blob_md_page *pages; 517 uint32_t num_pages; 518 519 spdk_bs_sequence_cpl cb_fn; 520 void *cb_arg; 521 }; 522 523 static uint32_t 524 _spdk_blob_md_page_calc_crc(void *page) 525 { 526 uint32_t crc; 527 528 crc = BLOB_CRC32C_INITIAL; 529 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 530 crc ^= BLOB_CRC32C_INITIAL; 531 532 return crc; 533 534 } 535 536 static void 537 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 538 { 539 struct spdk_blob_load_ctx *ctx = cb_arg; 540 struct spdk_blob *blob = ctx->blob; 541 struct spdk_blob_md_page *page; 542 int rc; 543 uint32_t crc; 544 545 page = 
&ctx->pages[ctx->num_pages - 1]; 546 crc = _spdk_blob_md_page_calc_crc(page); 547 if (crc != page->crc) { 548 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 549 _spdk_blob_free(blob); 550 ctx->cb_fn(seq, NULL, -EINVAL); 551 spdk_dma_free(ctx->pages); 552 free(ctx); 553 return; 554 } 555 556 if (page->next != SPDK_INVALID_MD_PAGE) { 557 uint32_t next_page = page->next; 558 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 559 560 561 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 562 563 /* Read the next page */ 564 ctx->num_pages++; 565 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 566 sizeof(*page), NULL); 567 if (ctx->pages == NULL) { 568 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 569 free(ctx); 570 return; 571 } 572 573 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 574 next_lba, 575 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 576 _spdk_blob_load_cpl, ctx); 577 return; 578 } 579 580 /* Parse the pages */ 581 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 582 583 _spdk_blob_mark_clean(blob); 584 585 ctx->cb_fn(seq, ctx->cb_arg, rc); 586 587 /* Free the memory */ 588 spdk_dma_free(ctx->pages); 589 free(ctx); 590 } 591 592 /* Load a blob from disk given a blobid */ 593 static void 594 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 595 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 596 { 597 struct spdk_blob_load_ctx *ctx; 598 struct spdk_blob_store *bs; 599 uint32_t page_num; 600 uint64_t lba; 601 602 assert(blob != NULL); 603 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 604 blob->state == SPDK_BLOB_STATE_DIRTY); 605 606 bs = blob->bs; 607 608 ctx = calloc(1, sizeof(*ctx)); 609 if (!ctx) { 610 cb_fn(seq, cb_arg, -ENOMEM); 611 return; 612 } 613 614 ctx->blob = blob; 615 ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, 616 SPDK_BS_PAGE_SIZE, NULL); 617 if (!ctx->pages) { 618 free(ctx); 619 cb_fn(seq, cb_arg, -ENOMEM); 620 
return; 621 } 622 ctx->num_pages = 1; 623 ctx->cb_fn = cb_fn; 624 ctx->cb_arg = cb_arg; 625 626 page_num = _spdk_bs_blobid_to_page(blob->id); 627 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 628 629 blob->state = SPDK_BLOB_STATE_LOADING; 630 631 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 632 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 633 _spdk_blob_load_cpl, ctx); 634 } 635 636 struct spdk_blob_persist_ctx { 637 struct spdk_blob *blob; 638 639 struct spdk_blob_md_page *pages; 640 641 uint64_t idx; 642 643 spdk_bs_sequence_cpl cb_fn; 644 void *cb_arg; 645 }; 646 647 static void 648 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 649 { 650 struct spdk_blob_persist_ctx *ctx = cb_arg; 651 struct spdk_blob *blob = ctx->blob; 652 653 if (bserrno == 0) { 654 _spdk_blob_mark_clean(blob); 655 } 656 657 /* Call user callback */ 658 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 659 660 /* Free the memory */ 661 spdk_dma_free(ctx->pages); 662 free(ctx); 663 } 664 665 static void 666 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 667 { 668 struct spdk_blob_persist_ctx *ctx = cb_arg; 669 struct spdk_blob *blob = ctx->blob; 670 struct spdk_blob_store *bs = blob->bs; 671 void *tmp; 672 size_t i; 673 674 /* Release all clusters that were truncated */ 675 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 676 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 677 678 _spdk_bs_release_cluster(bs, cluster_num); 679 } 680 681 if (blob->active.num_clusters == 0) { 682 free(blob->active.clusters); 683 blob->active.clusters = NULL; 684 blob->active.cluster_array_size = 0; 685 } else { 686 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 687 assert(tmp != NULL); 688 blob->active.clusters = tmp; 689 blob->active.cluster_array_size = blob->active.num_clusters; 690 } 691 692 _spdk_blob_persist_complete(seq, ctx, 
bserrno); 693 } 694 695 static void 696 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 697 { 698 struct spdk_blob_persist_ctx *ctx = cb_arg; 699 struct spdk_blob *blob = ctx->blob; 700 struct spdk_blob_store *bs = blob->bs; 701 spdk_bs_batch_t *batch; 702 size_t i; 703 uint64_t lba; 704 uint32_t lba_count; 705 706 /* Clusters don't move around in blobs. The list shrinks or grows 707 * at the end, but no changes ever occur in the middle of the list. 708 */ 709 710 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 711 712 /* Unmap all clusters that were truncated */ 713 lba = 0; 714 lba_count = 0; 715 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 716 uint64_t next_lba = blob->active.clusters[i]; 717 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 718 719 if ((lba + lba_count) == next_lba) { 720 /* This cluster is contiguous with the previous one. */ 721 lba_count += next_lba_count; 722 continue; 723 } 724 725 /* This cluster is not contiguous with the previous one. */ 726 727 /* If a run of LBAs previously existing, send them 728 * as an unmap. 729 */ 730 if (lba_count > 0) { 731 spdk_bs_batch_unmap(batch, lba, lba_count); 732 } 733 734 /* Start building the next batch */ 735 lba = next_lba; 736 lba_count = next_lba_count; 737 } 738 739 /* If we ended with a contiguous set of LBAs, send the unmap now */ 740 if (lba_count > 0) { 741 spdk_bs_batch_unmap(batch, lba, lba_count); 742 } 743 744 spdk_bs_batch_close(batch); 745 } 746 747 static void 748 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 749 { 750 struct spdk_blob_persist_ctx *ctx = cb_arg; 751 struct spdk_blob *blob = ctx->blob; 752 struct spdk_blob_store *bs = blob->bs; 753 size_t i; 754 755 /* This loop starts at 1 because the first page is special and handled 756 * below. 
The pages (except the first) are never written in place, 757 * so any pages in the clean list must be unmapped. 758 */ 759 for (i = 1; i < blob->clean.num_pages; i++) { 760 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 761 } 762 763 if (blob->active.num_pages == 0) { 764 uint32_t page_num; 765 766 page_num = _spdk_bs_blobid_to_page(blob->id); 767 spdk_bit_array_clear(bs->used_md_pages, page_num); 768 } 769 770 /* Move on to unmapping clusters */ 771 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 772 } 773 774 static void 775 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 776 { 777 struct spdk_blob_persist_ctx *ctx = cb_arg; 778 struct spdk_blob *blob = ctx->blob; 779 struct spdk_blob_store *bs = blob->bs; 780 uint64_t lba; 781 uint32_t lba_count; 782 spdk_bs_batch_t *batch; 783 size_t i; 784 785 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx); 786 787 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 788 789 /* This loop starts at 1 because the first page is special and handled 790 * below. The pages (except the first) are never written in place, 791 * so any pages in the clean list must be unmapped. 792 */ 793 for (i = 1; i < blob->clean.num_pages; i++) { 794 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 795 796 spdk_bs_batch_unmap(batch, lba, lba_count); 797 } 798 799 /* The first page will only be unmapped if this is a delete. 
*/ 800 if (blob->active.num_pages == 0) { 801 uint32_t page_num; 802 803 /* The first page in the metadata goes where the blobid indicates */ 804 page_num = _spdk_bs_blobid_to_page(blob->id); 805 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 806 807 spdk_bs_batch_unmap(batch, lba, lba_count); 808 } 809 810 spdk_bs_batch_close(batch); 811 } 812 813 static void 814 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 815 { 816 struct spdk_blob_persist_ctx *ctx = cb_arg; 817 struct spdk_blob *blob = ctx->blob; 818 struct spdk_blob_store *bs = blob->bs; 819 uint64_t lba; 820 uint32_t lba_count; 821 struct spdk_blob_md_page *page; 822 823 if (blob->active.num_pages == 0) { 824 /* Move on to the next step */ 825 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 826 return; 827 } 828 829 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 830 831 page = &ctx->pages[0]; 832 /* The first page in the metadata goes where the blobid indicates */ 833 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 834 835 spdk_bs_sequence_write(seq, page, lba, lba_count, 836 _spdk_blob_persist_unmap_pages, ctx); 837 } 838 839 static void 840 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 841 { 842 struct spdk_blob_persist_ctx *ctx = cb_arg; 843 struct spdk_blob *blob = ctx->blob; 844 struct spdk_blob_store *bs = blob->bs; 845 uint64_t lba; 846 uint32_t lba_count; 847 struct spdk_blob_md_page *page; 848 spdk_bs_batch_t *batch; 849 size_t i; 850 851 /* Clusters don't move around in blobs. The list shrinks or grows 852 * at the end, but no changes ever occur in the middle of the list. 853 */ 854 855 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 856 857 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 858 859 /* This starts at 1. 
The root page is not written until 860 * all of the others are finished 861 */ 862 for (i = 1; i < blob->active.num_pages; i++) { 863 page = &ctx->pages[i]; 864 assert(page->sequence_num == i); 865 866 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 867 868 spdk_bs_batch_write(batch, page, lba, lba_count); 869 } 870 871 spdk_bs_batch_close(batch); 872 } 873 874 static int 875 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 876 { 877 uint64_t i; 878 uint64_t *tmp; 879 uint64_t lfc; /* lowest free cluster */ 880 struct spdk_blob_store *bs; 881 882 bs = blob->bs; 883 884 assert(blob->state != SPDK_BLOB_STATE_LOADING && 885 blob->state != SPDK_BLOB_STATE_SYNCING); 886 887 if (blob->active.num_clusters == sz) { 888 return 0; 889 } 890 891 if (blob->active.num_clusters < blob->active.cluster_array_size) { 892 /* If this blob was resized to be larger, then smaller, then 893 * larger without syncing, then the cluster array already 894 * contains spare assigned clusters we can use. 895 */ 896 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 897 sz); 898 } 899 900 blob->state = SPDK_BLOB_STATE_DIRTY; 901 902 /* Do two passes - one to verify that we can obtain enough clusters 903 * and another to actually claim them. 904 */ 905 906 lfc = 0; 907 for (i = blob->active.num_clusters; i < sz; i++) { 908 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 909 if (lfc >= bs->total_clusters) { 910 /* No more free clusters. Cannot satisfy the request */ 911 assert(false); 912 return -1; 913 } 914 lfc++; 915 } 916 917 if (sz > blob->active.num_clusters) { 918 /* Expand the cluster array if necessary. 919 * We only shrink the array when persisting. 
920 */ 921 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 922 if (sz > 0 && tmp == NULL) { 923 assert(false); 924 return -1; 925 } 926 blob->active.clusters = tmp; 927 blob->active.cluster_array_size = sz; 928 } 929 930 lfc = 0; 931 for (i = blob->active.num_clusters; i < sz; i++) { 932 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 933 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 934 _spdk_bs_claim_cluster(bs, lfc); 935 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 936 lfc++; 937 } 938 939 blob->active.num_clusters = sz; 940 941 return 0; 942 } 943 944 /* Write a blob to disk */ 945 static void 946 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 947 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 948 { 949 struct spdk_blob_persist_ctx *ctx; 950 int rc; 951 uint64_t i; 952 uint32_t page_num; 953 struct spdk_blob_store *bs; 954 955 assert(blob != NULL); 956 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 957 blob->state == SPDK_BLOB_STATE_DIRTY); 958 959 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 960 cb_fn(seq, cb_arg, 0); 961 return; 962 } 963 964 bs = blob->bs; 965 966 ctx = calloc(1, sizeof(*ctx)); 967 if (!ctx) { 968 cb_fn(seq, cb_arg, -ENOMEM); 969 return; 970 } 971 ctx->blob = blob; 972 ctx->cb_fn = cb_fn; 973 ctx->cb_arg = cb_arg; 974 975 blob->state = SPDK_BLOB_STATE_SYNCING; 976 977 if (blob->active.num_pages == 0) { 978 /* This is the signal that the blob should be deleted. 979 * Immediately jump to the clean up routine. 
*/ 980 assert(blob->clean.num_pages > 0); 981 ctx->idx = blob->clean.num_pages - 1; 982 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 983 return; 984 985 } 986 987 /* Generate the new metadata */ 988 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 989 if (rc < 0) { 990 free(ctx); 991 cb_fn(seq, cb_arg, rc); 992 return; 993 } 994 995 assert(blob->active.num_pages >= 1); 996 997 /* Resize the cache of page indices */ 998 blob->active.pages = realloc(blob->active.pages, 999 blob->active.num_pages * sizeof(*blob->active.pages)); 1000 if (!blob->active.pages) { 1001 free(ctx); 1002 cb_fn(seq, cb_arg, -ENOMEM); 1003 return; 1004 } 1005 1006 /* Assign this metadata to pages. This requires two passes - 1007 * one to verify that there are enough pages and a second 1008 * to actually claim them. */ 1009 page_num = 0; 1010 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1011 for (i = 1; i < blob->active.num_pages; i++) { 1012 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1013 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 1014 spdk_dma_free(ctx->pages); 1015 free(ctx); 1016 blob->state = SPDK_BLOB_STATE_DIRTY; 1017 cb_fn(seq, cb_arg, -ENOMEM); 1018 return; 1019 } 1020 page_num++; 1021 } 1022 1023 page_num = 0; 1024 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1025 for (i = 1; i < blob->active.num_pages; i++) { 1026 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1027 ctx->pages[i - 1].next = page_num; 1028 /* Now that previous metadata page is complete, calculate the crc for it. 
*/ 1029 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1030 blob->active.pages[i] = page_num; 1031 spdk_bit_array_set(bs->used_md_pages, page_num); 1032 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1033 page_num++; 1034 } 1035 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1036 /* Start writing the metadata from last page to first */ 1037 ctx->idx = blob->active.num_pages - 1; 1038 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1039 } 1040 1041 static void 1042 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1043 void *payload, uint64_t offset, uint64_t length, 1044 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1045 { 1046 spdk_bs_batch_t *batch; 1047 struct spdk_bs_cpl cpl; 1048 uint64_t lba; 1049 uint32_t lba_count; 1050 uint8_t *buf; 1051 uint64_t page; 1052 1053 assert(blob != NULL); 1054 1055 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1056 cb_fn(cb_arg, -EINVAL); 1057 return; 1058 } 1059 1060 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1061 cpl.u.blob_basic.cb_fn = cb_fn; 1062 cpl.u.blob_basic.cb_arg = cb_arg; 1063 1064 batch = spdk_bs_batch_open(_channel, &cpl); 1065 if (!batch) { 1066 cb_fn(cb_arg, -ENOMEM); 1067 return; 1068 } 1069 1070 length = _spdk_bs_page_to_lba(blob->bs, length); 1071 page = offset; 1072 buf = payload; 1073 while (length > 0) { 1074 lba = _spdk_bs_blob_page_to_lba(blob, page); 1075 lba_count = spdk_min(length, 1076 _spdk_bs_page_to_lba(blob->bs, 1077 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1078 1079 if (read) { 1080 spdk_bs_batch_read(batch, buf, lba, lba_count); 1081 } else { 1082 spdk_bs_batch_write(batch, buf, lba, lba_count); 1083 } 1084 1085 length -= lba_count; 1086 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1087 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1088 } 1089 1090 spdk_bs_batch_close(batch); 1091 } 1092 1093 struct 
rw_iov_ctx { 1094 struct spdk_blob *blob; 1095 bool read; 1096 int iovcnt; 1097 struct iovec *orig_iov; 1098 uint64_t page_offset; 1099 uint64_t pages_remaining; 1100 uint64_t pages_done; 1101 struct iovec iov[0]; 1102 }; 1103 1104 static void 1105 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1106 { 1107 assert(cb_arg == NULL); 1108 spdk_bs_sequence_finish(seq, bserrno); 1109 } 1110 1111 static void 1112 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1113 { 1114 struct rw_iov_ctx *ctx = cb_arg; 1115 struct iovec *iov, *orig_iov; 1116 int iovcnt; 1117 size_t orig_iovoff; 1118 uint64_t lba; 1119 uint64_t page_count, pages_to_boundary; 1120 uint32_t lba_count; 1121 uint64_t byte_count; 1122 1123 if (bserrno != 0 || ctx->pages_remaining == 0) { 1124 free(ctx); 1125 spdk_bs_sequence_finish(seq, bserrno); 1126 return; 1127 } 1128 1129 pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset); 1130 page_count = spdk_min(ctx->pages_remaining, pages_to_boundary); 1131 lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset); 1132 lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count); 1133 1134 /* 1135 * Get index and offset into the original iov array for our current position in the I/O sequence. 1136 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 1137 * point to the current position in the I/O sequence. 1138 */ 1139 byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page); 1140 orig_iov = &ctx->orig_iov[0]; 1141 orig_iovoff = 0; 1142 while (byte_count > 0) { 1143 if (byte_count >= orig_iov->iov_len) { 1144 byte_count -= orig_iov->iov_len; 1145 orig_iov++; 1146 } else { 1147 orig_iovoff = byte_count; 1148 byte_count = 0; 1149 } 1150 } 1151 1152 /* 1153 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 1154 * bytes of this next I/O remain to be accounted for in the new iov array. 
1155 */ 1156 byte_count = page_count * sizeof(struct spdk_blob_md_page); 1157 iov = &ctx->iov[0]; 1158 iovcnt = 0; 1159 while (byte_count > 0) { 1160 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 1161 iov->iov_base = orig_iov->iov_base + orig_iovoff; 1162 byte_count -= iov->iov_len; 1163 orig_iovoff = 0; 1164 orig_iov++; 1165 iov++; 1166 iovcnt++; 1167 } 1168 1169 ctx->page_offset += page_count; 1170 ctx->pages_done += page_count; 1171 ctx->pages_remaining -= page_count; 1172 iov = &ctx->iov[0]; 1173 1174 if (ctx->read) { 1175 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1176 } else { 1177 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1178 } 1179 } 1180 1181 static void 1182 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1183 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1184 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1185 { 1186 spdk_bs_sequence_t *seq; 1187 struct spdk_bs_cpl cpl; 1188 1189 assert(blob != NULL); 1190 1191 if (length == 0) { 1192 cb_fn(cb_arg, 0); 1193 return; 1194 } 1195 1196 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1197 cb_fn(cb_arg, -EINVAL); 1198 return; 1199 } 1200 1201 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1202 cpl.u.blob_basic.cb_fn = cb_fn; 1203 cpl.u.blob_basic.cb_arg = cb_arg; 1204 1205 /* 1206 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 1207 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 1208 * there will be no noticeable difference compared to using a batch. 
For I/O that do span a cluster 1209 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 1210 * to allocate a separate iov array and split the I/O such that none of the resulting 1211 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 1212 * but since this case happens very infrequently, any performance impact will be negligible. 1213 * 1214 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 1215 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 1216 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 1217 * when the batch was completed, to allow for freeing the memory for the iov arrays. 1218 */ 1219 seq = spdk_bs_sequence_start(_channel, &cpl); 1220 if (!seq) { 1221 cb_fn(cb_arg, -ENOMEM); 1222 return; 1223 } 1224 1225 if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) { 1226 uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset); 1227 uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length); 1228 1229 if (read) { 1230 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1231 } else { 1232 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1233 } 1234 } else { 1235 struct rw_iov_ctx *ctx; 1236 1237 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 1238 if (ctx == NULL) { 1239 spdk_bs_sequence_finish(seq, -ENOMEM); 1240 return; 1241 } 1242 1243 ctx->blob = blob; 1244 ctx->read = read; 1245 ctx->orig_iov = iov; 1246 ctx->iovcnt = iovcnt; 1247 ctx->page_offset = offset; 1248 ctx->pages_remaining = length; 1249 ctx->pages_done = 0; 1250 1251 _spdk_rw_iov_split_next(seq, ctx, 0); 1252 } 1253 } 1254 1255 static struct spdk_blob * 1256 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 
1257 { 1258 struct spdk_blob *blob; 1259 1260 TAILQ_FOREACH(blob, &bs->blobs, link) { 1261 if (blob->id == blobid) { 1262 return blob; 1263 } 1264 } 1265 1266 return NULL; 1267 } 1268 1269 static int 1270 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1271 uint32_t max_ops) 1272 { 1273 struct spdk_bs_dev *dev; 1274 uint32_t i; 1275 1276 dev = bs->dev; 1277 1278 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1279 if (!channel->req_mem) { 1280 return -1; 1281 } 1282 1283 TAILQ_INIT(&channel->reqs); 1284 1285 for (i = 0; i < max_ops; i++) { 1286 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1287 } 1288 1289 channel->bs = bs; 1290 channel->dev = dev; 1291 channel->dev_channel = dev->create_channel(dev); 1292 1293 return 0; 1294 } 1295 1296 static int 1297 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1298 { 1299 struct spdk_blob_store *bs; 1300 struct spdk_bs_channel *channel = ctx_buf; 1301 1302 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1303 1304 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1305 } 1306 1307 static int 1308 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1309 { 1310 struct spdk_blob_store *bs; 1311 struct spdk_bs_channel *channel = ctx_buf; 1312 1313 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1314 1315 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1316 } 1317 1318 1319 static void 1320 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1321 { 1322 struct spdk_bs_channel *channel = ctx_buf; 1323 1324 free(channel->req_mem); 1325 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1326 } 1327 1328 static void 1329 _spdk_bs_dev_destroy(void *io_device) 1330 { 1331 struct spdk_blob_store *bs; 1332 struct spdk_blob *blob, *blob_tmp; 1333 1334 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1335 
bs->dev->destroy(bs->dev); 1336 1337 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1338 TAILQ_REMOVE(&bs->blobs, blob, link); 1339 _spdk_blob_free(blob); 1340 } 1341 1342 spdk_bit_array_free(&bs->used_md_pages); 1343 spdk_bit_array_free(&bs->used_clusters); 1344 /* 1345 * If this function is called for any reason except a successful unload, 1346 * the unload_cpl type will be NONE and this will be a nop. 1347 */ 1348 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 1349 1350 free(bs); 1351 } 1352 1353 static void 1354 _spdk_bs_free(struct spdk_blob_store *bs) 1355 { 1356 spdk_bs_unregister_md_thread(bs); 1357 spdk_io_device_unregister(&bs->io_target, NULL); 1358 spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy); 1359 } 1360 1361 void 1362 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1363 { 1364 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1365 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1366 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1367 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1368 } 1369 1370 static int 1371 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 1372 { 1373 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 1374 opts->max_channel_ops == 0) { 1375 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 1376 return -1; 1377 } 1378 1379 return 0; 1380 } 1381 1382 static struct spdk_blob_store * 1383 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1384 { 1385 struct spdk_blob_store *bs; 1386 uint64_t dev_size; 1387 1388 dev_size = dev->blocklen * dev->blockcnt; 1389 if (dev_size < opts->cluster_sz) { 1390 /* Device size cannot be smaller than cluster size of blobstore */ 1391 SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size, 1392 opts->cluster_sz); 1393 return NULL; 1394 } 1395 bs = calloc(1, sizeof(struct spdk_blob_store)); 1396 if (!bs) { 1397 return NULL; 1398 } 1399 1400 TAILQ_INIT(&bs->blobs); 1401 bs->dev = dev; 1402 1403 
/* 1404 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1405 * even multiple of the cluster size. 1406 */ 1407 bs->cluster_sz = opts->cluster_sz; 1408 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1409 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1410 bs->num_free_clusters = bs->total_clusters; 1411 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1412 if (bs->used_clusters == NULL) { 1413 free(bs); 1414 return NULL; 1415 } 1416 1417 bs->md_target.max_md_ops = opts->max_md_ops; 1418 bs->io_target.max_channel_ops = opts->max_channel_ops; 1419 bs->super_blob = SPDK_BLOBID_INVALID; 1420 1421 /* The metadata is assumed to be at least 1 page */ 1422 bs->used_md_pages = spdk_bit_array_create(1); 1423 1424 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1425 sizeof(struct spdk_bs_channel)); 1426 spdk_bs_register_md_thread(bs); 1427 1428 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1429 sizeof(struct spdk_bs_channel)); 1430 1431 return bs; 1432 } 1433 1434 /* START spdk_bs_load */ 1435 1436 struct spdk_bs_load_ctx { 1437 struct spdk_blob_store *bs; 1438 struct spdk_bs_super_block *super; 1439 1440 struct spdk_bs_md_mask *mask; 1441 }; 1442 1443 static void 1444 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1445 { 1446 struct spdk_bs_load_ctx *ctx = cb_arg; 1447 uint32_t i, j; 1448 int rc; 1449 1450 /* The type must be correct */ 1451 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1452 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1453 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1454 struct spdk_blob_md_page) * 8)); 1455 /* The length of the mask must be exactly equal to the total number of clusters */ 1456 assert(ctx->mask->length == ctx->bs->total_clusters); 
1457 1458 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1459 if (rc < 0) { 1460 spdk_dma_free(ctx->super); 1461 spdk_dma_free(ctx->mask); 1462 _spdk_bs_free(ctx->bs); 1463 free(ctx); 1464 spdk_bs_sequence_finish(seq, -ENOMEM); 1465 return; 1466 } 1467 1468 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1469 for (i = 0; i < ctx->mask->length / 8; i++) { 1470 uint8_t segment = ctx->mask->mask[i]; 1471 for (j = 0; segment && (j < 8); j++) { 1472 if (segment & 1U) { 1473 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1474 assert(ctx->bs->num_free_clusters > 0); 1475 ctx->bs->num_free_clusters--; 1476 } 1477 segment >>= 1U; 1478 } 1479 } 1480 1481 spdk_dma_free(ctx->super); 1482 spdk_dma_free(ctx->mask); 1483 free(ctx); 1484 1485 spdk_bs_sequence_finish(seq, bserrno); 1486 } 1487 1488 static void 1489 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1490 { 1491 struct spdk_bs_load_ctx *ctx = cb_arg; 1492 uint64_t lba, lba_count, mask_size; 1493 uint32_t i, j; 1494 int rc; 1495 1496 /* The type must be correct */ 1497 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1498 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1499 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 1500 8)); 1501 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1502 assert(ctx->mask->length == ctx->super->md_len); 1503 1504 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1505 if (rc < 0) { 1506 spdk_dma_free(ctx->super); 1507 spdk_dma_free(ctx->mask); 1508 _spdk_bs_free(ctx->bs); 1509 free(ctx); 1510 spdk_bs_sequence_finish(seq, -ENOMEM); 1511 return; 1512 } 1513 1514 for (i = 0; i < ctx->mask->length / 8; i++) { 1515 uint8_t segment = ctx->mask->mask[i]; 1516 for (j = 0; segment && (j < 8); j++) { 1517 if (segment & 1U) { 1518 
spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1519 } 1520 segment >>= 1U; 1521 } 1522 } 1523 spdk_dma_free(ctx->mask); 1524 1525 /* Read the used clusters mask */ 1526 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1527 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1528 if (!ctx->mask) { 1529 spdk_dma_free(ctx->super); 1530 _spdk_bs_free(ctx->bs); 1531 free(ctx); 1532 spdk_bs_sequence_finish(seq, -ENOMEM); 1533 return; 1534 } 1535 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1536 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1537 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1538 _spdk_bs_load_used_clusters_cpl, ctx); 1539 } 1540 1541 static void 1542 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1543 { 1544 struct spdk_bs_load_ctx *ctx = cb_arg; 1545 uint64_t lba, lba_count, mask_size; 1546 1547 /* Parse the super block */ 1548 ctx->bs->cluster_sz = ctx->super->cluster_size; 1549 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1550 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1551 ctx->bs->md_start = ctx->super->md_start; 1552 ctx->bs->md_len = ctx->super->md_len; 1553 ctx->bs->super_blob = ctx->super->super_blob; 1554 1555 /* Read the used pages mask */ 1556 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1557 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1558 if (!ctx->mask) { 1559 spdk_dma_free(ctx->super); 1560 _spdk_bs_free(ctx->bs); 1561 free(ctx); 1562 spdk_bs_sequence_finish(seq, -ENOMEM); 1563 return; 1564 } 1565 1566 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1567 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1568 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1569 _spdk_bs_load_used_pages_cpl, ctx); 1570 } 1571 1572 static void 1573 
_spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
{
	/*
	 * Completion of the super block read.  The block must carry the expected
	 * version, signature and CRC, checked in that order; any mismatch ends
	 * the load with -EILSEQ.
	 */
	struct spdk_bs_load_ctx *ctx = cb_arg;
	uint32_t		crc;
	bool			valid;

	valid = ctx->super->version == SPDK_BS_VERSION &&
		memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG,
		       sizeof(ctx->super->signature)) == 0;
	if (valid) {
		/* The CRC is only computed once version and signature look right */
		crc = _spdk_blob_md_page_calc_crc(ctx->super);
		valid = (crc == ctx->super->crc);
	}

	if (!valid) {
		spdk_dma_free(ctx->super);
		_spdk_bs_free(ctx->bs);
		free(ctx);
		spdk_bs_sequence_finish(seq, -EILSEQ);
		return;
	}

	if (ctx->super->clean != 1) {
		/* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED.
		 * All of the necessary data to recover is available
		 * on disk - the code just has not been written yet.
1608 */ 1609 assert(false); 1610 spdk_dma_free(ctx->super); 1611 _spdk_bs_free(ctx->bs); 1612 free(ctx); 1613 spdk_bs_sequence_finish(seq, -EILSEQ); 1614 return; 1615 } 1616 1617 ctx->super->clean = 0; 1618 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 1619 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1620 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1621 _spdk_bs_load_write_super_cpl, ctx); 1622 } 1623 1624 void 1625 spdk_bs_load(struct spdk_bs_dev *dev, 1626 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1627 { 1628 struct spdk_blob_store *bs; 1629 struct spdk_bs_cpl cpl; 1630 spdk_bs_sequence_t *seq; 1631 struct spdk_bs_load_ctx *ctx; 1632 struct spdk_bs_opts opts = {}; 1633 1634 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1635 1636 spdk_bs_opts_init(&opts); 1637 1638 bs = _spdk_bs_alloc(dev, &opts); 1639 if (!bs) { 1640 cb_fn(cb_arg, NULL, -ENOMEM); 1641 return; 1642 } 1643 1644 ctx = calloc(1, sizeof(*ctx)); 1645 if (!ctx) { 1646 _spdk_bs_free(bs); 1647 cb_fn(cb_arg, NULL, -ENOMEM); 1648 return; 1649 } 1650 1651 ctx->bs = bs; 1652 1653 /* Allocate memory for the super block */ 1654 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1655 if (!ctx->super) { 1656 free(ctx); 1657 _spdk_bs_free(bs); 1658 return; 1659 } 1660 1661 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1662 cpl.u.bs_handle.cb_fn = cb_fn; 1663 cpl.u.bs_handle.cb_arg = cb_arg; 1664 cpl.u.bs_handle.bs = bs; 1665 1666 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1667 if (!seq) { 1668 spdk_dma_free(ctx->super); 1669 free(ctx); 1670 _spdk_bs_free(bs); 1671 cb_fn(cb_arg, NULL, -ENOMEM); 1672 return; 1673 } 1674 1675 /* Read the super block */ 1676 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1677 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1678 _spdk_bs_load_super_cpl, ctx); 1679 } 1680 1681 /* END spdk_bs_load */ 1682 1683 /* START spdk_bs_init */ 1684 1685 struct 
spdk_bs_init_ctx { 1686 struct spdk_blob_store *bs; 1687 struct spdk_bs_super_block *super; 1688 }; 1689 1690 static void 1691 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1692 { 1693 struct spdk_bs_init_ctx *ctx = cb_arg; 1694 1695 spdk_dma_free(ctx->super); 1696 free(ctx); 1697 1698 spdk_bs_sequence_finish(seq, bserrno); 1699 } 1700 1701 static void 1702 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1703 { 1704 struct spdk_bs_init_ctx *ctx = cb_arg; 1705 1706 /* Write super block */ 1707 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1708 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1709 _spdk_bs_init_persist_super_cpl, ctx); 1710 } 1711 1712 void 1713 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1714 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1715 { 1716 struct spdk_bs_init_ctx *ctx; 1717 struct spdk_blob_store *bs; 1718 struct spdk_bs_cpl cpl; 1719 spdk_bs_sequence_t *seq; 1720 uint64_t num_md_pages; 1721 uint64_t num_md_clusters; 1722 uint32_t i; 1723 struct spdk_bs_opts opts = {}; 1724 int rc; 1725 1726 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 1727 1728 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 1729 SPDK_ERRLOG("unsupported dev block length of %d\n", 1730 dev->blocklen); 1731 dev->destroy(dev); 1732 cb_fn(cb_arg, NULL, -EINVAL); 1733 return; 1734 } 1735 1736 if (o) { 1737 opts = *o; 1738 } else { 1739 spdk_bs_opts_init(&opts); 1740 } 1741 1742 if (_spdk_bs_opts_verify(&opts) != 0) { 1743 dev->destroy(dev); 1744 cb_fn(cb_arg, NULL, -EINVAL); 1745 return; 1746 } 1747 1748 bs = _spdk_bs_alloc(dev, &opts); 1749 if (!bs) { 1750 dev->destroy(dev); 1751 cb_fn(cb_arg, NULL, -ENOMEM); 1752 return; 1753 } 1754 1755 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 1756 /* By default, allocate 1 page per cluster. 
1757 * Technically, this over-allocates metadata 1758 * because more metadata will reduce the number 1759 * of usable clusters. This can be addressed with 1760 * more complex math in the future. 1761 */ 1762 bs->md_len = bs->total_clusters; 1763 } else { 1764 bs->md_len = opts.num_md_pages; 1765 } 1766 1767 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 1768 if (rc < 0) { 1769 _spdk_bs_free(bs); 1770 cb_fn(cb_arg, NULL, -ENOMEM); 1771 return; 1772 } 1773 1774 ctx = calloc(1, sizeof(*ctx)); 1775 if (!ctx) { 1776 _spdk_bs_free(bs); 1777 cb_fn(cb_arg, NULL, -ENOMEM); 1778 return; 1779 } 1780 1781 ctx->bs = bs; 1782 1783 /* Allocate memory for the super block */ 1784 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1785 if (!ctx->super) { 1786 free(ctx); 1787 _spdk_bs_free(bs); 1788 return; 1789 } 1790 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1791 sizeof(ctx->super->signature)); 1792 ctx->super->version = SPDK_BS_VERSION; 1793 ctx->super->length = sizeof(*ctx->super); 1794 ctx->super->super_blob = bs->super_blob; 1795 ctx->super->clean = 0; 1796 ctx->super->cluster_size = bs->cluster_sz; 1797 1798 /* Calculate how many pages the metadata consumes at the front 1799 * of the disk. 1800 */ 1801 1802 /* The super block uses 1 page */ 1803 num_md_pages = 1; 1804 1805 /* The used_md_pages mask requires 1 bit per metadata page, rounded 1806 * up to the nearest page, plus a header. 1807 */ 1808 ctx->super->used_page_mask_start = num_md_pages; 1809 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1810 divide_round_up(bs->md_len, 8), 1811 SPDK_BS_PAGE_SIZE); 1812 num_md_pages += ctx->super->used_page_mask_len; 1813 1814 /* The used_clusters mask requires 1 bit per cluster, rounded 1815 * up to the nearest page, plus a header. 
1816 */ 1817 ctx->super->used_cluster_mask_start = num_md_pages; 1818 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1819 divide_round_up(bs->total_clusters, 8), 1820 SPDK_BS_PAGE_SIZE); 1821 num_md_pages += ctx->super->used_cluster_mask_len; 1822 1823 /* The metadata region size was chosen above */ 1824 ctx->super->md_start = bs->md_start = num_md_pages; 1825 ctx->super->md_len = bs->md_len; 1826 num_md_pages += bs->md_len; 1827 1828 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 1829 1830 num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster); 1831 if (num_md_clusters > bs->total_clusters) { 1832 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 1833 "please decrease number of pages reserved for metadata " 1834 "or increase cluster size.\n"); 1835 spdk_dma_free(ctx->super); 1836 free(ctx); 1837 _spdk_bs_free(bs); 1838 cb_fn(cb_arg, NULL, -ENOMEM); 1839 return; 1840 } 1841 /* Claim all of the clusters used by the metadata */ 1842 for (i = 0; i < num_md_clusters; i++) { 1843 _spdk_bs_claim_cluster(bs, i); 1844 } 1845 1846 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1847 cpl.u.bs_handle.cb_fn = cb_fn; 1848 cpl.u.bs_handle.cb_arg = cb_arg; 1849 cpl.u.bs_handle.bs = bs; 1850 1851 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1852 if (!seq) { 1853 spdk_dma_free(ctx->super); 1854 free(ctx); 1855 _spdk_bs_free(bs); 1856 cb_fn(cb_arg, NULL, -ENOMEM); 1857 return; 1858 } 1859 1860 /* TRIM the entire device */ 1861 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx); 1862 } 1863 1864 /* END spdk_bs_init */ 1865 1866 /* START spdk_bs_unload */ 1867 1868 struct spdk_bs_unload_ctx { 1869 struct spdk_blob_store *bs; 1870 struct spdk_bs_super_block *super; 1871 1872 struct spdk_bs_md_mask *mask; 1873 }; 1874 1875 static void 1876 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1877 { 1878 struct spdk_bs_unload_ctx 
*ctx = cb_arg; 1879 1880 spdk_dma_free(ctx->super); 1881 1882 /* 1883 * We need to defer calling spdk_bs_call_cpl() until after 1884 * dev destuction, so tuck these away for later use. 1885 */ 1886 ctx->bs->unload_err = bserrno; 1887 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 1888 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 1889 1890 spdk_bs_sequence_finish(seq, bserrno); 1891 1892 _spdk_bs_free(ctx->bs); 1893 free(ctx); 1894 } 1895 1896 static void 1897 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1898 { 1899 struct spdk_bs_unload_ctx *ctx = cb_arg; 1900 1901 spdk_dma_free(ctx->mask); 1902 1903 /* Update the values in the super block */ 1904 ctx->super->super_blob = ctx->bs->super_blob; 1905 ctx->super->clean = 1; 1906 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 1907 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1908 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1909 _spdk_bs_unload_write_super_cpl, ctx); 1910 } 1911 1912 static void 1913 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1914 { 1915 struct spdk_bs_unload_ctx *ctx = cb_arg; 1916 uint32_t i; 1917 uint64_t lba, lba_count, mask_size; 1918 1919 spdk_dma_free(ctx->mask); 1920 1921 /* Write out the used clusters mask */ 1922 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1923 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1924 if (!ctx->mask) { 1925 spdk_dma_free(ctx->super); 1926 free(ctx); 1927 spdk_bs_sequence_finish(seq, -ENOMEM); 1928 return; 1929 } 1930 1931 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1932 ctx->mask->length = ctx->bs->total_clusters; 1933 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1934 1935 i = 0; 1936 while (true) { 1937 i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i); 1938 if (i > ctx->mask->length) { 1939 break; 1940 } 1941 ctx->mask->mask[i / 
8] |= 1U << (i % 8); 1942 i++; 1943 } 1944 1945 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1946 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1947 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1948 _spdk_bs_unload_write_used_clusters_cpl, ctx); 1949 } 1950 1951 static void 1952 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1953 { 1954 struct spdk_bs_unload_ctx *ctx = cb_arg; 1955 uint32_t i; 1956 uint64_t lba, lba_count, mask_size; 1957 1958 /* Write out the used page mask */ 1959 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1960 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1961 if (!ctx->mask) { 1962 spdk_dma_free(ctx->super); 1963 free(ctx); 1964 spdk_bs_sequence_finish(seq, -ENOMEM); 1965 return; 1966 } 1967 1968 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1969 ctx->mask->length = ctx->super->md_len; 1970 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1971 1972 i = 0; 1973 while (true) { 1974 i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i); 1975 if (i > ctx->mask->length) { 1976 break; 1977 } 1978 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1979 i++; 1980 } 1981 1982 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1983 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1984 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1985 _spdk_bs_unload_write_used_pages_cpl, ctx); 1986 } 1987 1988 void 1989 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 1990 { 1991 struct spdk_bs_cpl cpl; 1992 spdk_bs_sequence_t *seq; 1993 struct spdk_bs_unload_ctx *ctx; 1994 1995 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 1996 1997 ctx = calloc(1, sizeof(*ctx)); 1998 if (!ctx) { 1999 cb_fn(cb_arg, -ENOMEM); 2000 return; 2001 } 2002 2003 ctx->bs = bs; 2004 2005 ctx->super = 
spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2006 if (!ctx->super) { 2007 free(ctx); 2008 cb_fn(cb_arg, -ENOMEM); 2009 return; 2010 } 2011 2012 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2013 cpl.u.bs_basic.cb_fn = cb_fn; 2014 cpl.u.bs_basic.cb_arg = cb_arg; 2015 2016 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2017 if (!seq) { 2018 spdk_dma_free(ctx->super); 2019 free(ctx); 2020 cb_fn(cb_arg, -ENOMEM); 2021 return; 2022 } 2023 2024 assert(TAILQ_EMPTY(&bs->blobs)); 2025 2026 /* Read super block */ 2027 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2028 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2029 _spdk_bs_unload_read_super_cpl, ctx); 2030 } 2031 2032 /* END spdk_bs_unload */ 2033 2034 void 2035 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 2036 spdk_bs_op_complete cb_fn, void *cb_arg) 2037 { 2038 bs->super_blob = blobid; 2039 cb_fn(cb_arg, 0); 2040 } 2041 2042 void 2043 spdk_bs_get_super(struct spdk_blob_store *bs, 2044 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2045 { 2046 if (bs->super_blob == SPDK_BLOBID_INVALID) { 2047 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 2048 } else { 2049 cb_fn(cb_arg, bs->super_blob, 0); 2050 } 2051 } 2052 2053 uint64_t 2054 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 2055 { 2056 return bs->cluster_sz; 2057 } 2058 2059 uint64_t 2060 spdk_bs_get_page_size(struct spdk_blob_store *bs) 2061 { 2062 return SPDK_BS_PAGE_SIZE; 2063 } 2064 2065 uint64_t 2066 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 2067 { 2068 return bs->num_free_clusters; 2069 } 2070 2071 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 2072 { 2073 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target); 2074 2075 return 0; 2076 } 2077 2078 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 2079 { 2080 spdk_put_io_channel(bs->md_target.md_channel); 2081 2082 return 0; 2083 } 2084 2085 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 2086 { 
2087 assert(blob != NULL); 2088 2089 return blob->id; 2090 } 2091 2092 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 2093 { 2094 assert(blob != NULL); 2095 2096 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 2097 } 2098 2099 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 2100 { 2101 assert(blob != NULL); 2102 2103 return blob->active.num_clusters; 2104 } 2105 2106 /* START spdk_bs_md_create_blob */ 2107 2108 static void 2109 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2110 { 2111 struct spdk_blob *blob = cb_arg; 2112 2113 _spdk_blob_free(blob); 2114 2115 spdk_bs_sequence_finish(seq, bserrno); 2116 } 2117 2118 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 2119 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2120 { 2121 struct spdk_blob *blob; 2122 uint32_t page_idx; 2123 struct spdk_bs_cpl cpl; 2124 spdk_bs_sequence_t *seq; 2125 spdk_blob_id id; 2126 2127 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 2128 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 2129 cb_fn(cb_arg, 0, -ENOMEM); 2130 return; 2131 } 2132 spdk_bit_array_set(bs->used_md_pages, page_idx); 2133 2134 /* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper 2135 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the 2136 * code assumes blob id == page_idx. 
2137 */ 2138 id = (1ULL << 32) | page_idx; 2139 2140 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 2141 2142 blob = _spdk_blob_alloc(bs, id); 2143 if (!blob) { 2144 cb_fn(cb_arg, 0, -ENOMEM); 2145 return; 2146 } 2147 2148 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 2149 cpl.u.blobid.cb_fn = cb_fn; 2150 cpl.u.blobid.cb_arg = cb_arg; 2151 cpl.u.blobid.blobid = blob->id; 2152 2153 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2154 if (!seq) { 2155 _spdk_blob_free(blob); 2156 cb_fn(cb_arg, 0, -ENOMEM); 2157 return; 2158 } 2159 2160 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 2161 } 2162 2163 /* END spdk_bs_md_create_blob */ 2164 2165 /* START spdk_bs_md_resize_blob */ 2166 int 2167 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 2168 { 2169 int rc; 2170 2171 assert(blob != NULL); 2172 2173 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 2174 2175 if (sz == blob->active.num_clusters) { 2176 return 0; 2177 } 2178 2179 rc = _spdk_resize_blob(blob, sz); 2180 if (rc < 0) { 2181 return rc; 2182 } 2183 2184 return 0; 2185 } 2186 2187 /* END spdk_bs_md_resize_blob */ 2188 2189 2190 /* START spdk_bs_md_delete_blob */ 2191 2192 static void 2193 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2194 { 2195 struct spdk_blob *blob = cb_arg; 2196 2197 _spdk_blob_free(blob); 2198 2199 spdk_bs_sequence_finish(seq, bserrno); 2200 } 2201 2202 static void 2203 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2204 { 2205 struct spdk_blob *blob = cb_arg; 2206 2207 /* If the blob have crc error, we just return NULL. 
*/ 2208 if (blob == NULL) { 2209 spdk_bs_sequence_finish(seq, bserrno); 2210 return; 2211 } 2212 blob->state = SPDK_BLOB_STATE_DIRTY; 2213 blob->active.num_pages = 0; 2214 _spdk_resize_blob(blob, 0); 2215 2216 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 2217 } 2218 2219 void 2220 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2221 spdk_blob_op_complete cb_fn, void *cb_arg) 2222 { 2223 struct spdk_blob *blob; 2224 struct spdk_bs_cpl cpl; 2225 spdk_bs_sequence_t *seq; 2226 2227 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 2228 2229 blob = _spdk_blob_lookup(bs, blobid); 2230 if (blob) { 2231 assert(blob->open_ref > 0); 2232 cb_fn(cb_arg, -EINVAL); 2233 return; 2234 } 2235 2236 blob = _spdk_blob_alloc(bs, blobid); 2237 if (!blob) { 2238 cb_fn(cb_arg, -ENOMEM); 2239 return; 2240 } 2241 2242 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2243 cpl.u.blob_basic.cb_fn = cb_fn; 2244 cpl.u.blob_basic.cb_arg = cb_arg; 2245 2246 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2247 if (!seq) { 2248 _spdk_blob_free(blob); 2249 cb_fn(cb_arg, -ENOMEM); 2250 return; 2251 } 2252 2253 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 2254 } 2255 2256 /* END spdk_bs_md_delete_blob */ 2257 2258 /* START spdk_bs_md_open_blob */ 2259 2260 static void 2261 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2262 { 2263 struct spdk_blob *blob = cb_arg; 2264 2265 /* If the blob have crc error, we just return NULL. 
*/ 2266 if (blob == NULL) { 2267 seq->cpl.u.blob_handle.blob = NULL; 2268 spdk_bs_sequence_finish(seq, bserrno); 2269 return; 2270 } 2271 2272 blob->open_ref++; 2273 2274 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 2275 2276 spdk_bs_sequence_finish(seq, bserrno); 2277 } 2278 2279 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2280 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2281 { 2282 struct spdk_blob *blob; 2283 struct spdk_bs_cpl cpl; 2284 spdk_bs_sequence_t *seq; 2285 uint32_t page_num; 2286 2287 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 2288 2289 blob = _spdk_blob_lookup(bs, blobid); 2290 if (blob) { 2291 blob->open_ref++; 2292 cb_fn(cb_arg, blob, 0); 2293 return; 2294 } 2295 2296 page_num = _spdk_bs_blobid_to_page(blobid); 2297 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2298 /* Invalid blobid */ 2299 cb_fn(cb_arg, NULL, -ENOENT); 2300 return; 2301 } 2302 2303 blob = _spdk_blob_alloc(bs, blobid); 2304 if (!blob) { 2305 cb_fn(cb_arg, NULL, -ENOMEM); 2306 return; 2307 } 2308 2309 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2310 cpl.u.blob_handle.cb_fn = cb_fn; 2311 cpl.u.blob_handle.cb_arg = cb_arg; 2312 cpl.u.blob_handle.blob = blob; 2313 2314 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2315 if (!seq) { 2316 _spdk_blob_free(blob); 2317 cb_fn(cb_arg, NULL, -ENOMEM); 2318 return; 2319 } 2320 2321 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 2322 } 2323 2324 /* START spdk_bs_md_sync_blob */ 2325 static void 2326 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2327 { 2328 spdk_bs_sequence_finish(seq, bserrno); 2329 } 2330 2331 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 2332 spdk_blob_op_complete cb_fn, void *cb_arg) 2333 { 2334 struct spdk_bs_cpl cpl; 2335 spdk_bs_sequence_t *seq; 2336 2337 assert(blob != NULL); 2338 2339 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 2340 2341 assert(blob->state != 
SPDK_BLOB_STATE_LOADING && 2342 blob->state != SPDK_BLOB_STATE_SYNCING); 2343 2344 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2345 cb_fn(cb_arg, 0); 2346 return; 2347 } 2348 2349 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2350 cpl.u.blob_basic.cb_fn = cb_fn; 2351 cpl.u.blob_basic.cb_arg = cb_arg; 2352 2353 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2354 if (!seq) { 2355 cb_fn(cb_arg, -ENOMEM); 2356 return; 2357 } 2358 2359 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2360 } 2361 2362 /* END spdk_bs_md_sync_blob */ 2363 2364 /* START spdk_bs_md_close_blob */ 2365 2366 static void 2367 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2368 { 2369 struct spdk_blob **blob = cb_arg; 2370 2371 if ((*blob)->open_ref == 0) { 2372 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2373 _spdk_blob_free((*blob)); 2374 } 2375 2376 *blob = NULL; 2377 2378 spdk_bs_sequence_finish(seq, bserrno); 2379 } 2380 2381 void spdk_bs_md_close_blob(struct spdk_blob **b, 2382 spdk_blob_op_complete cb_fn, void *cb_arg) 2383 { 2384 struct spdk_bs_cpl cpl; 2385 struct spdk_blob *blob; 2386 spdk_bs_sequence_t *seq; 2387 2388 assert(b != NULL); 2389 blob = *b; 2390 assert(blob != NULL); 2391 2392 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2393 2394 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2395 blob->state != SPDK_BLOB_STATE_SYNCING); 2396 2397 if (blob->open_ref == 0) { 2398 cb_fn(cb_arg, -EBADF); 2399 return; 2400 } 2401 2402 blob->open_ref--; 2403 2404 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2405 cpl.u.blob_basic.cb_fn = cb_fn; 2406 cpl.u.blob_basic.cb_arg = cb_arg; 2407 2408 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2409 if (!seq) { 2410 cb_fn(cb_arg, -ENOMEM); 2411 return; 2412 } 2413 2414 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2415 _spdk_blob_close_cpl(seq, b, 0); 2416 return; 2417 } 2418 2419 /* Sync metadata */ 2420 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 
2421 } 2422 2423 /* END spdk_bs_md_close_blob */ 2424 2425 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2426 { 2427 return spdk_get_io_channel(&bs->io_target); 2428 } 2429 2430 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2431 { 2432 spdk_put_io_channel(channel); 2433 } 2434 2435 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2436 spdk_blob_op_complete cb_fn, void *cb_arg) 2437 { 2438 /* Flush is synchronous right now */ 2439 cb_fn(cb_arg, 0); 2440 } 2441 2442 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2443 void *payload, uint64_t offset, uint64_t length, 2444 spdk_blob_op_complete cb_fn, void *cb_arg) 2445 { 2446 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false); 2447 } 2448 2449 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2450 void *payload, uint64_t offset, uint64_t length, 2451 spdk_blob_op_complete cb_fn, void *cb_arg) 2452 { 2453 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true); 2454 } 2455 2456 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2457 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2458 spdk_blob_op_complete cb_fn, void *cb_arg) 2459 { 2460 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 2461 } 2462 2463 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2464 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2465 spdk_blob_op_complete cb_fn, void *cb_arg) 2466 { 2467 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 2468 } 2469 2470 struct spdk_bs_iter_ctx { 2471 int64_t page_num; 2472 struct spdk_blob_store *bs; 2473 2474 spdk_blob_op_with_handle_complete cb_fn; 2475 void *cb_arg; 2476 }; 2477 2478 static void 2479 
_spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2480 { 2481 struct spdk_bs_iter_ctx *ctx = cb_arg; 2482 struct spdk_blob_store *bs = ctx->bs; 2483 spdk_blob_id id; 2484 2485 if (bserrno == 0) { 2486 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2487 free(ctx); 2488 return; 2489 } 2490 2491 ctx->page_num++; 2492 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2493 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2494 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2495 free(ctx); 2496 return; 2497 } 2498 2499 id = (1ULL << 32) | ctx->page_num; 2500 2501 blob = _spdk_blob_lookup(bs, id); 2502 if (blob) { 2503 blob->open_ref++; 2504 ctx->cb_fn(ctx->cb_arg, blob, 0); 2505 free(ctx); 2506 return; 2507 } 2508 2509 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2510 } 2511 2512 void 2513 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2514 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2515 { 2516 struct spdk_bs_iter_ctx *ctx; 2517 2518 ctx = calloc(1, sizeof(*ctx)); 2519 if (!ctx) { 2520 cb_fn(cb_arg, NULL, -ENOMEM); 2521 return; 2522 } 2523 2524 ctx->page_num = -1; 2525 ctx->bs = bs; 2526 ctx->cb_fn = cb_fn; 2527 ctx->cb_arg = cb_arg; 2528 2529 _spdk_bs_iter_cpl(ctx, NULL, -1); 2530 } 2531 2532 static void 2533 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2534 { 2535 struct spdk_bs_iter_ctx *ctx = cb_arg; 2536 2537 _spdk_bs_iter_cpl(ctx, NULL, -1); 2538 } 2539 2540 void 2541 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2542 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2543 { 2544 struct spdk_bs_iter_ctx *ctx; 2545 struct spdk_blob *blob; 2546 2547 assert(b != NULL); 2548 blob = *b; 2549 assert(blob != NULL); 2550 2551 ctx = calloc(1, sizeof(*ctx)); 2552 if (!ctx) { 2553 cb_fn(cb_arg, NULL, -ENOMEM); 2554 return; 2555 } 2556 2557 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2558 ctx->bs = bs; 2559 ctx->cb_fn = cb_fn; 2560 ctx->cb_arg = cb_arg; 2561 
2562 /* Close the existing blob */ 2563 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2564 } 2565 2566 int 2567 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2568 uint16_t value_len) 2569 { 2570 struct spdk_xattr *xattr; 2571 2572 assert(blob != NULL); 2573 2574 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2575 blob->state != SPDK_BLOB_STATE_SYNCING); 2576 2577 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2578 if (!strcmp(name, xattr->name)) { 2579 free(xattr->value); 2580 xattr->value_len = value_len; 2581 xattr->value = malloc(value_len); 2582 memcpy(xattr->value, value, value_len); 2583 2584 blob->state = SPDK_BLOB_STATE_DIRTY; 2585 2586 return 0; 2587 } 2588 } 2589 2590 xattr = calloc(1, sizeof(*xattr)); 2591 if (!xattr) { 2592 return -1; 2593 } 2594 xattr->name = strdup(name); 2595 xattr->value_len = value_len; 2596 xattr->value = malloc(value_len); 2597 memcpy(xattr->value, value, value_len); 2598 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2599 2600 blob->state = SPDK_BLOB_STATE_DIRTY; 2601 2602 return 0; 2603 } 2604 2605 int 2606 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 2607 { 2608 struct spdk_xattr *xattr; 2609 2610 assert(blob != NULL); 2611 2612 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2613 blob->state != SPDK_BLOB_STATE_SYNCING); 2614 2615 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2616 if (!strcmp(name, xattr->name)) { 2617 TAILQ_REMOVE(&blob->xattrs, xattr, link); 2618 free(xattr->value); 2619 free(xattr->name); 2620 free(xattr); 2621 2622 blob->state = SPDK_BLOB_STATE_DIRTY; 2623 2624 return 0; 2625 } 2626 } 2627 2628 return -ENOENT; 2629 } 2630 2631 int 2632 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 2633 const void **value, size_t *value_len) 2634 { 2635 struct spdk_xattr *xattr; 2636 2637 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2638 if (!strcmp(name, xattr->name)) { 2639 *value = xattr->value; 2640 *value_len = xattr->value_len; 
2641 return 0; 2642 } 2643 } 2644 2645 return -ENOENT; 2646 } 2647 2648 struct spdk_xattr_names { 2649 uint32_t count; 2650 const char *names[0]; 2651 }; 2652 2653 int 2654 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 2655 struct spdk_xattr_names **names) 2656 { 2657 struct spdk_xattr *xattr; 2658 int count = 0; 2659 2660 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2661 count++; 2662 } 2663 2664 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 2665 if (*names == NULL) { 2666 return -ENOMEM; 2667 } 2668 2669 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2670 (*names)->names[(*names)->count++] = xattr->name; 2671 } 2672 2673 return 0; 2674 } 2675 2676 uint32_t 2677 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 2678 { 2679 assert(names != NULL); 2680 2681 return names->count; 2682 } 2683 2684 const char * 2685 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 2686 { 2687 if (index >= names->count) { 2688 return NULL; 2689 } 2690 2691 return names->names[index]; 2692 } 2693 2694 void 2695 spdk_xattr_names_free(struct spdk_xattr_names *names) 2696 { 2697 free(names); 2698 } 2699 2700 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 2701