1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/env.h" 38 #include "spdk/queue.h" 39 #include "spdk/io_channel.h" 40 #include "spdk/bit_array.h" 41 42 #include "spdk_internal/log.h" 43 44 #include "blobstore.h" 45 #include "request.h" 46 47 static inline size_t 48 divide_round_up(size_t num, size_t divisor) 49 { 50 return (num + divisor - 1) / divisor; 51 } 52 53 static void 54 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 55 { 56 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 57 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 58 assert(bs->num_free_clusters > 0); 59 60 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 61 62 spdk_bit_array_set(bs->used_clusters, cluster_num); 63 bs->num_free_clusters--; 64 } 65 66 static void 67 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 68 { 69 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 70 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 71 assert(bs->num_free_clusters < bs->total_clusters); 72 73 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 74 75 spdk_bit_array_clear(bs->used_clusters, cluster_num); 76 bs->num_free_clusters++; 77 } 78 79 static struct spdk_blob * 80 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 81 { 82 struct spdk_blob *blob; 83 84 blob = calloc(1, sizeof(*blob)); 85 if (!blob) { 86 return NULL; 87 } 88 89 blob->id = id; 90 blob->bs = bs; 91 92 blob->state = SPDK_BLOB_STATE_DIRTY; 93 blob->active.num_pages = 1; 94 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 95 if (!blob->active.pages) { 96 free(blob); 97 return NULL; 98 } 99 100 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 101 102 TAILQ_INIT(&blob->xattrs); 103 104 return blob; 105 } 106 107 static void 108 _spdk_blob_free(struct spdk_blob *blob) 109 { 110 struct spdk_xattr *xattr, *xattr_tmp; 111 112 assert(blob != NULL); 113 114 free(blob->active.clusters); 115 free(blob->clean.clusters); 116 free(blob->active.pages); 117 free(blob->clean.pages); 118 119 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 120 TAILQ_REMOVE(&blob->xattrs, xattr, link); 121 free(xattr->name); 122 free(xattr->value); 123 free(xattr); 124 } 125 126 free(blob); 127 } 128 129 static int 130 _spdk_blob_mark_clean(struct spdk_blob *blob) 131 { 132 uint64_t *clusters = NULL; 133 uint32_t *pages = NULL; 134 135 assert(blob != NULL); 136 assert(blob->state == SPDK_BLOB_STATE_LOADING || 137 blob->state == SPDK_BLOB_STATE_SYNCING); 138 139 if (blob->active.num_clusters) { 140 assert(blob->active.clusters); 141 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 142 if (!clusters) { 143 return -1; 144 } 145 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 146 } 147 148 if (blob->active.num_pages) { 149 assert(blob->active.pages); 150 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 151 if (!pages) { 152 free(clusters); 153 return -1; 154 } 155 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 156 } 157 158 free(blob->clean.clusters); 159 free(blob->clean.pages); 160 161 blob->clean.num_clusters = blob->active.num_clusters; 162 blob->clean.clusters = blob->active.clusters; 163 blob->clean.num_pages = blob->active.num_pages; 164 blob->clean.pages = blob->active.pages; 165 166 blob->active.clusters = clusters; 167 blob->active.pages = pages; 168 169 blob->state = SPDK_BLOB_STATE_CLEAN; 170 171 return 0; 172 } 173 174 static void 175 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 176 { 177 struct spdk_blob_md_descriptor *desc; 178 size_t cur_desc = 0; 179 void *tmp; 180 181 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 182 while (cur_desc < sizeof(page->descriptors)) { 183 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 184 if (desc->length == 0) { 185 /* If padding and length are 0, this terminates the page */ 186 break; 187 } 188 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 189 struct spdk_blob_md_descriptor_extent *desc_extent; 190 unsigned int i, j; 191 unsigned int cluster_count = blob->active.num_clusters; 192 193 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 194 195 assert(desc_extent->length > 0); 196 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 197 198 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 199 for (j = 0; j < desc_extent->extents[i].length; j++) { 200 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 201 cluster_count++; 202 } 203 } 204 205 assert(cluster_count > 0); 206 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 207 assert(tmp != NULL); 208 blob->active.clusters = tmp; 209 blob->active.cluster_array_size = cluster_count; 210 211 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 212 for (j = 0; j < desc_extent->extents[i].length; j++) { 213 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 214 desc_extent->extents[i].cluster_idx + j); 215 } 216 } 217 218 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 219 struct spdk_blob_md_descriptor_xattr *desc_xattr; 220 struct spdk_xattr *xattr; 221 222 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 223 224 assert(desc_xattr->length == sizeof(desc_xattr->name_length) + 225 sizeof(desc_xattr->value_length) + 226 desc_xattr->name_length + desc_xattr->value_length); 227 228 xattr = calloc(1, sizeof(*xattr)); 229 assert(xattr != NULL); 230 231 xattr->name = malloc(desc_xattr->name_length + 1); 232 assert(xattr->name); 233 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 234 xattr->name[desc_xattr->name_length] = '\0'; 235 236 xattr->value = malloc(desc_xattr->value_length); 237 assert(xattr->value != NULL); 238 xattr->value_len = desc_xattr->value_length; 239 memcpy(xattr->value, 240 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 241 desc_xattr->value_length); 242 243 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 244 } else { 245 /* Error */ 246 break; 247 } 248 249 /* Advance to the next descriptor */ 250 cur_desc += sizeof(*desc) + desc->length; 251 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 252 break; 253 } 254 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 255 } 256 } 257 258 static int 259 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 260 struct spdk_blob *blob) 261 { 262 const struct spdk_blob_md_page *page; 263 uint32_t i; 264 265 assert(page_count > 0); 266 assert(pages[0].sequence_num == 0); 267 assert(blob != NULL); 268 assert(blob->state == SPDK_BLOB_STATE_LOADING); 269 assert(blob->active.clusters == NULL); 270 assert(blob->id == pages[0].id); 271 assert(blob->state == SPDK_BLOB_STATE_LOADING); 272 273 for (i = 0; i < page_count; i++) { 274 page = &pages[i]; 275 276 assert(page->id == blob->id); 277 assert(page->sequence_num == i); 278 279 _spdk_blob_parse_page(page, blob); 280 } 281 282 return 0; 283 } 284 285 static int 286 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 287 struct spdk_blob_md_page **pages, 288 uint32_t *page_count, 289 struct spdk_blob_md_page **last_page) 290 { 291 struct spdk_blob_md_page *page; 292 293 assert(pages != NULL); 294 assert(page_count != NULL); 295 296 if (*page_count == 0) { 297 assert(*pages == NULL); 298 *page_count = 1; 299 *pages = spdk_dma_malloc(sizeof(struct spdk_blob_md_page), 300 sizeof(struct spdk_blob_md_page), 301 NULL); 302 } else { 303 assert(*pages != NULL); 304 (*page_count)++; 305 *pages = spdk_dma_realloc(*pages, 306 sizeof(struct spdk_blob_md_page) * (*page_count), 307 sizeof(struct spdk_blob_md_page), 308 NULL); 309 } 310 311 if (*pages == NULL) { 312 *page_count = 0; 313 *last_page = NULL; 314 return -ENOMEM; 315 } 316 317 page = &(*pages)[*page_count - 1]; 318 memset(page, 0, sizeof(*page)); 319 page->id = blob->id; 320 page->sequence_num = *page_count - 1; 321 page->next = SPDK_INVALID_MD_PAGE; 322 *last_page = page; 323 324 return 0; 325 } 326 327 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 328 * Update required_sz on both success and failure. 329 * 330 */ 331 static int 332 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 333 uint8_t *buf, size_t buf_sz, 334 size_t *required_sz) 335 { 336 struct spdk_blob_md_descriptor_xattr *desc; 337 338 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 339 strlen(xattr->name) + 340 xattr->value_len; 341 342 if (buf_sz < *required_sz) { 343 return -1; 344 } 345 346 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 347 348 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 349 desc->length = sizeof(desc->name_length) + 350 sizeof(desc->value_length) + 351 strlen(xattr->name) + 352 xattr->value_len; 353 desc->name_length = strlen(xattr->name); 354 desc->value_length = xattr->value_len; 355 356 memcpy(desc->name, xattr->name, desc->name_length); 357 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 358 xattr->value, 359 desc->value_length); 360 361 return 0; 362 } 363 364 static void 365 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 366 uint64_t start_cluster, uint64_t *next_cluster, 367 uint8_t *buf, size_t buf_sz) 368 { 369 struct spdk_blob_md_descriptor_extent *desc; 370 size_t cur_sz; 371 uint64_t i, extent_idx; 372 uint32_t lba, lba_per_cluster, lba_count; 373 374 /* The buffer must have room for at least one extent */ 375 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 376 if (buf_sz < cur_sz) { 377 *next_cluster = start_cluster; 378 return; 379 } 380 381 desc = (struct spdk_blob_md_descriptor_extent *)buf; 382 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 383 384 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 385 386 lba = blob->active.clusters[start_cluster]; 387 lba_count = lba_per_cluster; 388 extent_idx = 0; 389 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 390 if ((lba + lba_count) == blob->active.clusters[i]) { 391 lba_count += lba_per_cluster; 392 continue; 393 } 394 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 395 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 396 extent_idx++; 397 398 cur_sz += sizeof(desc->extents[extent_idx]); 399 400 if (buf_sz < cur_sz) { 401 /* If we ran out of buffer space, return */ 402 desc->length = sizeof(desc->extents[0]) * extent_idx; 403 *next_cluster = i; 404 return; 405 } 406 407 lba = blob->active.clusters[i]; 408 lba_count = lba_per_cluster; 409 } 410 411 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 412 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 413 extent_idx++; 414 415 desc->length = sizeof(desc->extents[0]) * extent_idx; 416 *next_cluster = blob->active.num_clusters; 417 418 return; 419 } 420 421 static int 422 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 423 uint32_t *page_count) 424 { 425 struct spdk_blob_md_page *cur_page; 426 const struct spdk_xattr *xattr; 427 int rc; 428 uint8_t *buf; 429 size_t remaining_sz; 430 uint64_t last_cluster; 431 432 assert(pages != NULL); 433 assert(page_count != NULL); 434 assert(blob != NULL); 435 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 436 437 *pages = NULL; 438 *page_count = 0; 439 440 /* A blob always has at least 1 page, even if it has no descriptors */ 441 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 442 if (rc < 0) { 443 return rc; 444 } 445 446 buf = (uint8_t *)cur_page->descriptors; 447 remaining_sz = sizeof(cur_page->descriptors); 448 449 /* Serialize xattrs */ 450 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 451 size_t required_sz = 0; 452 rc = _spdk_blob_serialize_xattr(xattr, 453 buf, remaining_sz, 454 &required_sz); 455 if (rc < 0) { 456 /* Need to add a new page to the chain */ 457 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 458 &cur_page); 459 if (rc < 0) { 460 spdk_dma_free(*pages); 461 *pages = NULL; 462 *page_count = 0; 463 return rc; 464 } 465 466 buf = (uint8_t *)cur_page->descriptors; 467 remaining_sz = sizeof(cur_page->descriptors); 468 469 /* Try again */ 470 required_sz = 0; 471 rc = _spdk_blob_serialize_xattr(xattr, 472 buf, remaining_sz, 473 &required_sz); 474 475 if (rc < 0) { 476 spdk_dma_free(*pages); 477 *pages = NULL; 478 *page_count = 0; 479 return -1; 480 } 481 } 482 483 remaining_sz -= required_sz; 484 buf += required_sz; 485 } 486 487 /* Serialize extents */ 488 last_cluster = 0; 489 while (last_cluster < blob->active.num_clusters) { 490 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 491 buf, remaining_sz); 492 493 if (last_cluster == blob->active.num_clusters) { 494 break; 495 } 496 497 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 498 &cur_page); 499 if (rc < 0) { 500 return rc; 501 } 502 503 buf = (uint8_t *)cur_page->descriptors; 504 remaining_sz = sizeof(cur_page->descriptors); 505 } 506 507 return 0; 508 } 509 510 struct spdk_blob_load_ctx { 511 struct spdk_blob *blob; 512 513 struct spdk_blob_md_page *pages; 514 uint32_t num_pages; 515 516 spdk_bs_sequence_cpl cb_fn; 517 void *cb_arg; 518 }; 519 520 static void 521 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 522 { 523 struct spdk_blob_load_ctx *ctx = cb_arg; 524 struct spdk_blob *blob = ctx->blob; 525 struct spdk_blob_md_page *page; 526 int rc; 527 528 page = &ctx->pages[ctx->num_pages - 1]; 529 530 if (page->next != SPDK_INVALID_MD_PAGE) { 531 uint32_t next_page = page->next; 532 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 533 534 535 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 536 537 /* Read the next page */ 538 ctx->num_pages++; 539 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 540 sizeof(*page), NULL); 541 if (ctx->pages == NULL) { 542 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 543 free(ctx); 544 return; 545 } 546 547 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 548 next_lba, 549 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 550 _spdk_blob_load_cpl, ctx); 551 return; 552 } 553 554 /* Parse the pages */ 555 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 556 557 _spdk_blob_mark_clean(blob); 558 559 ctx->cb_fn(seq, ctx->cb_arg, rc); 560 561 /* Free the memory */ 562 spdk_dma_free(ctx->pages); 563 free(ctx); 564 } 565 566 /* Load a blob from disk given a blobid */ 567 static void 568 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 569 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 570 { 571 struct spdk_blob_load_ctx *ctx; 572 struct spdk_blob_store *bs; 573 uint32_t page_num; 574 uint64_t lba; 575 576 assert(blob != NULL); 577 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 578 blob->state == SPDK_BLOB_STATE_DIRTY); 579 580 bs = blob->bs; 581 582 ctx = calloc(1, sizeof(*ctx)); 583 if (!ctx) { 584 cb_fn(seq, cb_arg, -ENOMEM); 585 return; 586 } 587 588 ctx->blob = blob; 589 ctx->pages = spdk_dma_realloc(ctx->pages, sizeof(struct spdk_blob_md_page), 590 sizeof(struct spdk_blob_md_page), NULL); 591 if (!ctx->pages) { 592 free(ctx); 593 cb_fn(seq, cb_arg, -ENOMEM); 594 return; 595 } 596 ctx->num_pages = 1; 597 ctx->cb_fn = cb_fn; 598 ctx->cb_arg = cb_arg; 599 600 page_num = _spdk_bs_blobid_to_page(blob->id); 601 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 602 603 blob->state = SPDK_BLOB_STATE_LOADING; 604 605 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 606 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)), 607 _spdk_blob_load_cpl, ctx); 608 } 609 610 struct spdk_blob_persist_ctx { 611 struct spdk_blob *blob; 612 613 struct spdk_blob_md_page *pages; 614 615 uint64_t idx; 616 617 spdk_bs_sequence_cpl cb_fn; 618 void *cb_arg; 619 }; 620 621 static void 622 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 623 { 624 struct spdk_blob_persist_ctx *ctx = cb_arg; 625 struct spdk_blob *blob = ctx->blob; 626 627 if (bserrno == 0) { 628 _spdk_blob_mark_clean(blob); 629 } 630 631 /* Call user callback */ 632 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 633 634 /* Free the memory */ 635 spdk_dma_free(ctx->pages); 636 free(ctx); 637 } 638 639 static void 640 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 641 { 642 struct spdk_blob_persist_ctx *ctx = cb_arg; 643 struct spdk_blob *blob = ctx->blob; 644 struct spdk_blob_store *bs = blob->bs; 645 void *tmp; 646 size_t i; 647 648 /* Release all clusters that were truncated */ 649 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 650 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 651 652 _spdk_bs_release_cluster(bs, cluster_num); 653 } 654 655 if (blob->active.num_clusters == 0) { 656 free(blob->active.clusters); 657 blob->active.clusters = NULL; 658 blob->active.cluster_array_size = 0; 659 } else { 660 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 661 assert(tmp != NULL); 662 blob->active.clusters = tmp; 663 blob->active.cluster_array_size = blob->active.num_clusters; 664 } 665 666 _spdk_blob_persist_complete(seq, ctx, bserrno); 667 } 668 669 static void 670 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 671 { 672 struct spdk_blob_persist_ctx *ctx = cb_arg; 673 struct spdk_blob *blob = ctx->blob; 674 struct spdk_blob_store *bs = blob->bs; 675 spdk_bs_batch_t *batch; 676 size_t i; 677 uint64_t lba; 678 uint32_t lba_count; 679 680 /* Clusters don't move around in blobs. The list shrinks or grows 681 * at the end, but no changes ever occur in the middle of the list. 682 */ 683 684 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 685 686 /* Unmap all clusters that were truncated */ 687 lba = 0; 688 lba_count = 0; 689 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 690 uint64_t next_lba = blob->active.clusters[i]; 691 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 692 693 if ((lba + lba_count) == next_lba) { 694 /* This cluster is contiguous with the previous one. */ 695 lba_count += next_lba_count; 696 continue; 697 } 698 699 /* This cluster is not contiguous with the previous one. */ 700 701 /* If a run of LBAs previously existing, send them 702 * as an unmap. 703 */ 704 if (lba_count > 0) { 705 spdk_bs_batch_unmap(batch, lba, lba_count); 706 } 707 708 /* Start building the next batch */ 709 lba = next_lba; 710 lba_count = next_lba_count; 711 } 712 713 /* If we ended with a contiguous set of LBAs, send the unmap now */ 714 if (lba_count > 0) { 715 spdk_bs_batch_unmap(batch, lba, lba_count); 716 } 717 718 spdk_bs_batch_close(batch); 719 } 720 721 static void 722 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 723 { 724 struct spdk_blob_persist_ctx *ctx = cb_arg; 725 struct spdk_blob *blob = ctx->blob; 726 struct spdk_blob_store *bs = blob->bs; 727 size_t i; 728 729 /* This loop starts at 1 because the first page is special and handled 730 * below. The pages (except the first) are never written in place, 731 * so any pages in the clean list must be unmapped. 732 */ 733 for (i = 1; i < blob->clean.num_pages; i++) { 734 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 735 } 736 737 if (blob->active.num_pages == 0) { 738 uint32_t page_num; 739 740 page_num = _spdk_bs_blobid_to_page(blob->id); 741 spdk_bit_array_clear(bs->used_md_pages, page_num); 742 } 743 744 /* Move on to unmapping clusters */ 745 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 746 } 747 748 static void 749 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 750 { 751 struct spdk_blob_persist_ctx *ctx = cb_arg; 752 struct spdk_blob *blob = ctx->blob; 753 struct spdk_blob_store *bs = blob->bs; 754 uint64_t lba; 755 uint32_t lba_count; 756 spdk_bs_batch_t *batch; 757 size_t i; 758 759 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx); 760 761 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)); 762 763 /* This loop starts at 1 because the first page is special and handled 764 * below. The pages (except the first) are never written in place, 765 * so any pages in the clean list must be unmapped. 766 */ 767 for (i = 1; i < blob->clean.num_pages; i++) { 768 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 769 770 spdk_bs_batch_unmap(batch, lba, lba_count); 771 } 772 773 /* The first page will only be unmapped if this is a delete. */ 774 if (blob->active.num_pages == 0) { 775 uint32_t page_num; 776 777 /* The first page in the metadata goes where the blobid indicates */ 778 page_num = _spdk_bs_blobid_to_page(blob->id); 779 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 780 781 spdk_bs_batch_unmap(batch, lba, lba_count); 782 } 783 784 spdk_bs_batch_close(batch); 785 } 786 787 static void 788 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 789 { 790 struct spdk_blob_persist_ctx *ctx = cb_arg; 791 struct spdk_blob *blob = ctx->blob; 792 struct spdk_blob_store *bs = blob->bs; 793 uint64_t lba; 794 uint32_t lba_count; 795 struct spdk_blob_md_page *page; 796 797 if (blob->active.num_pages == 0) { 798 /* Move on to the next step */ 799 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 800 return; 801 } 802 803 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 804 805 page = &ctx->pages[0]; 806 /* The first page in the metadata goes where the blobid indicates */ 807 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 808 809 spdk_bs_sequence_write(seq, page, lba, lba_count, 810 _spdk_blob_persist_unmap_pages, ctx); 811 } 812 813 static void 814 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 815 { 816 struct spdk_blob_persist_ctx *ctx = cb_arg; 817 struct spdk_blob *blob = ctx->blob; 818 struct spdk_blob_store *bs = blob->bs; 819 uint64_t lba; 820 uint32_t lba_count; 821 struct spdk_blob_md_page *page; 822 spdk_bs_batch_t *batch; 823 size_t i; 824 825 /* Clusters don't move around in blobs. The list shrinks or grows 826 * at the end, but no changes ever occur in the middle of the list. 827 */ 828 829 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 830 831 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 832 833 /* This starts at 1. The root page is not written until 834 * all of the others are finished 835 */ 836 for (i = 1; i < blob->active.num_pages; i++) { 837 page = &ctx->pages[i]; 838 assert(page->sequence_num == i); 839 840 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 841 842 spdk_bs_batch_write(batch, page, lba, lba_count); 843 } 844 845 spdk_bs_batch_close(batch); 846 } 847 848 static int 849 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 850 { 851 uint64_t i; 852 uint64_t *tmp; 853 uint64_t lfc; /* lowest free cluster */ 854 struct spdk_blob_store *bs; 855 856 bs = blob->bs; 857 858 assert(blob->state != SPDK_BLOB_STATE_LOADING && 859 blob->state != SPDK_BLOB_STATE_SYNCING); 860 861 if (blob->active.num_clusters == sz) { 862 return 0; 863 } 864 865 if (blob->active.num_clusters < blob->active.cluster_array_size) { 866 /* If this blob was resized to be larger, then smaller, then 867 * larger without syncing, then the cluster array already 868 * contains spare assigned clusters we can use. 869 */ 870 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 871 sz); 872 } 873 874 blob->state = SPDK_BLOB_STATE_DIRTY; 875 876 /* Do two passes - one to verify that we can obtain enough clusters 877 * and another to actually claim them. 878 */ 879 880 lfc = 0; 881 for (i = blob->active.num_clusters; i < sz; i++) { 882 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 883 if (lfc >= bs->total_clusters) { 884 /* No more free clusters. Cannot satisfy the request */ 885 assert(false); 886 return -1; 887 } 888 lfc++; 889 } 890 891 if (sz > blob->active.num_clusters) { 892 /* Expand the cluster array if necessary. 893 * We only shrink the array when persisting. 894 */ 895 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 896 if (sz > 0 && tmp == NULL) { 897 assert(false); 898 return -1; 899 } 900 blob->active.clusters = tmp; 901 blob->active.cluster_array_size = sz; 902 } 903 904 lfc = 0; 905 for (i = blob->active.num_clusters; i < sz; i++) { 906 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 907 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 908 _spdk_bs_claim_cluster(bs, lfc); 909 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 910 lfc++; 911 } 912 913 blob->active.num_clusters = sz; 914 915 return 0; 916 } 917 918 /* Write a blob to disk */ 919 static void 920 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 921 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 922 { 923 struct spdk_blob_persist_ctx *ctx; 924 int rc; 925 uint64_t i; 926 uint32_t page_num; 927 struct spdk_blob_store *bs; 928 929 assert(blob != NULL); 930 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 931 blob->state == SPDK_BLOB_STATE_DIRTY); 932 933 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 934 cb_fn(seq, cb_arg, 0); 935 return; 936 } 937 938 bs = blob->bs; 939 940 ctx = calloc(1, sizeof(*ctx)); 941 if (!ctx) { 942 cb_fn(seq, cb_arg, -ENOMEM); 943 return; 944 } 945 ctx->blob = blob; 946 ctx->cb_fn = cb_fn; 947 ctx->cb_arg = cb_arg; 948 949 blob->state = SPDK_BLOB_STATE_SYNCING; 950 951 if (blob->active.num_pages == 0) { 952 /* This is the signal that the blob should be deleted. 953 * Immediately jump to the clean up routine. */ 954 assert(blob->clean.num_pages > 0); 955 ctx->idx = blob->clean.num_pages - 1; 956 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 957 return; 958 959 } 960 961 /* Generate the new metadata */ 962 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 963 if (rc < 0) { 964 free(ctx); 965 cb_fn(seq, cb_arg, rc); 966 return; 967 } 968 969 assert(blob->active.num_pages >= 1); 970 971 /* Resize the cache of page indices */ 972 blob->active.pages = realloc(blob->active.pages, 973 blob->active.num_pages * sizeof(*blob->active.pages)); 974 if (!blob->active.pages) { 975 free(ctx); 976 cb_fn(seq, cb_arg, -ENOMEM); 977 return; 978 } 979 980 /* Assign this metadata to pages. This requires two passes - 981 * one to verify that there are enough pages and a second 982 * to actually claim them. */ 983 page_num = 0; 984 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 985 for (i = 1; i < blob->active.num_pages; i++) { 986 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 987 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 988 spdk_dma_free(ctx->pages); 989 free(ctx); 990 blob->state = SPDK_BLOB_STATE_DIRTY; 991 cb_fn(seq, cb_arg, -ENOMEM); 992 return; 993 } 994 page_num++; 995 } 996 997 page_num = 0; 998 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 999 for (i = 1; i < blob->active.num_pages; i++) { 1000 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1001 ctx->pages[i - 1].next = page_num; 1002 blob->active.pages[i] = page_num; 1003 spdk_bit_array_set(bs->used_md_pages, page_num); 1004 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1005 page_num++; 1006 } 1007 1008 /* Start writing the metadata from last page to first */ 1009 ctx->idx = blob->active.num_pages - 1; 1010 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1011 } 1012 1013 static void 1014 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1015 void *payload, uint64_t offset, uint64_t length, 1016 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1017 { 1018 spdk_bs_batch_t *batch; 1019 struct spdk_bs_cpl cpl; 1020 uint64_t lba; 1021 uint32_t lba_count; 1022 uint8_t *buf; 1023 uint64_t page; 1024 1025 assert(blob != NULL); 1026 1027 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1028 cb_fn(cb_arg, -EINVAL); 1029 return; 1030 } 1031 1032 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1033 cpl.u.blob_basic.cb_fn = cb_fn; 1034 cpl.u.blob_basic.cb_arg = cb_arg; 1035 1036 batch = spdk_bs_batch_open(_channel, &cpl); 1037 if (!batch) { 1038 cb_fn(cb_arg, -ENOMEM); 1039 return; 1040 } 1041 1042 length = _spdk_bs_page_to_lba(blob->bs, length); 1043 page = offset; 1044 buf = payload; 1045 while (length > 0) { 1046 lba = _spdk_bs_blob_page_to_lba(blob, page); 1047 lba_count = spdk_min(length, 1048 _spdk_bs_page_to_lba(blob->bs, 1049 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1050 1051 if (read) { 1052 spdk_bs_batch_read(batch, buf, lba, lba_count); 1053 } else { 1054 spdk_bs_batch_write(batch, buf, lba, lba_count); 1055 } 1056 1057 length -= lba_count; 1058 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1059 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1060 } 1061 1062 spdk_bs_batch_close(batch); 1063 } 1064 1065 static struct spdk_blob * 1066 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1067 { 1068 struct spdk_blob *blob; 1069 1070 TAILQ_FOREACH(blob, &bs->blobs, link) { 1071 if (blob->id == blobid) { 1072 return blob; 1073 } 1074 } 1075 1076 return NULL; 1077 } 1078 1079 static int 1080 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1081 uint32_t max_ops) 1082 { 1083 struct spdk_bs_dev *dev; 1084 uint32_t i; 1085 1086 dev = bs->dev; 1087 1088 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1089 if (!channel->req_mem) { 1090 return -1; 1091 } 1092 1093 TAILQ_INIT(&channel->reqs); 1094 1095 for (i = 0; i < max_ops; i++) { 1096 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1097 } 1098 1099 channel->bs = bs; 1100 channel->dev = dev; 1101 channel->dev_channel = dev->create_channel(dev); 1102 1103 return 0; 1104 } 1105 1106 static int 1107 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1108 { 1109 struct spdk_blob_store *bs; 1110 struct spdk_bs_channel *channel = ctx_buf; 1111 1112 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1113 1114 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1115 } 1116 1117 static int 1118 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1119 { 1120 struct spdk_blob_store *bs; 1121 struct spdk_bs_channel *channel = ctx_buf; 1122 1123 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1124 1125 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1126 } 1127 1128 1129 static void 1130 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1131 { 1132 struct spdk_bs_channel *channel = ctx_buf; 1133 1134 free(channel->req_mem); 1135 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1136 } 1137 1138 static void 1139 _spdk_bs_free(struct spdk_blob_store *bs) 1140 { 1141 struct spdk_blob *blob, *blob_tmp; 1142 1143 spdk_bs_unregister_md_thread(bs); 1144 spdk_io_device_unregister(&bs->io_target, NULL); 1145 spdk_io_device_unregister(&bs->md_target, NULL); 1146 1147 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1148 TAILQ_REMOVE(&bs->blobs, blob, link); 1149 _spdk_blob_free(blob); 1150 } 1151 1152 spdk_bit_array_free(&bs->used_md_pages); 1153 spdk_bit_array_free(&bs->used_clusters); 1154 1155 bs->dev->destroy(bs->dev); 1156 free(bs); 1157 } 1158 1159 void 1160 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1161 { 1162 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1163 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1164 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1165 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1166 } 1167 1168 static struct spdk_blob_store * 1169 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1170 { 1171 struct spdk_blob_store *bs; 1172 1173 bs = calloc(1, sizeof(struct spdk_blob_store)); 1174 if (!bs) { 1175 return NULL; 1176 } 1177 1178 TAILQ_INIT(&bs->blobs); 1179 bs->dev = dev; 1180 1181 /* 1182 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1183 * even multiple of the cluster size. 1184 */ 1185 bs->cluster_sz = opts->cluster_sz; 1186 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1187 bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1188 bs->num_free_clusters = bs->total_clusters; 1189 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1190 if (bs->used_clusters == NULL) { 1191 _spdk_bs_free(bs); 1192 return NULL; 1193 } 1194 1195 bs->md_target.max_md_ops = opts->max_md_ops; 1196 bs->io_target.max_channel_ops = opts->max_channel_ops; 1197 bs->super_blob = SPDK_BLOBID_INVALID; 1198 1199 /* The metadata is assumed to be at least 1 page */ 1200 bs->used_md_pages = spdk_bit_array_create(1); 1201 1202 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1203 sizeof(struct spdk_bs_channel)); 1204 spdk_bs_register_md_thread(bs); 1205 1206 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1207 sizeof(struct spdk_bs_channel)); 1208 1209 return bs; 1210 } 1211 1212 /* START spdk_bs_load */ 1213 1214 struct spdk_bs_load_ctx { 1215 struct spdk_blob_store *bs; 1216 struct spdk_bs_super_block *super; 1217 1218 struct spdk_bs_md_mask *mask; 1219 }; 1220 1221 static void 1222 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1223 { 1224 struct spdk_bs_load_ctx *ctx = cb_arg; 1225 uint32_t i, j; 1226 int rc; 1227 1228 /* The type must be correct */ 1229 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1230 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1231 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1232 struct spdk_blob_md_page) * 8)); 1233 /* The length of the mask must be exactly equal to the total number of clusters */ 1234 assert(ctx->mask->length == ctx->bs->total_clusters); 1235 1236 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1237 if (rc < 0) { 1238 spdk_dma_free(ctx->super); 1239 spdk_dma_free(ctx->mask); 1240 _spdk_bs_free(ctx->bs); 1241 free(ctx); 1242 spdk_bs_sequence_finish(seq, -ENOMEM); 1243 return; 1244 } 1245 1246 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1247 for (i = 0; i < ctx->mask->length / 8; i++) { 1248 uint8_t segment = ctx->mask->mask[i]; 1249 for (j = 0; segment && (j < 8); j++) { 1250 if (segment & 1U) { 1251 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1252 assert(ctx->bs->num_free_clusters > 0); 1253 ctx->bs->num_free_clusters--; 1254 } 1255 segment >>= 1U; 1256 } 1257 } 1258 1259 spdk_dma_free(ctx->super); 1260 spdk_dma_free(ctx->mask); 1261 free(ctx); 1262 1263 spdk_bs_sequence_finish(seq, bserrno); 1264 } 1265 1266 static void 1267 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1268 { 1269 struct spdk_bs_load_ctx *ctx = cb_arg; 1270 uint64_t lba, lba_count, mask_size; 1271 uint32_t i, j; 1272 int rc; 1273 1274 /* The type must be correct */ 1275 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1276 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1277 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) * 1278 8)); 1279 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1280 assert(ctx->mask->length == ctx->super->md_len); 1281 1282 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1283 if (rc < 0) { 1284 spdk_dma_free(ctx->super); 1285 spdk_dma_free(ctx->mask); 1286 _spdk_bs_free(ctx->bs); 1287 free(ctx); 1288 spdk_bs_sequence_finish(seq, -ENOMEM); 1289 return; 1290 } 1291 1292 for (i = 0; i < ctx->mask->length / 8; i++) { 1293 uint8_t segment = ctx->mask->mask[i]; 1294 for (j = 0; segment && (j < 8); j++) { 1295 if (segment & 1U) { 1296 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1297 } 1298 segment >>= 1U; 1299 } 1300 } 1301 spdk_dma_free(ctx->mask); 1302 1303 /* Read the used clusters mask */ 1304 mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page); 1305 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1306 if (!ctx->mask) { 1307 spdk_dma_free(ctx->super); 1308 _spdk_bs_free(ctx->bs); 1309 free(ctx); 1310 spdk_bs_sequence_finish(seq, -ENOMEM); 1311 return; 1312 } 1313 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1314 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1315 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1316 _spdk_bs_load_used_clusters_cpl, ctx); 1317 } 1318 1319 static void 1320 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1321 { 1322 struct spdk_bs_load_ctx *ctx = cb_arg; 1323 uint64_t lba, lba_count, mask_size; 1324 1325 if (ctx->super->version != SPDK_BS_VERSION) { 1326 spdk_dma_free(ctx->super); 1327 _spdk_bs_free(ctx->bs); 1328 free(ctx); 1329 spdk_bs_sequence_finish(seq, -EILSEQ); 1330 return; 1331 } 1332 1333 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1334 sizeof(ctx->super->signature)) != 0) { 1335 spdk_dma_free(ctx->super); 1336 _spdk_bs_free(ctx->bs); 1337 free(ctx); 1338 spdk_bs_sequence_finish(seq, -EILSEQ); 1339 return; 1340 } 1341 1342 if (ctx->super->clean != 1) { 1343 /* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED. 1344 * All of the necessary data to recover is available 1345 * on disk - the code just has not been written yet. 1346 */ 1347 assert(false); 1348 spdk_dma_free(ctx->super); 1349 _spdk_bs_free(ctx->bs); 1350 free(ctx); 1351 spdk_bs_sequence_finish(seq, -EILSEQ); 1352 return; 1353 } 1354 ctx->super->clean = 0; 1355 1356 /* Parse the super block */ 1357 ctx->bs->cluster_sz = ctx->super->cluster_size; 1358 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1359 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1360 ctx->bs->md_start = ctx->super->md_start; 1361 ctx->bs->md_len = ctx->super->md_len; 1362 ctx->bs->super_blob = ctx->super->super_blob; 1363 1364 /* Read the used pages mask */ 1365 mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page); 1366 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1367 if (!ctx->mask) { 1368 spdk_dma_free(ctx->super); 1369 _spdk_bs_free(ctx->bs); 1370 free(ctx); 1371 spdk_bs_sequence_finish(seq, -ENOMEM); 1372 return; 1373 } 1374 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1375 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1376 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1377 _spdk_bs_load_used_pages_cpl, ctx); 1378 } 1379 1380 void 1381 spdk_bs_load(struct spdk_bs_dev *dev, 1382 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1383 { 1384 struct spdk_blob_store *bs; 1385 struct spdk_bs_cpl cpl; 1386 spdk_bs_sequence_t *seq; 1387 struct spdk_bs_load_ctx *ctx; 1388 struct spdk_bs_opts opts = {}; 1389 1390 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1391 1392 spdk_bs_opts_init(&opts); 1393 1394 bs = _spdk_bs_alloc(dev, &opts); 1395 if (!bs) { 1396 cb_fn(cb_arg, NULL, -ENOMEM); 1397 return; 1398 } 1399 1400 ctx = calloc(1, sizeof(*ctx)); 1401 if (!ctx) { 1402 _spdk_bs_free(bs); 1403 cb_fn(cb_arg, NULL, -ENOMEM); 1404 return; 1405 } 1406 1407 ctx->bs = bs; 1408 1409 /* Allocate memory for the super block */ 1410 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1411 if (!ctx->super) { 1412 free(ctx); 1413 _spdk_bs_free(bs); 1414 return; 1415 } 1416 1417 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1418 cpl.u.bs_handle.cb_fn = cb_fn; 1419 cpl.u.bs_handle.cb_arg = cb_arg; 1420 cpl.u.bs_handle.bs = bs; 1421 1422 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1423 if (!seq) { 1424 spdk_dma_free(ctx->super); 1425 free(ctx); 1426 _spdk_bs_free(bs); 1427 cb_fn(cb_arg, NULL, -ENOMEM); 1428 return; 1429 } 1430 1431 /* Read the super block */ 1432 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1433 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1434 _spdk_bs_load_super_cpl, ctx); 1435 } 1436 1437 /* END spdk_bs_load */ 1438 1439 /* START spdk_bs_init */ 1440 1441 struct spdk_bs_init_ctx { 1442 struct spdk_blob_store *bs; 1443 struct spdk_bs_super_block *super; 1444 }; 1445 1446 static void 1447 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1448 { 1449 struct spdk_bs_init_ctx *ctx = cb_arg; 1450 1451 spdk_dma_free(ctx->super); 1452 free(ctx); 1453 1454 spdk_bs_sequence_finish(seq, bserrno); 1455 } 1456 1457 static void 1458 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1459 { 1460 struct spdk_bs_init_ctx *ctx = cb_arg; 1461 1462 /* Write super block */ 1463 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1464 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1465 _spdk_bs_init_persist_super_cpl, ctx); 1466 } 1467 1468 void 1469 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1470 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1471 { 1472 struct spdk_bs_init_ctx *ctx; 1473 struct spdk_blob_store *bs; 1474 struct spdk_bs_cpl cpl; 1475 spdk_bs_sequence_t *seq; 1476 uint64_t num_md_pages; 1477 uint32_t i; 1478 struct spdk_bs_opts opts = {}; 1479 int rc; 1480 1481 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 1482 1483 if (o) { 1484 opts = *o; 1485 } else { 1486 spdk_bs_opts_init(&opts); 1487 } 1488 1489 bs = _spdk_bs_alloc(dev, &opts); 1490 if (!bs) { 1491 cb_fn(cb_arg, NULL, -ENOMEM); 1492 return; 1493 } 1494 1495 if (opts.num_md_pages == UINT32_MAX) { 1496 /* By default, allocate 1 page per cluster. 1497 * Technically, this over-allocates metadata 1498 * because more metadata will reduce the number 1499 * of usable clusters. This can be addressed with 1500 * more complex math in the future. 1501 */ 1502 bs->md_len = bs->total_clusters; 1503 } else { 1504 bs->md_len = opts.num_md_pages; 1505 } 1506 1507 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 1508 if (rc < 0) { 1509 _spdk_bs_free(bs); 1510 cb_fn(cb_arg, NULL, -ENOMEM); 1511 return; 1512 } 1513 1514 ctx = calloc(1, sizeof(*ctx)); 1515 if (!ctx) { 1516 _spdk_bs_free(bs); 1517 cb_fn(cb_arg, NULL, -ENOMEM); 1518 return; 1519 } 1520 1521 ctx->bs = bs; 1522 1523 /* Allocate memory for the super block */ 1524 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1525 if (!ctx->super) { 1526 free(ctx); 1527 _spdk_bs_free(bs); 1528 return; 1529 } 1530 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1531 sizeof(ctx->super->signature)); 1532 ctx->super->version = SPDK_BS_VERSION; 1533 ctx->super->length = sizeof(*ctx->super); 1534 ctx->super->super_blob = bs->super_blob; 1535 ctx->super->clean = 0; 1536 ctx->super->cluster_size = bs->cluster_sz; 1537 1538 /* Calculate how many pages the metadata consumes at the front 1539 * of the disk. 1540 */ 1541 1542 /* The super block uses 1 page */ 1543 num_md_pages = 1; 1544 1545 /* The used_md_pages mask requires 1 bit per metadata page, rounded 1546 * up to the nearest page, plus a header. 1547 */ 1548 ctx->super->used_page_mask_start = num_md_pages; 1549 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1550 divide_round_up(bs->md_len, 8), 1551 sizeof(struct spdk_blob_md_page)); 1552 num_md_pages += ctx->super->used_page_mask_len; 1553 1554 /* The used_clusters mask requires 1 bit per cluster, rounded 1555 * up to the nearest page, plus a header. 1556 */ 1557 ctx->super->used_cluster_mask_start = num_md_pages; 1558 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1559 divide_round_up(bs->total_clusters, 8), 1560 sizeof(struct spdk_blob_md_page)); 1561 num_md_pages += ctx->super->used_cluster_mask_len; 1562 1563 /* The metadata region size was chosen above */ 1564 ctx->super->md_start = bs->md_start = num_md_pages; 1565 ctx->super->md_len = bs->md_len; 1566 num_md_pages += bs->md_len; 1567 1568 /* Claim all of the clusters used by the metadata */ 1569 for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) { 1570 _spdk_bs_claim_cluster(bs, i); 1571 } 1572 1573 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1574 cpl.u.bs_handle.cb_fn = cb_fn; 1575 cpl.u.bs_handle.cb_arg = cb_arg; 1576 cpl.u.bs_handle.bs = bs; 1577 1578 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1579 if (!seq) { 1580 spdk_dma_free(ctx->super); 1581 free(ctx); 1582 _spdk_bs_free(bs); 1583 cb_fn(cb_arg, NULL, -ENOMEM); 1584 return; 1585 } 1586 1587 /* TRIM the entire device */ 1588 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx); 1589 } 1590 1591 /* END spdk_bs_init */ 1592 1593 /* START spdk_bs_unload */ 1594 1595 struct spdk_bs_unload_ctx { 1596 struct spdk_blob_store *bs; 1597 struct spdk_bs_super_block *super; 1598 1599 struct spdk_bs_md_mask *mask; 1600 }; 1601 1602 static void 1603 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1604 { 1605 struct spdk_bs_unload_ctx *ctx = cb_arg; 1606 1607 spdk_dma_free(ctx->super); 1608 1609 spdk_bs_sequence_finish(seq, bserrno); 1610 1611 _spdk_bs_free(ctx->bs); 1612 free(ctx); 1613 } 1614 1615 static void 1616 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1617 { 1618 struct spdk_bs_unload_ctx *ctx = cb_arg; 1619 1620 spdk_dma_free(ctx->mask); 1621 1622 /* Update the values in the super block */ 1623 ctx->super->super_blob = ctx->bs->super_blob; 1624 ctx->super->clean = 1; 1625 1626 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1627 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1628 _spdk_bs_unload_write_super_cpl, ctx); 1629 } 1630 1631 static void 1632 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1633 { 1634 struct spdk_bs_unload_ctx *ctx = cb_arg; 1635 uint32_t i; 1636 uint64_t lba, lba_count, mask_size; 1637 1638 spdk_dma_free(ctx->mask); 1639 1640 /* Write out the used clusters mask */ 1641 mask_size = ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page); 1642 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1643 if (!ctx->mask) { 1644 spdk_dma_free(ctx->super); 1645 free(ctx); 1646 spdk_bs_sequence_finish(seq, -ENOMEM); 1647 return; 1648 } 1649 1650 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1651 ctx->mask->length = ctx->bs->total_clusters; 1652 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1653 1654 i = 0; 1655 while (true) { 1656 i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i); 1657 if (i > ctx->mask->length) { 1658 break; 1659 } 1660 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1661 i++; 1662 } 1663 1664 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1665 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1666 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1667 _spdk_bs_unload_write_used_clusters_cpl, ctx); 1668 } 1669 1670 static void 1671 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1672 { 1673 struct spdk_bs_unload_ctx *ctx = cb_arg; 1674 uint32_t i; 1675 uint64_t lba, lba_count, mask_size; 1676 1677 /* Write out the used page mask */ 1678 mask_size = ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page); 1679 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1680 if (!ctx->mask) { 1681 spdk_dma_free(ctx->super); 1682 free(ctx); 1683 spdk_bs_sequence_finish(seq, -ENOMEM); 1684 return; 1685 } 1686 1687 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1688 ctx->mask->length = ctx->super->md_len; 1689 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1690 1691 i = 0; 1692 while (true) { 1693 i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i); 1694 if (i > ctx->mask->length) { 1695 break; 1696 } 1697 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1698 i++; 1699 } 1700 1701 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1702 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1703 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1704 _spdk_bs_unload_write_used_pages_cpl, ctx); 1705 } 1706 1707 void 1708 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 1709 { 1710 struct spdk_bs_cpl cpl; 1711 spdk_bs_sequence_t *seq; 1712 struct spdk_bs_unload_ctx *ctx; 1713 1714 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 1715 1716 ctx = calloc(1, sizeof(*ctx)); 1717 if (!ctx) { 1718 cb_fn(cb_arg, -ENOMEM); 1719 return; 1720 } 1721 1722 ctx->bs = bs; 1723 1724 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1725 if (!ctx->super) { 1726 free(ctx); 1727 cb_fn(cb_arg, -ENOMEM); 1728 return; 1729 } 1730 1731 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 1732 cpl.u.bs_basic.cb_fn = cb_fn; 1733 cpl.u.bs_basic.cb_arg = cb_arg; 1734 1735 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1736 if (!seq) { 1737 spdk_dma_free(ctx->super); 1738 free(ctx); 1739 cb_fn(cb_arg, -ENOMEM); 1740 return; 1741 } 1742 1743 assert(TAILQ_EMPTY(&bs->blobs)); 1744 1745 /* Read super block */ 1746 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1747 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1748 _spdk_bs_unload_read_super_cpl, ctx); 1749 } 1750 1751 /* END spdk_bs_unload */ 1752 1753 void 1754 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 1755 spdk_bs_op_complete cb_fn, void *cb_arg) 1756 { 1757 bs->super_blob = blobid; 1758 cb_fn(cb_arg, 0); 1759 } 1760 1761 void 1762 spdk_bs_get_super(struct spdk_blob_store *bs, 1763 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1764 { 1765 if (bs->super_blob == SPDK_BLOBID_INVALID) { 1766 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 1767 } else { 1768 cb_fn(cb_arg, bs->super_blob, 0); 1769 } 1770 } 1771 1772 uint64_t 1773 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 1774 { 1775 return bs->cluster_sz; 1776 } 1777 1778 uint64_t 1779 spdk_bs_get_page_size(struct spdk_blob_store *bs) 1780 { 1781 return sizeof(struct spdk_blob_md_page); 1782 } 1783 1784 uint64_t 1785 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 1786 { 1787 return bs->num_free_clusters; 1788 } 1789 1790 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 1791 { 1792 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target); 1793 1794 return 0; 1795 } 1796 1797 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 1798 { 1799 spdk_put_io_channel(bs->md_target.md_channel); 1800 1801 return 0; 1802 } 1803 1804 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 1805 { 1806 assert(blob != NULL); 1807 1808 return blob->id; 1809 } 1810 1811 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 1812 { 1813 assert(blob != NULL); 1814 1815 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 1816 } 1817 1818 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 1819 { 1820 assert(blob != NULL); 1821 1822 return blob->active.num_clusters; 1823 } 1824 1825 /* START spdk_bs_md_create_blob */ 1826 1827 static void 1828 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1829 { 1830 struct spdk_blob *blob = cb_arg; 1831 1832 _spdk_blob_free(blob); 1833 1834 spdk_bs_sequence_finish(seq, bserrno); 1835 } 1836 1837 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 1838 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1839 { 1840 struct spdk_blob *blob; 1841 uint32_t page_idx; 1842 struct spdk_bs_cpl cpl; 1843 spdk_bs_sequence_t *seq; 1844 spdk_blob_id id; 1845 1846 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 1847 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 1848 cb_fn(cb_arg, 0, -ENOMEM); 1849 return; 1850 } 1851 spdk_bit_array_set(bs->used_md_pages, page_idx); 1852 1853 /* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper 1854 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the 1855 * code assumes blob id == page_idx. 1856 */ 1857 id = (1ULL << 32) | page_idx; 1858 1859 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 1860 1861 blob = _spdk_blob_alloc(bs, id); 1862 if (!blob) { 1863 cb_fn(cb_arg, 0, -ENOMEM); 1864 return; 1865 } 1866 1867 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 1868 cpl.u.blobid.cb_fn = cb_fn; 1869 cpl.u.blobid.cb_arg = cb_arg; 1870 cpl.u.blobid.blobid = blob->id; 1871 1872 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1873 if (!seq) { 1874 _spdk_blob_free(blob); 1875 cb_fn(cb_arg, 0, -ENOMEM); 1876 return; 1877 } 1878 1879 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 1880 } 1881 1882 /* END spdk_bs_md_create_blob */ 1883 1884 /* START spdk_bs_md_resize_blob */ 1885 int 1886 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 1887 { 1888 int rc; 1889 1890 assert(blob != NULL); 1891 1892 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 1893 1894 if (sz == blob->active.num_clusters) { 1895 return 0; 1896 } 1897 1898 rc = _spdk_resize_blob(blob, sz); 1899 if (rc < 0) { 1900 return rc; 1901 } 1902 1903 return 0; 1904 } 1905 1906 /* END spdk_bs_md_resize_blob */ 1907 1908 1909 /* START spdk_bs_md_delete_blob */ 1910 1911 static void 1912 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1913 { 1914 struct spdk_blob *blob = cb_arg; 1915 1916 _spdk_blob_free(blob); 1917 1918 spdk_bs_sequence_finish(seq, bserrno); 1919 } 1920 1921 static void 1922 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1923 { 1924 struct spdk_blob *blob = cb_arg; 1925 1926 blob->state = SPDK_BLOB_STATE_DIRTY; 1927 blob->active.num_pages = 0; 1928 _spdk_resize_blob(blob, 0); 1929 1930 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 1931 } 1932 1933 void 1934 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1935 spdk_blob_op_complete cb_fn, void *cb_arg) 1936 { 1937 struct spdk_blob *blob; 1938 struct spdk_bs_cpl cpl; 1939 spdk_bs_sequence_t *seq; 1940 1941 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 1942 1943 blob = _spdk_blob_lookup(bs, blobid); 1944 if (blob) { 1945 assert(blob->open_ref > 0); 1946 cb_fn(cb_arg, -EINVAL); 1947 return; 1948 } 1949 1950 blob = _spdk_blob_alloc(bs, blobid); 1951 if (!blob) { 1952 cb_fn(cb_arg, -ENOMEM); 1953 return; 1954 } 1955 1956 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1957 cpl.u.blob_basic.cb_fn = cb_fn; 1958 cpl.u.blob_basic.cb_arg = cb_arg; 1959 1960 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1961 if (!seq) { 1962 _spdk_blob_free(blob); 1963 cb_fn(cb_arg, -ENOMEM); 1964 return; 1965 } 1966 1967 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 1968 } 1969 1970 /* END spdk_bs_md_delete_blob */ 1971 1972 /* START spdk_bs_md_open_blob */ 1973 1974 static void 1975 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1976 { 1977 struct spdk_blob *blob = cb_arg; 1978 1979 blob->open_ref++; 1980 1981 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 1982 1983 spdk_bs_sequence_finish(seq, bserrno); 1984 } 1985 1986 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1987 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 1988 { 1989 struct spdk_blob *blob; 1990 struct spdk_bs_cpl cpl; 1991 spdk_bs_sequence_t *seq; 1992 uint32_t page_num; 1993 1994 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 1995 1996 blob = _spdk_blob_lookup(bs, blobid); 1997 if (blob) { 1998 blob->open_ref++; 1999 cb_fn(cb_arg, blob, 0); 2000 return; 2001 } 2002 2003 page_num = _spdk_bs_blobid_to_page(blobid); 2004 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2005 /* Invalid blobid */ 2006 cb_fn(cb_arg, NULL, -ENOENT); 2007 return; 2008 } 2009 2010 blob = _spdk_blob_alloc(bs, blobid); 2011 if (!blob) { 2012 cb_fn(cb_arg, NULL, -ENOMEM); 2013 return; 2014 } 2015 2016 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2017 cpl.u.blob_handle.cb_fn = cb_fn; 2018 cpl.u.blob_handle.cb_arg = cb_arg; 2019 cpl.u.blob_handle.blob = blob; 2020 2021 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2022 if (!seq) { 2023 _spdk_blob_free(blob); 2024 cb_fn(cb_arg, NULL, -ENOMEM); 2025 return; 2026 } 2027 2028 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 2029 } 2030 2031 /* START spdk_bs_md_sync_blob */ 2032 static void 2033 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2034 { 2035 spdk_bs_sequence_finish(seq, bserrno); 2036 } 2037 2038 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 2039 spdk_blob_op_complete cb_fn, void *cb_arg) 2040 { 2041 struct spdk_bs_cpl cpl; 2042 spdk_bs_sequence_t *seq; 2043 2044 assert(blob != NULL); 2045 2046 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 2047 2048 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2049 blob->state != SPDK_BLOB_STATE_SYNCING); 2050 2051 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2052 cb_fn(cb_arg, 0); 2053 return; 2054 } 2055 2056 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2057 cpl.u.blob_basic.cb_fn = cb_fn; 2058 cpl.u.blob_basic.cb_arg = cb_arg; 2059 2060 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2061 if (!seq) { 2062 cb_fn(cb_arg, -ENOMEM); 2063 return; 2064 } 2065 2066 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2067 } 2068 2069 /* END spdk_bs_md_sync_blob */ 2070 2071 /* START spdk_bs_md_close_blob */ 2072 2073 static void 2074 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2075 { 2076 struct spdk_blob **blob = cb_arg; 2077 2078 if ((*blob)->open_ref == 0) { 2079 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2080 _spdk_blob_free((*blob)); 2081 } 2082 2083 *blob = NULL; 2084 2085 spdk_bs_sequence_finish(seq, bserrno); 2086 } 2087 2088 void spdk_bs_md_close_blob(struct spdk_blob **b, 2089 spdk_blob_op_complete cb_fn, void *cb_arg) 2090 { 2091 struct spdk_bs_cpl cpl; 2092 struct spdk_blob *blob; 2093 spdk_bs_sequence_t *seq; 2094 2095 assert(b != NULL); 2096 blob = *b; 2097 assert(blob != NULL); 2098 2099 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2100 2101 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2102 blob->state != SPDK_BLOB_STATE_SYNCING); 2103 2104 if (blob->open_ref == 0) { 2105 cb_fn(cb_arg, -EBADF); 2106 return; 2107 } 2108 2109 blob->open_ref--; 2110 2111 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2112 cpl.u.blob_basic.cb_fn = cb_fn; 2113 cpl.u.blob_basic.cb_arg = cb_arg; 2114 2115 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2116 if (!seq) { 2117 cb_fn(cb_arg, -ENOMEM); 2118 return; 2119 } 2120 2121 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2122 _spdk_blob_close_cpl(seq, b, 0); 2123 return; 2124 } 2125 2126 /* Sync metadata */ 2127 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2128 } 2129 2130 /* END spdk_bs_md_close_blob */ 2131 2132 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2133 { 2134 return spdk_get_io_channel(&bs->io_target); 2135 } 2136 2137 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2138 { 2139 spdk_put_io_channel(channel); 2140 } 2141 2142 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2143 spdk_blob_op_complete cb_fn, void *cb_arg) 2144 { 2145 /* Flush is synchronous right now */ 2146 cb_fn(cb_arg, 0); 2147 } 2148 2149 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2150 void *payload, uint64_t offset, uint64_t length, 2151 spdk_blob_op_complete cb_fn, void *cb_arg) 2152 { 2153 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false); 2154 } 2155 2156 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2157 void *payload, uint64_t offset, uint64_t length, 2158 spdk_blob_op_complete cb_fn, void *cb_arg) 2159 { 2160 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true); 2161 } 2162 2163 struct spdk_bs_iter_ctx { 2164 int64_t page_num; 2165 struct spdk_blob_store *bs; 2166 2167 spdk_blob_op_with_handle_complete cb_fn; 2168 void *cb_arg; 2169 }; 2170 2171 static void 2172 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2173 { 2174 struct spdk_bs_iter_ctx *ctx = cb_arg; 2175 struct spdk_blob_store *bs = ctx->bs; 2176 spdk_blob_id id; 2177 2178 if (bserrno == 0) { 2179 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2180 free(ctx); 2181 return; 2182 } 2183 2184 ctx->page_num++; 2185 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2186 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2187 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2188 free(ctx); 2189 return; 2190 } 2191 2192 id = (1ULL << 32) | ctx->page_num; 2193 2194 blob = _spdk_blob_lookup(bs, id); 2195 if (blob) { 2196 blob->open_ref++; 2197 ctx->cb_fn(ctx->cb_arg, blob, 0); 2198 free(ctx); 2199 return; 2200 } 2201 2202 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2203 } 2204 2205 void 2206 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2207 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2208 { 2209 struct spdk_bs_iter_ctx *ctx; 2210 2211 ctx = calloc(1, sizeof(*ctx)); 2212 if (!ctx) { 2213 cb_fn(cb_arg, NULL, -ENOMEM); 2214 return; 2215 } 2216 2217 ctx->page_num = -1; 2218 ctx->bs = bs; 2219 ctx->cb_fn = cb_fn; 2220 ctx->cb_arg = cb_arg; 2221 2222 _spdk_bs_iter_cpl(ctx, NULL, -1); 2223 } 2224 2225 static void 2226 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2227 { 2228 struct spdk_bs_iter_ctx *ctx = cb_arg; 2229 2230 _spdk_bs_iter_cpl(ctx, NULL, -1); 2231 } 2232 2233 void 2234 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2235 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2236 { 2237 struct spdk_bs_iter_ctx *ctx; 2238 struct spdk_blob *blob; 2239 2240 assert(b != NULL); 2241 blob = *b; 2242 assert(blob != NULL); 2243 2244 ctx = calloc(1, sizeof(*ctx)); 2245 if (!ctx) { 2246 cb_fn(cb_arg, NULL, -ENOMEM); 2247 return; 2248 } 2249 2250 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2251 ctx->bs = bs; 2252 ctx->cb_fn = cb_fn; 2253 ctx->cb_arg = cb_arg; 2254 2255 /* Close the existing blob */ 2256 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2257 } 2258 2259 int 2260 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2261 uint16_t value_len) 2262 { 2263 struct spdk_xattr *xattr; 2264 2265 assert(blob != NULL); 2266 2267 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2268 blob->state != SPDK_BLOB_STATE_SYNCING); 2269 2270 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2271 if (!strcmp(name, xattr->name)) { 2272 free(xattr->value); 2273 xattr->value_len = value_len; 2274 xattr->value = malloc(value_len); 2275 memcpy(xattr->value, value, value_len); 2276 2277 blob->state = SPDK_BLOB_STATE_DIRTY; 2278 2279 return 0; 2280 } 2281 } 2282 2283 xattr = calloc(1, sizeof(*xattr)); 2284 if (!xattr) { 2285 return -1; 2286 } 2287 xattr->name = strdup(name); 2288 xattr->value_len = value_len; 2289 xattr->value = malloc(value_len); 2290 memcpy(xattr->value, value, value_len); 2291 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2292 2293 blob->state = SPDK_BLOB_STATE_DIRTY; 2294 2295 return 0; 2296 } 2297 2298 int 2299 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 2300 { 2301 struct spdk_xattr *xattr; 2302 2303 assert(blob != NULL); 2304 2305 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2306 blob->state != SPDK_BLOB_STATE_SYNCING); 2307 2308 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2309 if (!strcmp(name, xattr->name)) { 2310 TAILQ_REMOVE(&blob->xattrs, xattr, link); 2311 free(xattr->value); 2312 free(xattr->name); 2313 free(xattr); 2314 2315 blob->state = SPDK_BLOB_STATE_DIRTY; 2316 2317 return 0; 2318 } 2319 } 2320 2321 return -ENOENT; 2322 } 2323 2324 int 2325 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 2326 const void **value, size_t *value_len) 2327 { 2328 struct spdk_xattr *xattr; 2329 2330 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2331 if (!strcmp(name, xattr->name)) { 2332 *value = xattr->value; 2333 *value_len = xattr->value_len; 2334 return 0; 2335 } 2336 } 2337 2338 return -ENOENT; 2339 } 2340 2341 struct spdk_xattr_names { 2342 uint32_t count; 2343 const char *names[0]; 2344 }; 2345 2346 int 2347 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 2348 struct spdk_xattr_names **names) 2349 { 2350 struct spdk_xattr *xattr; 2351 int count = 0; 2352 2353 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2354 count++; 2355 } 2356 2357 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 2358 if (*names == NULL) { 2359 return -ENOMEM; 2360 } 2361 2362 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2363 (*names)->names[(*names)->count++] = xattr->name; 2364 } 2365 2366 return 0; 2367 } 2368 2369 uint32_t 2370 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 2371 { 2372 assert(names != NULL); 2373 2374 return names->count; 2375 } 2376 2377 const char * 2378 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 2379 { 2380 if (index >= names->count) { 2381 return NULL; 2382 } 2383 2384 return names->names[index]; 2385 } 2386 2387 void 2388 spdk_xattr_names_free(struct spdk_xattr_names *names) 2389 { 2390 free(names); 2391 } 2392 2393 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 2394