1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/env.h" 38 #include "spdk/queue.h" 39 #include "spdk/io_channel.h" 40 #include "spdk/bit_array.h" 41 42 #include "spdk_internal/log.h" 43 44 #include "blobstore.h" 45 #include "request.h" 46 47 static inline size_t 48 divide_round_up(size_t num, size_t divisor) 49 { 50 return (num + divisor - 1) / divisor; 51 } 52 53 static void 54 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 55 { 56 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 57 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 58 assert(bs->num_free_clusters > 0); 59 60 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 61 62 spdk_bit_array_set(bs->used_clusters, cluster_num); 63 bs->num_free_clusters--; 64 } 65 66 static void 67 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 68 { 69 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 70 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 71 assert(bs->num_free_clusters < bs->total_clusters); 72 73 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 74 75 spdk_bit_array_clear(bs->used_clusters, cluster_num); 76 bs->num_free_clusters++; 77 } 78 79 static struct spdk_blob * 80 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 81 { 82 struct spdk_blob *blob; 83 84 blob = calloc(1, sizeof(*blob)); 85 if (!blob) { 86 return NULL; 87 } 88 89 blob->id = id; 90 blob->bs = bs; 91 92 blob->state = SPDK_BLOB_STATE_DIRTY; 93 blob->active.num_pages = 1; 94 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 95 if (!blob->active.pages) { 96 free(blob); 97 return NULL; 98 } 99 100 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 101 102 TAILQ_INIT(&blob->xattrs); 103 104 return blob; 105 } 106 107 static void 108 _spdk_blob_free(struct spdk_blob *blob) 109 { 110 struct spdk_xattr *xattr, *xattr_tmp; 111 112 assert(blob != NULL); 113 114 free(blob->active.clusters); 115 free(blob->clean.clusters); 116 free(blob->active.pages); 117 free(blob->clean.pages); 118 119 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 120 TAILQ_REMOVE(&blob->xattrs, xattr, link); 121 free(xattr->name); 122 free(xattr->value); 123 free(xattr); 124 } 125 126 free(blob); 127 } 128 129 static int 130 _spdk_blob_mark_clean(struct spdk_blob *blob) 131 { 132 uint64_t *clusters = NULL; 133 uint32_t *pages = NULL; 134 135 assert(blob != NULL); 136 assert(blob->state == SPDK_BLOB_STATE_LOADING || 137 blob->state == SPDK_BLOB_STATE_SYNCING); 138 139 if (blob->active.num_clusters) { 140 assert(blob->active.clusters); 141 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 142 if (!clusters) { 143 return -1; 144 } 145 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 146 } 147 148 if (blob->active.num_pages) { 149 assert(blob->active.pages); 150 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 151 if (!pages) { 152 free(clusters); 153 return -1; 154 } 155 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 156 } 157 158 free(blob->clean.clusters); 159 free(blob->clean.pages); 160 161 blob->clean.num_clusters = blob->active.num_clusters; 162 blob->clean.clusters = blob->active.clusters; 163 blob->clean.num_pages = blob->active.num_pages; 164 blob->clean.pages = blob->active.pages; 165 166 blob->active.clusters = clusters; 167 blob->active.pages = pages; 168 169 blob->state = SPDK_BLOB_STATE_CLEAN; 170 171 return 0; 172 } 173 174 static void 175 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 176 { 177 struct spdk_blob_md_descriptor *desc; 178 size_t cur_desc = 0; 179 void *tmp; 180 181 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 182 while (cur_desc < sizeof(page->descriptors)) { 183 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 184 if (desc->length == 0) { 185 /* If padding and length are 0, this terminates the page */ 186 break; 187 } 188 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 189 struct spdk_blob_md_descriptor_extent *desc_extent; 190 unsigned int i, j; 191 unsigned int cluster_count = blob->active.num_clusters; 192 193 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 194 195 assert(desc_extent->length > 0); 196 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 197 198 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 199 for (j = 0; j < desc_extent->extents[i].length; j++) { 200 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 201 cluster_count++; 202 } 203 } 204 205 assert(cluster_count > 0); 206 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 207 assert(tmp != NULL); 208 blob->active.clusters = tmp; 209 blob->active.cluster_array_size = cluster_count; 210 211 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 212 for (j = 0; j < desc_extent->extents[i].length; j++) { 213 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 214 desc_extent->extents[i].cluster_idx + j); 215 } 216 } 217 218 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 219 struct spdk_blob_md_descriptor_xattr *desc_xattr; 220 struct spdk_xattr *xattr; 221 222 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 223 224 assert(desc_xattr->length == sizeof(desc_xattr->name_length) + 225 sizeof(desc_xattr->value_length) + 226 desc_xattr->name_length + desc_xattr->value_length); 227 228 xattr = calloc(1, sizeof(*xattr)); 229 assert(xattr != NULL); 230 231 xattr->name = malloc(desc_xattr->name_length + 1); 232 assert(xattr->name); 233 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 234 xattr->name[desc_xattr->name_length] = '\0'; 235 236 xattr->value = malloc(desc_xattr->value_length); 237 assert(xattr->value != NULL); 238 xattr->value_len = desc_xattr->value_length; 239 memcpy(xattr->value, 240 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 241 desc_xattr->value_length); 242 243 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 244 } else { 245 /* Error */ 246 break; 247 } 248 249 /* Advance to the next descriptor */ 250 cur_desc += sizeof(*desc) + desc->length; 251 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 252 break; 253 } 254 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 255 } 256 } 257 258 static int 259 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 260 struct spdk_blob *blob) 261 { 262 const struct spdk_blob_md_page *page; 263 uint32_t i; 264 265 assert(page_count > 0); 266 assert(pages[0].sequence_num == 0); 267 assert(blob != NULL); 268 assert(blob->state == SPDK_BLOB_STATE_LOADING); 269 assert(blob->active.clusters == NULL); 270 assert(blob->id == pages[0].id); 271 assert(blob->state == SPDK_BLOB_STATE_LOADING); 272 273 for (i = 0; i < page_count; i++) { 274 page = &pages[i]; 275 276 assert(page->id == blob->id); 277 assert(page->sequence_num == i); 278 279 _spdk_blob_parse_page(page, blob); 280 } 281 282 return 0; 283 } 284 285 static int 286 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 287 struct spdk_blob_md_page **pages, 288 uint32_t *page_count, 289 struct spdk_blob_md_page **last_page) 290 { 291 struct spdk_blob_md_page *page; 292 293 assert(pages != NULL); 294 assert(page_count != NULL); 295 296 if (*page_count == 0) { 297 assert(*pages == NULL); 298 *page_count = 1; 299 *pages = spdk_dma_malloc(sizeof(struct spdk_blob_md_page), 300 sizeof(struct spdk_blob_md_page), 301 NULL); 302 } else { 303 assert(*pages != NULL); 304 (*page_count)++; 305 *pages = spdk_dma_realloc(*pages, 306 sizeof(struct spdk_blob_md_page) * (*page_count), 307 sizeof(struct spdk_blob_md_page), 308 NULL); 309 } 310 311 if (*pages == NULL) { 312 *page_count = 0; 313 *last_page = NULL; 314 return -ENOMEM; 315 } 316 317 page = &(*pages)[*page_count - 1]; 318 memset(page, 0, sizeof(*page)); 319 page->id = blob->id; 320 page->sequence_num = *page_count - 1; 321 page->next = SPDK_INVALID_MD_PAGE; 322 *last_page = page; 323 324 return 0; 325 } 326 327 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 328 * Update required_sz on both success and failure. 329 * 330 */ 331 static int 332 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 333 uint8_t *buf, size_t buf_sz, 334 size_t *required_sz) 335 { 336 struct spdk_blob_md_descriptor_xattr *desc; 337 338 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 339 strlen(xattr->name) + 340 xattr->value_len; 341 342 if (buf_sz < *required_sz) { 343 return -1; 344 } 345 346 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 347 348 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 349 desc->length = sizeof(desc->name_length) + 350 sizeof(desc->value_length) + 351 strlen(xattr->name) + 352 xattr->value_len; 353 desc->name_length = strlen(xattr->name); 354 desc->value_length = xattr->value_len; 355 356 memcpy(desc->name, xattr->name, desc->name_length); 357 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 358 xattr->value, 359 desc->value_length); 360 361 return 0; 362 } 363 364 static void 365 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 366 uint64_t start_cluster, uint64_t *next_cluster, 367 uint8_t *buf, size_t buf_sz) 368 { 369 struct spdk_blob_md_descriptor_extent *desc; 370 size_t cur_sz; 371 uint64_t i, extent_idx; 372 uint32_t lba, lba_per_cluster, lba_count; 373 374 /* The buffer must have room for at least one extent */ 375 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 376 if (buf_sz < cur_sz) { 377 *next_cluster = start_cluster; 378 return; 379 } 380 381 desc = (struct spdk_blob_md_descriptor_extent *)buf; 382 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 383 384 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 385 386 lba = blob->active.clusters[start_cluster]; 387 lba_count = lba_per_cluster; 388 extent_idx = 0; 389 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 390 if ((lba + lba_count) == blob->active.clusters[i]) { 391 lba_count += lba_per_cluster; 392 continue; 393 } 394 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 395 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 396 extent_idx++; 397 398 cur_sz += sizeof(desc->extents[extent_idx]); 399 400 if (buf_sz < cur_sz) { 401 /* If we ran out of buffer space, return */ 402 desc->length = sizeof(desc->extents[0]) * extent_idx; 403 *next_cluster = i; 404 return; 405 } 406 407 lba = blob->active.clusters[i]; 408 lba_count = lba_per_cluster; 409 } 410 411 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 412 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 413 extent_idx++; 414 415 desc->length = sizeof(desc->extents[0]) * extent_idx; 416 *next_cluster = blob->active.num_clusters; 417 418 return; 419 } 420 421 static int 422 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 423 uint32_t *page_count) 424 { 425 struct spdk_blob_md_page *cur_page; 426 const struct spdk_xattr *xattr; 427 int rc; 428 uint8_t *buf; 429 size_t remaining_sz; 430 431 assert(pages != NULL); 432 assert(page_count != NULL); 433 assert(blob != NULL); 434 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 435 436 *pages = NULL; 437 *page_count = 0; 438 439 /* A blob always has at least 1 page, even if it has no descriptors */ 440 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 441 if (rc < 0) { 442 return rc; 443 } 444 445 buf = (uint8_t *)cur_page->descriptors; 446 remaining_sz = sizeof(cur_page->descriptors); 447 448 /* Serialize xattrs */ 449 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 450 size_t required_sz = 0; 451 rc = _spdk_blob_serialize_xattr(xattr, 452 buf, remaining_sz, 453 &required_sz); 454 if (rc < 0) { 455 /* Need to add a new page to the chain */ 456 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 457 &cur_page); 458 if (rc < 0) { 459 spdk_dma_free(*pages); 460 *pages = NULL; 461 *page_count = 0; 462 return rc; 463 } 464 465 buf = (uint8_t *)cur_page->descriptors; 466 remaining_sz = sizeof(cur_page->descriptors); 467 468 /* Try again */ 469 required_sz = 0; 470 rc = _spdk_blob_serialize_xattr(xattr, 471 buf, remaining_sz, 472 &required_sz); 473 474 if (rc < 0) { 475 spdk_dma_free(*pages); 476 *pages = NULL; 477 *page_count = 0; 478 return -1; 479 } 480 } 481 482 remaining_sz -= required_sz; 483 buf += required_sz; 484 } 485 486 /* Serialize extents */ 487 uint64_t last_cluster = 0; 488 while (last_cluster < blob->active.num_clusters) { 489 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 490 buf, remaining_sz); 491 492 if (last_cluster == blob->active.num_clusters) { 493 break; 494 } 495 496 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 497 &cur_page); 498 if (rc < 0) { 499 return rc; 500 } 501 502 buf = (uint8_t *)cur_page->descriptors; 503 remaining_sz = sizeof(cur_page->descriptors); 504 } 505 506 return 0; 507 } 508 509 struct spdk_blob_load_ctx { 510 struct spdk_blob *blob; 511 512 struct spdk_blob_md_page *pages; 513 uint32_t num_pages; 514 515 spdk_bs_sequence_cpl cb_fn; 516 void *cb_arg; 517 }; 518 519 static void 520 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 521 { 522 struct spdk_blob_load_ctx *ctx = cb_arg; 523 struct spdk_blob *blob = ctx->blob; 524 struct spdk_blob_md_page *page; 525 int rc; 526 527 page = &ctx->pages[ctx->num_pages - 1]; 528 529 if (page->next != SPDK_INVALID_MD_PAGE) { 530 uint32_t next_page = page->next; 531 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 532 533 534 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 535 536 /* Read the next page */ 537 ctx->num_pages++; 538 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 539 sizeof(*page), NULL); 540 if (ctx->pages == NULL) { 541 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 542 free(ctx); 543 return; 544 } 545 546 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 547 next_lba, 548 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 549 _spdk_blob_load_cpl, ctx); 550 return; 551 } 552 553 /* Parse the pages */ 554 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 555 556 _spdk_blob_mark_clean(blob); 557 558 ctx->cb_fn(seq, ctx->cb_arg, rc); 559 560 /* Free the memory */ 561 spdk_dma_free(ctx->pages); 562 free(ctx); 563 } 564 565 /* Load a blob from disk given a blobid */ 566 static void 567 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 568 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 569 { 570 struct spdk_blob_load_ctx *ctx; 571 struct spdk_blob_store *bs; 572 uint32_t page_num; 573 uint64_t lba; 574 575 assert(blob != NULL); 576 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 577 blob->state == SPDK_BLOB_STATE_DIRTY); 578 579 bs = blob->bs; 580 581 ctx = calloc(1, sizeof(*ctx)); 582 if (!ctx) { 583 cb_fn(seq, cb_arg, -ENOMEM); 584 return; 585 } 586 587 ctx->blob = blob; 588 ctx->pages = spdk_dma_realloc(ctx->pages, sizeof(struct spdk_blob_md_page), 589 sizeof(struct spdk_blob_md_page), NULL); 590 if (!ctx->pages) { 591 free(ctx); 592 cb_fn(seq, cb_arg, -ENOMEM); 593 return; 594 } 595 ctx->num_pages = 1; 596 ctx->cb_fn = cb_fn; 597 ctx->cb_arg = cb_arg; 598 599 page_num = _spdk_bs_blobid_to_page(blob->id); 600 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 601 602 blob->state = SPDK_BLOB_STATE_LOADING; 603 604 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 605 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)), 606 _spdk_blob_load_cpl, ctx); 607 } 608 609 struct spdk_blob_persist_ctx { 610 struct spdk_blob *blob; 611 612 struct spdk_blob_md_page *pages; 613 614 uint64_t idx; 615 616 spdk_bs_sequence_cpl cb_fn; 617 void *cb_arg; 618 }; 619 620 static void 621 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 622 { 623 struct spdk_blob_persist_ctx *ctx = cb_arg; 624 struct spdk_blob *blob = ctx->blob; 625 626 if (bserrno == 0) { 627 _spdk_blob_mark_clean(blob); 628 } 629 630 /* Call user callback */ 631 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 632 633 /* Free the memory */ 634 spdk_dma_free(ctx->pages); 635 free(ctx); 636 } 637 638 static void 639 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 640 { 641 struct spdk_blob_persist_ctx *ctx = cb_arg; 642 struct spdk_blob *blob = ctx->blob; 643 struct spdk_blob_store *bs = blob->bs; 644 void *tmp; 645 size_t i; 646 647 /* Release all clusters that were truncated */ 648 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 649 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 650 651 _spdk_bs_release_cluster(bs, cluster_num); 652 } 653 654 if (blob->active.num_clusters == 0) { 655 free(blob->active.clusters); 656 blob->active.clusters = NULL; 657 blob->active.cluster_array_size = 0; 658 } else { 659 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 660 assert(tmp != NULL); 661 blob->active.clusters = tmp; 662 blob->active.cluster_array_size = blob->active.num_clusters; 663 } 664 665 _spdk_blob_persist_complete(seq, ctx, bserrno); 666 } 667 668 static void 669 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 670 { 671 struct spdk_blob_persist_ctx *ctx = cb_arg; 672 struct spdk_blob *blob = ctx->blob; 673 struct spdk_blob_store *bs = blob->bs; 674 spdk_bs_batch_t *batch; 675 size_t i; 676 uint64_t lba; 677 uint32_t lba_count; 678 679 /* Clusters don't move around in blobs. The list shrinks or grows 680 * at the end, but no changes ever occur in the middle of the list. 681 */ 682 683 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 684 685 /* Unmap all clusters that were truncated */ 686 lba = 0; 687 lba_count = 0; 688 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 689 uint64_t next_lba = blob->active.clusters[i]; 690 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 691 692 if ((lba + lba_count) == next_lba) { 693 /* This cluster is contiguous with the previous one. */ 694 lba_count += next_lba_count; 695 continue; 696 } 697 698 /* This cluster is not contiguous with the previous one. */ 699 700 /* If a run of LBAs previously existing, send them 701 * as an unmap. 702 */ 703 if (lba_count > 0) { 704 spdk_bs_batch_unmap(batch, lba, lba_count); 705 } 706 707 /* Start building the next batch */ 708 lba = next_lba; 709 lba_count = next_lba_count; 710 } 711 712 /* If we ended with a contiguous set of LBAs, send the unmap now */ 713 if (lba_count > 0) { 714 spdk_bs_batch_unmap(batch, lba, lba_count); 715 } 716 717 spdk_bs_batch_close(batch); 718 } 719 720 static void 721 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 722 { 723 struct spdk_blob_persist_ctx *ctx = cb_arg; 724 struct spdk_blob *blob = ctx->blob; 725 struct spdk_blob_store *bs = blob->bs; 726 size_t i; 727 728 /* This loop starts at 1 because the first page is special and handled 729 * below. The pages (except the first) are never written in place, 730 * so any pages in the clean list must be unmapped. 731 */ 732 for (i = 1; i < blob->clean.num_pages; i++) { 733 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 734 } 735 736 if (blob->active.num_pages == 0) { 737 uint32_t page_num; 738 739 page_num = _spdk_bs_blobid_to_page(blob->id); 740 spdk_bit_array_clear(bs->used_md_pages, page_num); 741 } 742 743 /* Move on to unmapping clusters */ 744 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 745 } 746 747 static void 748 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 749 { 750 struct spdk_blob_persist_ctx *ctx = cb_arg; 751 struct spdk_blob *blob = ctx->blob; 752 struct spdk_blob_store *bs = blob->bs; 753 uint64_t lba; 754 uint32_t lba_count; 755 spdk_bs_batch_t *batch; 756 size_t i; 757 758 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx); 759 760 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)); 761 762 /* This loop starts at 1 because the first page is special and handled 763 * below. The pages (except the first) are never written in place, 764 * so any pages in the clean list must be unmapped. 765 */ 766 for (i = 1; i < blob->clean.num_pages; i++) { 767 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 768 769 spdk_bs_batch_unmap(batch, lba, lba_count); 770 } 771 772 /* The first page will only be unmapped if this is a delete. */ 773 if (blob->active.num_pages == 0) { 774 uint32_t page_num; 775 776 /* The first page in the metadata goes where the blobid indicates */ 777 page_num = _spdk_bs_blobid_to_page(blob->id); 778 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 779 780 spdk_bs_batch_unmap(batch, lba, lba_count); 781 } 782 783 spdk_bs_batch_close(batch); 784 } 785 786 static void 787 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 788 { 789 struct spdk_blob_persist_ctx *ctx = cb_arg; 790 struct spdk_blob *blob = ctx->blob; 791 struct spdk_blob_store *bs = blob->bs; 792 uint64_t lba; 793 uint32_t lba_count; 794 struct spdk_blob_md_page *page; 795 796 if (blob->active.num_pages == 0) { 797 /* Move on to the next step */ 798 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 799 return; 800 } 801 802 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 803 804 page = &ctx->pages[0]; 805 /* The first page in the metadata goes where the blobid indicates */ 806 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 807 808 spdk_bs_sequence_write(seq, page, lba, lba_count, 809 _spdk_blob_persist_unmap_pages, ctx); 810 } 811 812 static void 813 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 814 { 815 struct spdk_blob_persist_ctx *ctx = cb_arg; 816 struct spdk_blob *blob = ctx->blob; 817 struct spdk_blob_store *bs = blob->bs; 818 uint64_t lba; 819 uint32_t lba_count; 820 struct spdk_blob_md_page *page; 821 spdk_bs_batch_t *batch; 822 size_t i; 823 824 /* Clusters don't move around in blobs. The list shrinks or grows 825 * at the end, but no changes ever occur in the middle of the list. 826 */ 827 828 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 829 830 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 831 832 /* This starts at 1. The root page is not written until 833 * all of the others are finished 834 */ 835 for (i = 1; i < blob->active.num_pages; i++) { 836 page = &ctx->pages[i]; 837 assert(page->sequence_num == i); 838 839 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 840 841 spdk_bs_batch_write(batch, page, lba, lba_count); 842 } 843 844 spdk_bs_batch_close(batch); 845 } 846 847 static int 848 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 849 { 850 uint64_t i; 851 uint64_t *tmp; 852 uint64_t lfc; /* lowest free cluster */ 853 struct spdk_blob_store *bs; 854 855 bs = blob->bs; 856 857 assert(blob->state != SPDK_BLOB_STATE_LOADING && 858 blob->state != SPDK_BLOB_STATE_SYNCING); 859 860 if (blob->active.num_clusters == sz) { 861 return 0; 862 } 863 864 if (blob->active.num_clusters < blob->active.cluster_array_size) { 865 /* If this blob was resized to be larger, then smaller, then 866 * larger without syncing, then the cluster array already 867 * contains spare assigned clusters we can use. 868 */ 869 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 870 sz); 871 } 872 873 blob->state = SPDK_BLOB_STATE_DIRTY; 874 875 /* Do two passes - one to verify that we can obtain enough clusters 876 * and another to actually claim them. 877 */ 878 879 lfc = 0; 880 for (i = blob->active.num_clusters; i < sz; i++) { 881 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 882 if (lfc >= bs->total_clusters) { 883 /* No more free clusters. Cannot satisfy the request */ 884 assert(false); 885 return -1; 886 } 887 lfc++; 888 } 889 890 if (sz > blob->active.num_clusters) { 891 /* Expand the cluster array if necessary. 892 * We only shrink the array when persisting. 893 */ 894 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 895 if (sz > 0 && tmp == NULL) { 896 assert(false); 897 return -1; 898 } 899 blob->active.clusters = tmp; 900 blob->active.cluster_array_size = sz; 901 } 902 903 lfc = 0; 904 for (i = blob->active.num_clusters; i < sz; i++) { 905 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 906 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 907 _spdk_bs_claim_cluster(bs, lfc); 908 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 909 lfc++; 910 } 911 912 blob->active.num_clusters = sz; 913 914 return 0; 915 } 916 917 /* Write a blob to disk */ 918 static void 919 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 920 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 921 { 922 struct spdk_blob_persist_ctx *ctx; 923 int rc; 924 uint64_t i; 925 uint32_t page_num; 926 struct spdk_blob_store *bs; 927 928 assert(blob != NULL); 929 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 930 blob->state == SPDK_BLOB_STATE_DIRTY); 931 932 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 933 cb_fn(seq, cb_arg, 0); 934 return; 935 } 936 937 bs = blob->bs; 938 939 ctx = calloc(1, sizeof(*ctx)); 940 if (!ctx) { 941 cb_fn(seq, cb_arg, -ENOMEM); 942 return; 943 } 944 ctx->blob = blob; 945 ctx->cb_fn = cb_fn; 946 ctx->cb_arg = cb_arg; 947 948 blob->state = SPDK_BLOB_STATE_SYNCING; 949 950 if (blob->active.num_pages == 0) { 951 /* This is the signal that the blob should be deleted. 952 * Immediately jump to the clean up routine. */ 953 assert(blob->clean.num_pages > 0); 954 ctx->idx = blob->clean.num_pages - 1; 955 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 956 return; 957 958 } 959 960 /* Generate the new metadata */ 961 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 962 if (rc < 0) { 963 free(ctx); 964 cb_fn(seq, cb_arg, rc); 965 return; 966 } 967 968 assert(blob->active.num_pages >= 1); 969 970 /* Resize the cache of page indices */ 971 blob->active.pages = realloc(blob->active.pages, 972 blob->active.num_pages * sizeof(*blob->active.pages)); 973 if (!blob->active.pages) { 974 free(ctx); 975 cb_fn(seq, cb_arg, -ENOMEM); 976 return; 977 } 978 979 /* Assign this metadata to pages. This requires two passes - 980 * one to verify that there are enough pages and a second 981 * to actually claim them. */ 982 page_num = 0; 983 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 984 for (i = 1; i < blob->active.num_pages; i++) { 985 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 986 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 987 spdk_dma_free(ctx->pages); 988 free(ctx); 989 blob->state = SPDK_BLOB_STATE_DIRTY; 990 cb_fn(seq, cb_arg, -ENOMEM); 991 return; 992 } 993 page_num++; 994 } 995 996 page_num = 0; 997 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 998 for (i = 1; i < blob->active.num_pages; i++) { 999 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1000 ctx->pages[i - 1].next = page_num; 1001 blob->active.pages[i] = page_num; 1002 spdk_bit_array_set(bs->used_md_pages, page_num); 1003 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1004 page_num++; 1005 } 1006 1007 /* Start writing the metadata from last page to first */ 1008 ctx->idx = blob->active.num_pages - 1; 1009 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1010 } 1011 1012 static void 1013 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1014 void *payload, uint64_t offset, uint64_t length, 1015 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1016 { 1017 spdk_bs_batch_t *batch; 1018 struct spdk_bs_cpl cpl; 1019 uint64_t lba; 1020 uint32_t lba_count; 1021 uint8_t *buf; 1022 uint64_t page; 1023 1024 assert(blob != NULL); 1025 1026 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1027 cb_fn(cb_arg, -EINVAL); 1028 return; 1029 } 1030 1031 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1032 cpl.u.blob_basic.cb_fn = cb_fn; 1033 cpl.u.blob_basic.cb_arg = cb_arg; 1034 1035 batch = spdk_bs_batch_open(_channel, &cpl); 1036 if (!batch) { 1037 cb_fn(cb_arg, -ENOMEM); 1038 return; 1039 } 1040 1041 length = _spdk_bs_page_to_lba(blob->bs, length); 1042 page = offset; 1043 buf = payload; 1044 while (length > 0) { 1045 lba = _spdk_bs_blob_page_to_lba(blob, page); 1046 lba_count = spdk_min(length, 1047 _spdk_bs_page_to_lba(blob->bs, 1048 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1049 1050 if (read) { 1051 spdk_bs_batch_read(batch, buf, lba, lba_count); 1052 } else { 1053 spdk_bs_batch_write(batch, buf, lba, lba_count); 1054 } 1055 1056 length -= lba_count; 1057 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1058 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1059 } 1060 1061 spdk_bs_batch_close(batch); 1062 } 1063 1064 static struct spdk_blob * 1065 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1066 { 1067 struct spdk_blob *blob; 1068 1069 TAILQ_FOREACH(blob, &bs->blobs, link) { 1070 if (blob->id == blobid) { 1071 return blob; 1072 } 1073 } 1074 1075 return NULL; 1076 } 1077 1078 static int 1079 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1080 uint32_t max_ops) 1081 { 1082 struct spdk_bs_dev *dev; 1083 uint32_t i; 1084 1085 dev = bs->dev; 1086 1087 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1088 if (!channel->req_mem) { 1089 return -1; 1090 } 1091 1092 TAILQ_INIT(&channel->reqs); 1093 1094 for (i = 0; i < max_ops; i++) { 1095 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1096 } 1097 1098 channel->bs = bs; 1099 channel->dev = dev; 1100 channel->dev_channel = dev->create_channel(dev); 1101 1102 return 0; 1103 } 1104 1105 static int 1106 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1107 { 1108 struct spdk_blob_store *bs; 1109 struct spdk_bs_channel *channel = ctx_buf; 1110 1111 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1112 1113 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1114 } 1115 1116 static int 1117 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1118 { 1119 struct spdk_blob_store *bs; 1120 struct spdk_bs_channel *channel = ctx_buf; 1121 1122 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1123 1124 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1125 } 1126 1127 1128 static void 1129 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1130 { 1131 struct spdk_bs_channel *channel = ctx_buf; 1132 1133 free(channel->req_mem); 1134 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1135 } 1136 1137 static void 1138 _spdk_bs_free(struct spdk_blob_store *bs) 1139 { 1140 struct spdk_blob *blob, *blob_tmp; 1141 1142 spdk_bs_unregister_md_thread(bs); 1143 spdk_io_device_unregister(&bs->io_target); 1144 spdk_io_device_unregister(&bs->md_target); 1145 1146 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1147 TAILQ_REMOVE(&bs->blobs, blob, link); 1148 _spdk_blob_free(blob); 1149 } 1150 1151 spdk_bit_array_free(&bs->used_md_pages); 1152 spdk_bit_array_free(&bs->used_clusters); 1153 1154 bs->dev->destroy(bs->dev); 1155 free(bs); 1156 } 1157 1158 void 1159 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1160 { 1161 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1162 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1163 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1164 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1165 } 1166 1167 static struct spdk_blob_store * 1168 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1169 { 1170 struct spdk_blob_store *bs; 1171 1172 bs = calloc(1, sizeof(struct spdk_blob_store)); 1173 if (!bs) { 1174 return NULL; 1175 } 1176 1177 TAILQ_INIT(&bs->blobs); 1178 bs->dev = dev; 1179 1180 /* 1181 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1182 * even multiple of the cluster size. 1183 */ 1184 bs->cluster_sz = opts->cluster_sz; 1185 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1186 bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1187 bs->num_free_clusters = bs->total_clusters; 1188 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1189 if (bs->used_clusters == NULL) { 1190 _spdk_bs_free(bs); 1191 return NULL; 1192 } 1193 1194 bs->md_target.max_md_ops = opts->max_md_ops; 1195 bs->io_target.max_channel_ops = opts->max_channel_ops; 1196 bs->super_blob = SPDK_BLOBID_INVALID; 1197 1198 /* The metadata is assumed to be at least 1 page */ 1199 bs->used_md_pages = spdk_bit_array_create(1); 1200 1201 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1202 sizeof(struct spdk_bs_channel)); 1203 spdk_bs_register_md_thread(bs); 1204 1205 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1206 sizeof(struct spdk_bs_channel)); 1207 1208 return bs; 1209 } 1210 1211 /* START spdk_bs_load */ 1212 1213 struct spdk_bs_load_ctx { 1214 struct spdk_blob_store *bs; 1215 struct spdk_bs_super_block *super; 1216 1217 struct spdk_bs_md_mask *mask; 1218 }; 1219 1220 static void 1221 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1222 { 1223 struct spdk_bs_load_ctx *ctx = cb_arg; 1224 uint32_t i, j; 1225 int rc; 1226 1227 /* The type must be correct */ 1228 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1229 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1230 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1231 struct spdk_blob_md_page) * 8)); 1232 /* The length of the mask must be exactly equal to the total number of clusters */ 1233 assert(ctx->mask->length == ctx->bs->total_clusters); 1234 1235 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1236 if (rc < 0) { 1237 spdk_dma_free(ctx->super); 1238 spdk_dma_free(ctx->mask); 1239 _spdk_bs_free(ctx->bs); 1240 free(ctx); 1241 spdk_bs_sequence_finish(seq, -ENOMEM); 1242 return; 1243 } 1244 1245 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1246 for (i = 0; i < ctx->mask->length / 8; i++) { 1247 uint8_t segment = ctx->mask->mask[i]; 1248 for (j = 0; segment && (j < 8); j++) { 1249 if (segment & 1U) { 1250 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1251 assert(ctx->bs->num_free_clusters > 0); 1252 ctx->bs->num_free_clusters--; 1253 } 1254 segment >>= 1U; 1255 } 1256 } 1257 1258 spdk_dma_free(ctx->super); 1259 spdk_dma_free(ctx->mask); 1260 free(ctx); 1261 1262 spdk_bs_sequence_finish(seq, bserrno); 1263 } 1264 1265 static void 1266 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1267 { 1268 struct spdk_bs_load_ctx *ctx = cb_arg; 1269 uint64_t lba, lba_count, mask_size; 1270 uint32_t i, j; 1271 int rc; 1272 1273 /* The type must be correct */ 1274 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1275 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1276 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) * 1277 8)); 1278 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1279 assert(ctx->mask->length == ctx->super->md_len); 1280 1281 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1282 if (rc < 0) { 1283 spdk_dma_free(ctx->super); 1284 spdk_dma_free(ctx->mask); 1285 _spdk_bs_free(ctx->bs); 1286 free(ctx); 1287 spdk_bs_sequence_finish(seq, -ENOMEM); 1288 return; 1289 } 1290 1291 for (i = 0; i < ctx->mask->length / 8; i++) { 1292 uint8_t segment = ctx->mask->mask[i]; 1293 for (j = 0; segment && (j < 8); j++) { 1294 if (segment & 1U) { 1295 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1296 } 1297 segment >>= 1U; 1298 } 1299 } 1300 spdk_dma_free(ctx->mask); 1301 1302 /* Read the used clusters mask */ 1303 mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page); 1304 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1305 if (!ctx->mask) { 1306 spdk_dma_free(ctx->super); 1307 _spdk_bs_free(ctx->bs); 1308 free(ctx); 1309 spdk_bs_sequence_finish(seq, -ENOMEM); 1310 return; 1311 } 1312 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1313 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1314 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1315 _spdk_bs_load_used_clusters_cpl, ctx); 1316 } 1317 1318 static void 1319 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1320 { 1321 struct spdk_bs_load_ctx *ctx = cb_arg; 1322 uint64_t lba, lba_count, mask_size; 1323 1324 if (ctx->super->version != SPDK_BS_VERSION) { 1325 spdk_dma_free(ctx->super); 1326 _spdk_bs_free(ctx->bs); 1327 free(ctx); 1328 spdk_bs_sequence_finish(seq, -EILSEQ); 1329 return; 1330 } 1331 1332 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1333 sizeof(ctx->super->signature)) != 0) { 1334 spdk_dma_free(ctx->super); 1335 _spdk_bs_free(ctx->bs); 1336 free(ctx); 1337 spdk_bs_sequence_finish(seq, -EILSEQ); 1338 return; 1339 } 1340 1341 if (ctx->super->clean != 1) { 1342 /* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED. 1343 * All of the necessary data to recover is available 1344 * on disk - the code just has not been written yet. 1345 */ 1346 assert(false); 1347 spdk_dma_free(ctx->super); 1348 _spdk_bs_free(ctx->bs); 1349 free(ctx); 1350 spdk_bs_sequence_finish(seq, -EILSEQ); 1351 return; 1352 } 1353 ctx->super->clean = 0; 1354 1355 /* Parse the super block */ 1356 ctx->bs->cluster_sz = ctx->super->cluster_size; 1357 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1358 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1359 ctx->bs->md_start = ctx->super->md_start; 1360 ctx->bs->md_len = ctx->super->md_len; 1361 1362 /* Read the used pages mask */ 1363 mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page); 1364 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1365 if (!ctx->mask) { 1366 spdk_dma_free(ctx->super); 1367 _spdk_bs_free(ctx->bs); 1368 free(ctx); 1369 spdk_bs_sequence_finish(seq, -ENOMEM); 1370 return; 1371 } 1372 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1373 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1374 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1375 _spdk_bs_load_used_pages_cpl, ctx); 1376 } 1377 1378 void 1379 spdk_bs_load(struct spdk_bs_dev *dev, 1380 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1381 { 1382 struct spdk_blob_store *bs; 1383 struct spdk_bs_cpl cpl; 1384 spdk_bs_sequence_t *seq; 1385 struct spdk_bs_load_ctx *ctx; 1386 struct spdk_bs_opts opts = {}; 1387 1388 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1389 1390 spdk_bs_opts_init(&opts); 1391 1392 bs = _spdk_bs_alloc(dev, &opts); 1393 if (!bs) { 1394 cb_fn(cb_arg, NULL, -ENOMEM); 1395 return; 1396 } 1397 1398 ctx = calloc(1, sizeof(*ctx)); 1399 if (!ctx) { 1400 _spdk_bs_free(bs); 1401 cb_fn(cb_arg, NULL, -ENOMEM); 1402 return; 1403 } 1404 1405 ctx->bs = bs; 1406 1407 /* Allocate memory for the super block */ 1408 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1409 if (!ctx->super) { 1410 free(ctx); 1411 _spdk_bs_free(bs); 1412 return; 1413 } 1414 1415 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1416 cpl.u.bs_handle.cb_fn = cb_fn; 1417 cpl.u.bs_handle.cb_arg = cb_arg; 1418 cpl.u.bs_handle.bs = bs; 1419 1420 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1421 if (!seq) { 1422 spdk_dma_free(ctx->super); 1423 free(ctx); 1424 _spdk_bs_free(bs); 1425 cb_fn(cb_arg, NULL, -ENOMEM); 1426 return; 1427 } 1428 1429 /* Read the super block */ 1430 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1431 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1432 _spdk_bs_load_super_cpl, ctx); 1433 } 1434 1435 /* END spdk_bs_load */ 1436 1437 /* START spdk_bs_init */ 1438 1439 struct spdk_bs_init_ctx { 1440 struct spdk_blob_store *bs; 1441 struct spdk_bs_super_block *super; 1442 }; 1443 1444 static void 1445 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1446 { 1447 struct spdk_bs_init_ctx *ctx = cb_arg; 1448 1449 spdk_dma_free(ctx->super); 1450 free(ctx); 1451 1452 spdk_bs_sequence_finish(seq, bserrno); 1453 } 1454 1455 static void 1456 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1457 { 1458 struct spdk_bs_init_ctx *ctx = cb_arg; 1459 1460 /* Write super block */ 1461 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1462 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1463 _spdk_bs_init_persist_super_cpl, ctx); 1464 } 1465 1466 void 1467 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1468 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1469 { 1470 struct spdk_bs_init_ctx *ctx; 1471 struct spdk_blob_store *bs; 1472 struct spdk_bs_cpl cpl; 1473 spdk_bs_sequence_t *seq; 1474 uint64_t num_md_pages; 1475 uint32_t i; 1476 struct spdk_bs_opts opts = {}; 1477 int rc; 1478 1479 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 1480 1481 if (o) { 1482 opts = *o; 1483 } else { 1484 spdk_bs_opts_init(&opts); 1485 } 1486 1487 bs = _spdk_bs_alloc(dev, &opts); 1488 if (!bs) { 1489 cb_fn(cb_arg, NULL, -ENOMEM); 1490 return; 1491 } 1492 1493 if (opts.num_md_pages == UINT32_MAX) { 1494 /* By default, allocate 1 page per cluster. 1495 * Technically, this over-allocates metadata 1496 * because more metadata will reduce the number 1497 * of usable clusters. This can be addressed with 1498 * more complex math in the future. 1499 */ 1500 bs->md_len = bs->total_clusters; 1501 } else { 1502 bs->md_len = opts.num_md_pages; 1503 } 1504 1505 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 1506 if (rc < 0) { 1507 _spdk_bs_free(bs); 1508 cb_fn(cb_arg, NULL, -ENOMEM); 1509 return; 1510 } 1511 1512 ctx = calloc(1, sizeof(*ctx)); 1513 if (!ctx) { 1514 _spdk_bs_free(bs); 1515 cb_fn(cb_arg, NULL, -ENOMEM); 1516 return; 1517 } 1518 1519 ctx->bs = bs; 1520 1521 /* Allocate memory for the super block */ 1522 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1523 if (!ctx->super) { 1524 free(ctx); 1525 _spdk_bs_free(bs); 1526 return; 1527 } 1528 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1529 sizeof(ctx->super->signature)); 1530 ctx->super->version = SPDK_BS_VERSION; 1531 ctx->super->length = sizeof(*ctx->super); 1532 ctx->super->super_blob = bs->super_blob; 1533 ctx->super->clean = 0; 1534 ctx->super->cluster_size = bs->cluster_sz; 1535 1536 /* Calculate how many pages the metadata consumes at the front 1537 * of the disk. 1538 */ 1539 1540 /* The super block uses 1 page */ 1541 num_md_pages = 1; 1542 1543 /* The used_md_pages mask requires 1 bit per metadata page, rounded 1544 * up to the nearest page, plus a header. 1545 */ 1546 ctx->super->used_page_mask_start = num_md_pages; 1547 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1548 divide_round_up(bs->md_len, 8), 1549 sizeof(struct spdk_blob_md_page)); 1550 num_md_pages += ctx->super->used_page_mask_len; 1551 1552 /* The used_clusters mask requires 1 bit per cluster, rounded 1553 * up to the nearest page, plus a header. 1554 */ 1555 ctx->super->used_cluster_mask_start = num_md_pages; 1556 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1557 divide_round_up(bs->total_clusters, 8), 1558 sizeof(struct spdk_blob_md_page)); 1559 num_md_pages += ctx->super->used_cluster_mask_len; 1560 1561 /* The metadata region size was chosen above */ 1562 ctx->super->md_start = bs->md_start = num_md_pages; 1563 ctx->super->md_len = bs->md_len; 1564 num_md_pages += bs->md_len; 1565 1566 /* Claim all of the clusters used by the metadata */ 1567 for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) { 1568 _spdk_bs_claim_cluster(bs, i); 1569 } 1570 1571 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1572 cpl.u.bs_handle.cb_fn = cb_fn; 1573 cpl.u.bs_handle.cb_arg = cb_arg; 1574 cpl.u.bs_handle.bs = bs; 1575 1576 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1577 if (!seq) { 1578 spdk_dma_free(ctx->super); 1579 free(ctx); 1580 _spdk_bs_free(bs); 1581 cb_fn(cb_arg, NULL, -ENOMEM); 1582 return; 1583 } 1584 1585 /* TRIM the entire device */ 1586 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx); 1587 } 1588 1589 /* END spdk_bs_init */ 1590 1591 /* START spdk_bs_unload */ 1592 1593 struct spdk_bs_unload_ctx { 1594 struct spdk_blob_store *bs; 1595 struct spdk_bs_super_block *super; 1596 1597 struct spdk_bs_md_mask *mask; 1598 }; 1599 1600 static void 1601 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1602 { 1603 struct spdk_bs_unload_ctx *ctx = cb_arg; 1604 1605 spdk_dma_free(ctx->super); 1606 1607 spdk_bs_sequence_finish(seq, bserrno); 1608 1609 _spdk_bs_free(ctx->bs); 1610 free(ctx); 1611 } 1612 1613 static void 1614 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1615 { 1616 struct spdk_bs_unload_ctx *ctx = cb_arg; 1617 1618 spdk_dma_free(ctx->mask); 1619 1620 /* Update the values in the super block */ 1621 ctx->super->super_blob = ctx->bs->super_blob; 1622 ctx->super->clean = 1; 1623 1624 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1625 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1626 _spdk_bs_unload_write_super_cpl, ctx); 1627 } 1628 1629 static void 1630 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1631 { 1632 struct spdk_bs_unload_ctx *ctx = cb_arg; 1633 uint32_t i; 1634 uint64_t lba, lba_count, mask_size; 1635 1636 spdk_dma_free(ctx->mask); 1637 1638 /* Write out the used clusters mask */ 1639 mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page); 1640 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1641 if (!ctx->mask) { 1642 spdk_dma_free(ctx->super); 1643 free(ctx); 1644 spdk_bs_sequence_finish(seq, -ENOMEM); 1645 return; 1646 } 1647 1648 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1649 ctx->mask->length = ctx->bs->total_clusters; 1650 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1651 1652 i = 0; 1653 while (true) { 1654 i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i); 1655 if (i > ctx->mask->length) { 1656 break; 1657 } 1658 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1659 i++; 1660 } 1661 1662 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1663 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1664 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1665 _spdk_bs_unload_write_used_clusters_cpl, ctx); 1666 } 1667 1668 static void 1669 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1670 { 1671 struct spdk_bs_unload_ctx *ctx = cb_arg; 1672 uint32_t i; 1673 uint64_t lba, lba_count, mask_size; 1674 1675 /* Write out the used page mask */ 1676 mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page); 1677 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1678 if (!ctx->mask) { 1679 spdk_dma_free(ctx->super); 1680 free(ctx); 1681 spdk_bs_sequence_finish(seq, -ENOMEM); 1682 return; 1683 } 1684 1685 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1686 ctx->mask->length = ctx->super->md_len; 1687 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1688 1689 i = 0; 1690 while (true) { 1691 i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i); 1692 if (i > ctx->mask->length) { 1693 break; 1694 } 1695 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1696 i++; 1697 } 1698 1699 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1700 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1701 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1702 _spdk_bs_unload_write_used_pages_cpl, ctx); 1703 } 1704 1705 void 1706 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 1707 { 1708 struct spdk_bs_cpl cpl; 1709 spdk_bs_sequence_t *seq; 1710 struct spdk_bs_unload_ctx *ctx; 1711 1712 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 1713 1714 ctx = calloc(1, sizeof(*ctx)); 1715 if (!ctx) { 1716 cb_fn(cb_arg, -ENOMEM); 1717 return; 1718 } 1719 1720 ctx->bs = bs; 1721 1722 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1723 if (!ctx->super) { 1724 free(ctx); 1725 cb_fn(cb_arg, -ENOMEM); 1726 return; 1727 } 1728 1729 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 1730 cpl.u.bs_basic.cb_fn = cb_fn; 1731 cpl.u.bs_basic.cb_arg = cb_arg; 1732 1733 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1734 if (!seq) { 1735 spdk_dma_free(ctx->super); 1736 free(ctx); 1737 cb_fn(cb_arg, -ENOMEM); 1738 return; 1739 } 1740 1741 assert(TAILQ_EMPTY(&bs->blobs)); 1742 1743 /* Read super block */ 1744 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1745 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1746 _spdk_bs_unload_read_super_cpl, ctx); 1747 } 1748 1749 /* END spdk_bs_unload */ 1750 1751 void 1752 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 1753 spdk_bs_op_complete cb_fn, void *cb_arg) 1754 { 1755 bs->super_blob = blobid; 1756 cb_fn(cb_arg, 0); 1757 } 1758 1759 void 1760 spdk_bs_get_super(struct spdk_blob_store *bs, 1761 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1762 { 1763 if (bs->super_blob == SPDK_BLOBID_INVALID) { 1764 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 1765 } else { 1766 cb_fn(cb_arg, bs->super_blob, 0); 1767 } 1768 } 1769 1770 uint64_t 1771 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 1772 { 1773 return bs->cluster_sz; 1774 } 1775 1776 uint64_t 1777 spdk_bs_get_page_size(struct spdk_blob_store *bs) 1778 { 1779 return sizeof(struct spdk_blob_md_page); 1780 } 1781 1782 uint64_t 1783 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 1784 { 1785 return bs->num_free_clusters; 1786 } 1787 1788 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 1789 { 1790 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target); 1791 1792 return 0; 1793 } 1794 1795 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 1796 { 1797 spdk_put_io_channel(bs->md_target.md_channel); 1798 1799 return 0; 1800 } 1801 1802 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 1803 { 1804 assert(blob != NULL); 1805 1806 return blob->id; 1807 } 1808 1809 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 1810 { 1811 assert(blob != NULL); 1812 1813 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 1814 } 1815 1816 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 1817 { 1818 assert(blob != NULL); 1819 1820 return blob->active.num_clusters; 1821 } 1822 1823 /* START spdk_bs_md_create_blob */ 1824 1825 static void 1826 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1827 { 1828 struct spdk_blob *blob = cb_arg; 1829 1830 _spdk_blob_free(blob); 1831 1832 spdk_bs_sequence_finish(seq, bserrno); 1833 } 1834 1835 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 1836 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1837 { 1838 struct spdk_blob *blob; 1839 uint32_t page_idx; 1840 struct spdk_bs_cpl cpl; 1841 spdk_bs_sequence_t *seq; 1842 spdk_blob_id id; 1843 1844 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 1845 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 1846 cb_fn(cb_arg, 0, -ENOMEM); 1847 return; 1848 } 1849 spdk_bit_array_set(bs->used_md_pages, page_idx); 1850 1851 /* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper 1852 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the 1853 * code assumes blob id == page_idx. 1854 */ 1855 id = (1ULL << 32) | page_idx; 1856 1857 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 1858 1859 blob = _spdk_blob_alloc(bs, id); 1860 if (!blob) { 1861 cb_fn(cb_arg, 0, -ENOMEM); 1862 return; 1863 } 1864 1865 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 1866 cpl.u.blobid.cb_fn = cb_fn; 1867 cpl.u.blobid.cb_arg = cb_arg; 1868 cpl.u.blobid.blobid = blob->id; 1869 1870 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1871 if (!seq) { 1872 _spdk_blob_free(blob); 1873 cb_fn(cb_arg, 0, -ENOMEM); 1874 return; 1875 } 1876 1877 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 1878 } 1879 1880 /* END spdk_bs_md_create_blob */ 1881 1882 /* START spdk_bs_md_resize_blob */ 1883 int 1884 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 1885 { 1886 int rc; 1887 1888 assert(blob != NULL); 1889 1890 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 1891 1892 if (sz == blob->active.num_clusters) { 1893 return 0; 1894 } 1895 1896 rc = _spdk_resize_blob(blob, sz); 1897 if (rc < 0) { 1898 return rc; 1899 } 1900 1901 return 0; 1902 } 1903 1904 /* END spdk_bs_md_resize_blob */ 1905 1906 1907 /* START spdk_bs_md_delete_blob */ 1908 1909 static void 1910 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1911 { 1912 struct spdk_blob *blob = cb_arg; 1913 1914 _spdk_blob_free(blob); 1915 1916 spdk_bs_sequence_finish(seq, bserrno); 1917 } 1918 1919 static void 1920 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1921 { 1922 struct spdk_blob *blob = cb_arg; 1923 1924 blob->state = SPDK_BLOB_STATE_DIRTY; 1925 blob->active.num_pages = 0; 1926 _spdk_resize_blob(blob, 0); 1927 1928 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 1929 } 1930 1931 void 1932 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1933 spdk_blob_op_complete cb_fn, void *cb_arg) 1934 { 1935 struct spdk_blob *blob; 1936 struct spdk_bs_cpl cpl; 1937 spdk_bs_sequence_t *seq; 1938 1939 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 1940 1941 blob = _spdk_blob_lookup(bs, blobid); 1942 if (blob) { 1943 assert(blob->open_ref > 0); 1944 cb_fn(cb_arg, -EINVAL); 1945 return; 1946 } 1947 1948 blob = _spdk_blob_alloc(bs, blobid); 1949 if (!blob) { 1950 cb_fn(cb_arg, -ENOMEM); 1951 return; 1952 } 1953 1954 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1955 cpl.u.blob_basic.cb_fn = cb_fn; 1956 cpl.u.blob_basic.cb_arg = cb_arg; 1957 1958 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1959 if (!seq) { 1960 _spdk_blob_free(blob); 1961 cb_fn(cb_arg, -ENOMEM); 1962 return; 1963 } 1964 1965 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 1966 } 1967 1968 /* END spdk_bs_md_delete_blob */ 1969 1970 /* START spdk_bs_md_open_blob */ 1971 1972 static void 1973 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1974 { 1975 struct spdk_blob *blob = cb_arg; 1976 1977 blob->open_ref++; 1978 1979 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 1980 1981 spdk_bs_sequence_finish(seq, bserrno); 1982 } 1983 1984 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1985 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 1986 { 1987 struct spdk_blob *blob; 1988 struct spdk_bs_cpl cpl; 1989 spdk_bs_sequence_t *seq; 1990 uint32_t page_num; 1991 1992 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 1993 1994 blob = _spdk_blob_lookup(bs, blobid); 1995 if (blob) { 1996 blob->open_ref++; 1997 cb_fn(cb_arg, blob, 0); 1998 return; 1999 } 2000 2001 page_num = _spdk_bs_blobid_to_page(blobid); 2002 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2003 /* Invalid blobid */ 2004 cb_fn(cb_arg, NULL, -ENOENT); 2005 return; 2006 } 2007 2008 blob = _spdk_blob_alloc(bs, blobid); 2009 if (!blob) { 2010 cb_fn(cb_arg, NULL, -ENOMEM); 2011 return; 2012 } 2013 2014 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2015 cpl.u.blob_handle.cb_fn = cb_fn; 2016 cpl.u.blob_handle.cb_arg = cb_arg; 2017 cpl.u.blob_handle.blob = blob; 2018 2019 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2020 if (!seq) { 2021 _spdk_blob_free(blob); 2022 cb_fn(cb_arg, NULL, -ENOMEM); 2023 return; 2024 } 2025 2026 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 2027 } 2028 2029 /* START spdk_bs_md_sync_blob */ 2030 static void 2031 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2032 { 2033 spdk_bs_sequence_finish(seq, bserrno); 2034 } 2035 2036 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 2037 spdk_blob_op_complete cb_fn, void *cb_arg) 2038 { 2039 struct spdk_bs_cpl cpl; 2040 spdk_bs_sequence_t *seq; 2041 2042 assert(blob != NULL); 2043 2044 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 2045 2046 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2047 blob->state != SPDK_BLOB_STATE_SYNCING); 2048 2049 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2050 cb_fn(cb_arg, 0); 2051 return; 2052 } 2053 2054 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2055 cpl.u.blob_basic.cb_fn = cb_fn; 2056 cpl.u.blob_basic.cb_arg = cb_arg; 2057 2058 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2059 if (!seq) { 2060 cb_fn(cb_arg, -ENOMEM); 2061 return; 2062 } 2063 2064 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2065 } 2066 2067 /* END spdk_bs_md_sync_blob */ 2068 2069 /* START spdk_bs_md_close_blob */ 2070 2071 static void 2072 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2073 { 2074 struct spdk_blob **blob = cb_arg; 2075 2076 if ((*blob)->open_ref == 0) { 2077 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2078 _spdk_blob_free((*blob)); 2079 } 2080 2081 *blob = NULL; 2082 2083 spdk_bs_sequence_finish(seq, bserrno); 2084 } 2085 2086 void spdk_bs_md_close_blob(struct spdk_blob **b, 2087 spdk_blob_op_complete cb_fn, void *cb_arg) 2088 { 2089 struct spdk_bs_cpl cpl; 2090 struct spdk_blob *blob; 2091 spdk_bs_sequence_t *seq; 2092 2093 assert(b != NULL); 2094 blob = *b; 2095 assert(blob != NULL); 2096 2097 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2098 2099 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2100 blob->state != SPDK_BLOB_STATE_SYNCING); 2101 2102 if (blob->open_ref == 0) { 2103 cb_fn(cb_arg, -EBADF); 2104 return; 2105 } 2106 2107 blob->open_ref--; 2108 2109 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2110 cpl.u.blob_basic.cb_fn = cb_fn; 2111 cpl.u.blob_basic.cb_arg = cb_arg; 2112 2113 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2114 if (!seq) { 2115 cb_fn(cb_arg, -ENOMEM); 2116 return; 2117 } 2118 2119 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2120 _spdk_blob_close_cpl(seq, b, 0); 2121 return; 2122 } 2123 2124 /* Sync metadata */ 2125 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2126 } 2127 2128 /* END spdk_bs_md_close_blob */ 2129 2130 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2131 { 2132 return spdk_get_io_channel(&bs->io_target); 2133 } 2134 2135 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2136 { 2137 spdk_put_io_channel(channel); 2138 } 2139 2140 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2141 spdk_blob_op_complete cb_fn, void *cb_arg) 2142 { 2143 /* Flush is synchronous right now */ 2144 cb_fn(cb_arg, 0); 2145 } 2146 2147 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2148 void *payload, uint64_t offset, uint64_t length, 2149 spdk_blob_op_complete cb_fn, void *cb_arg) 2150 { 2151 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false); 2152 } 2153 2154 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2155 void *payload, uint64_t offset, uint64_t length, 2156 spdk_blob_op_complete cb_fn, void *cb_arg) 2157 { 2158 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true); 2159 } 2160 2161 struct spdk_bs_iter_ctx { 2162 int64_t page_num; 2163 struct spdk_blob_store *bs; 2164 2165 spdk_blob_op_with_handle_complete cb_fn; 2166 void *cb_arg; 2167 }; 2168 2169 static void 2170 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2171 { 2172 struct spdk_bs_iter_ctx *ctx = cb_arg; 2173 struct spdk_blob_store *bs = ctx->bs; 2174 spdk_blob_id id; 2175 2176 if (bserrno == 0) { 2177 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2178 free(ctx); 2179 return; 2180 } 2181 2182 ctx->page_num++; 2183 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2184 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2185 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2186 free(ctx); 2187 return; 2188 } 2189 2190 id = (1ULL << 32) | ctx->page_num; 2191 2192 blob = _spdk_blob_lookup(bs, id); 2193 if (blob) { 2194 blob->open_ref++; 2195 ctx->cb_fn(ctx->cb_arg, blob, 0); 2196 free(ctx); 2197 return; 2198 } 2199 2200 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2201 } 2202 2203 void 2204 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2205 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2206 { 2207 struct spdk_bs_iter_ctx *ctx; 2208 2209 ctx = calloc(1, sizeof(*ctx)); 2210 if (!ctx) { 2211 cb_fn(cb_arg, NULL, -ENOMEM); 2212 return; 2213 } 2214 2215 ctx->page_num = -1; 2216 ctx->bs = bs; 2217 ctx->cb_fn = cb_fn; 2218 ctx->cb_arg = cb_arg; 2219 2220 _spdk_bs_iter_cpl(ctx, NULL, -1); 2221 } 2222 2223 static void 2224 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2225 { 2226 struct spdk_bs_iter_ctx *ctx = cb_arg; 2227 2228 _spdk_bs_iter_cpl(ctx, NULL, -1); 2229 } 2230 2231 void 2232 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2233 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2234 { 2235 struct spdk_bs_iter_ctx *ctx; 2236 struct spdk_blob *blob; 2237 2238 assert(b != NULL); 2239 blob = *b; 2240 assert(blob != NULL); 2241 2242 ctx = calloc(1, sizeof(*ctx)); 2243 if (!ctx) { 2244 cb_fn(cb_arg, NULL, -ENOMEM); 2245 return; 2246 } 2247 2248 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2249 ctx->bs = bs; 2250 ctx->cb_fn = cb_fn; 2251 ctx->cb_arg = cb_arg; 2252 2253 /* Close the existing blob */ 2254 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2255 } 2256 2257 int 2258 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2259 uint16_t value_len) 2260 { 2261 struct spdk_xattr *xattr; 2262 2263 assert(blob != NULL); 2264 2265 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2266 blob->state != SPDK_BLOB_STATE_SYNCING); 2267 2268 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2269 if (!strcmp(name, xattr->name)) { 2270 free(xattr->value); 2271 xattr->value_len = value_len; 2272 xattr->value = malloc(value_len); 2273 memcpy(xattr->value, value, value_len); 2274 2275 blob->state = SPDK_BLOB_STATE_DIRTY; 2276 2277 return 0; 2278 } 2279 } 2280 2281 xattr = calloc(1, sizeof(*xattr)); 2282 if (!xattr) { 2283 return -1; 2284 } 2285 xattr->name = strdup(name); 2286 xattr->value_len = value_len; 2287 xattr->value = malloc(value_len); 2288 memcpy(xattr->value, value, value_len); 2289 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2290 2291 blob->state = SPDK_BLOB_STATE_DIRTY; 2292 2293 return 0; 2294 } 2295 2296 int 2297 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 2298 { 2299 struct spdk_xattr *xattr; 2300 2301 assert(blob != NULL); 2302 2303 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2304 blob->state != SPDK_BLOB_STATE_SYNCING); 2305 2306 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2307 if (!strcmp(name, xattr->name)) { 2308 TAILQ_REMOVE(&blob->xattrs, xattr, link); 2309 free(xattr->value); 2310 free(xattr->name); 2311 free(xattr); 2312 2313 blob->state = SPDK_BLOB_STATE_DIRTY; 2314 2315 return 0; 2316 } 2317 } 2318 2319 return -ENOENT; 2320 } 2321 2322 int 2323 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 2324 const void **value, size_t *value_len) 2325 { 2326 struct spdk_xattr *xattr; 2327 2328 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2329 if (!strcmp(name, xattr->name)) { 2330 *value = xattr->value; 2331 *value_len = xattr->value_len; 2332 return 0; 2333 } 2334 } 2335 2336 return -ENOENT; 2337 } 2338 2339 struct spdk_xattr_names { 2340 uint32_t count; 2341 const char *names[0]; 2342 }; 2343 2344 int 2345 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 2346 struct spdk_xattr_names **names) 2347 { 2348 struct spdk_xattr *xattr; 2349 int count = 0; 2350 2351 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2352 count++; 2353 } 2354 2355 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 2356 if (*names == NULL) { 2357 return -ENOMEM; 2358 } 2359 2360 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2361 (*names)->names[(*names)->count++] = xattr->name; 2362 } 2363 2364 return 0; 2365 } 2366 2367 uint32_t 2368 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 2369 { 2370 assert(names != NULL); 2371 2372 return names->count; 2373 } 2374 2375 const char * 2376 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 2377 { 2378 if (index >= names->count) { 2379 return NULL; 2380 } 2381 2382 return names->names[index]; 2383 } 2384 2385 void 2386 spdk_xattr_names_free(struct spdk_xattr_names *names) 2387 { 2388 free(names); 2389 } 2390 2391 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 2392