1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/env.h" 38 #include "spdk/queue.h" 39 #include "spdk/io_channel.h" 40 #include "spdk/bit_array.h" 41 42 #include "spdk_internal/log.h" 43 44 #include "blobstore.h" 45 #include "request.h" 46 47 static inline size_t 48 divide_round_up(size_t num, size_t divisor) 49 { 50 return (num + divisor - 1) / divisor; 51 } 52 53 static void 54 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 55 { 56 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 57 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 58 assert(bs->num_free_clusters > 0); 59 60 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 61 62 spdk_bit_array_set(bs->used_clusters, cluster_num); 63 bs->num_free_clusters--; 64 } 65 66 static void 67 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 68 { 69 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 70 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 71 assert(bs->num_free_clusters < bs->total_clusters); 72 73 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 74 75 spdk_bit_array_clear(bs->used_clusters, cluster_num); 76 bs->num_free_clusters++; 77 } 78 79 static struct spdk_blob * 80 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 81 { 82 struct spdk_blob *blob; 83 84 blob = calloc(1, sizeof(*blob)); 85 if (!blob) { 86 return NULL; 87 } 88 89 blob->id = id; 90 blob->bs = bs; 91 92 blob->state = SPDK_BLOB_STATE_DIRTY; 93 blob->active.num_pages = 1; 94 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 95 if (!blob->active.pages) { 96 free(blob); 97 return NULL; 98 } 99 100 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 101 102 TAILQ_INIT(&blob->xattrs); 103 104 return blob; 105 } 106 107 static void 108 _spdk_blob_free(struct spdk_blob *blob) 109 { 110 struct spdk_xattr *xattr, *xattr_tmp; 111 112 assert(blob != NULL); 113 114 free(blob->active.clusters); 115 free(blob->clean.clusters); 116 free(blob->active.pages); 117 free(blob->clean.pages); 118 119 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 120 TAILQ_REMOVE(&blob->xattrs, xattr, link); 121 free(xattr->name); 122 free(xattr->value); 123 free(xattr); 124 } 125 126 free(blob); 127 } 128 129 static int 130 _spdk_blob_mark_clean(struct spdk_blob *blob) 131 { 132 uint64_t *clusters = NULL; 133 uint32_t *pages = NULL; 134 135 assert(blob != NULL); 136 assert(blob->state == SPDK_BLOB_STATE_LOADING || 137 blob->state == SPDK_BLOB_STATE_SYNCING); 138 139 if (blob->active.num_clusters) { 140 assert(blob->active.clusters); 141 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 142 if (!clusters) { 143 return -1; 144 } 145 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 146 } 147 148 if (blob->active.num_pages) { 149 assert(blob->active.pages); 150 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 151 if (!pages) { 152 free(clusters); 153 return -1; 154 } 155 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 156 } 157 158 free(blob->clean.clusters); 159 free(blob->clean.pages); 160 161 blob->clean.num_clusters = blob->active.num_clusters; 162 blob->clean.clusters = blob->active.clusters; 163 blob->clean.num_pages = blob->active.num_pages; 164 blob->clean.pages = blob->active.pages; 165 166 blob->active.clusters = clusters; 167 blob->active.pages = pages; 168 169 blob->state = SPDK_BLOB_STATE_CLEAN; 170 171 return 0; 172 } 173 174 static void 175 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 176 { 177 struct spdk_blob_md_descriptor *desc; 178 size_t cur_desc = 0; 179 void *tmp; 180 181 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 182 while (cur_desc < sizeof(page->descriptors)) { 183 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 184 if (desc->length == 0) { 185 /* If padding and length are 0, this terminates the page */ 186 break; 187 } 188 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 189 struct spdk_blob_md_descriptor_extent *desc_extent; 190 unsigned int i, j; 191 unsigned int cluster_count = blob->active.num_clusters; 192 193 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 194 195 assert(desc_extent->length > 0); 196 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 197 198 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 199 for (j = 0; j < desc_extent->extents[i].length; j++) { 200 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 201 cluster_count++; 202 } 203 } 204 205 assert(cluster_count > 0); 206 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 207 assert(tmp != NULL); 208 blob->active.clusters = tmp; 209 blob->active.cluster_array_size = cluster_count; 210 211 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 212 for (j = 0; j < desc_extent->extents[i].length; j++) { 213 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 214 desc_extent->extents[i].cluster_idx + j); 215 } 216 } 217 218 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 219 struct spdk_blob_md_descriptor_xattr *desc_xattr; 220 struct spdk_xattr *xattr; 221 222 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 223 224 assert(desc_xattr->length == sizeof(desc_xattr->name_length) + 225 sizeof(desc_xattr->value_length) + 226 desc_xattr->name_length + desc_xattr->value_length); 227 228 xattr = calloc(1, sizeof(*xattr)); 229 assert(xattr != NULL); 230 231 xattr->name = malloc(desc_xattr->name_length + 1); 232 assert(xattr->name); 233 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 234 xattr->name[desc_xattr->name_length] = '\0'; 235 236 xattr->value = malloc(desc_xattr->value_length); 237 assert(xattr->value != NULL); 238 xattr->value_len = desc_xattr->value_length; 239 memcpy(xattr->value, 240 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 241 desc_xattr->value_length); 242 243 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 244 } else { 245 /* Error */ 246 break; 247 } 248 249 /* Advance to the next descriptor */ 250 cur_desc += sizeof(*desc) + desc->length; 251 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 252 break; 253 } 254 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 255 } 256 } 257 258 static int 259 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 260 struct spdk_blob *blob) 261 { 262 const struct spdk_blob_md_page *page; 263 uint32_t i; 264 265 assert(page_count > 0); 266 assert(pages[0].sequence_num == 0); 267 assert(blob != NULL); 268 assert(blob->state == SPDK_BLOB_STATE_LOADING); 269 assert(blob->active.clusters == NULL); 270 assert(blob->id == pages[0].id); 271 assert(blob->state == SPDK_BLOB_STATE_LOADING); 272 273 for (i = 0; i < page_count; i++) { 274 page = &pages[i]; 275 276 assert(page->id == blob->id); 277 assert(page->sequence_num == i); 278 279 _spdk_blob_parse_page(page, blob); 280 } 281 282 return 0; 283 } 284 285 static int 286 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 287 struct spdk_blob_md_page **pages, 288 uint32_t *page_count, 289 struct spdk_blob_md_page **last_page) 290 { 291 struct spdk_blob_md_page *page; 292 293 assert(pages != NULL); 294 assert(page_count != NULL); 295 296 if (*page_count == 0) { 297 assert(*pages == NULL); 298 *page_count = 1; 299 *pages = spdk_malloc(sizeof(struct spdk_blob_md_page), 300 sizeof(struct spdk_blob_md_page), 301 NULL); 302 } else { 303 assert(*pages != NULL); 304 (*page_count)++; 305 *pages = spdk_realloc(*pages, 306 sizeof(struct spdk_blob_md_page) * (*page_count), 307 sizeof(struct spdk_blob_md_page), 308 NULL); 309 } 310 311 if (*pages == NULL) { 312 *page_count = 0; 313 *last_page = NULL; 314 return -ENOMEM; 315 } 316 317 page = &(*pages)[*page_count - 1]; 318 memset(page, 0, sizeof(*page)); 319 page->id = blob->id; 320 page->sequence_num = *page_count - 1; 321 page->next = SPDK_INVALID_MD_PAGE; 322 *last_page = page; 323 324 return 0; 325 } 326 327 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 328 * Update required_sz on both success and failure. 329 * 330 */ 331 static int 332 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 333 uint8_t *buf, size_t buf_sz, 334 size_t *required_sz) 335 { 336 struct spdk_blob_md_descriptor_xattr *desc; 337 338 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 339 strlen(xattr->name) + 340 xattr->value_len; 341 342 if (buf_sz < *required_sz) { 343 return -1; 344 } 345 346 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 347 348 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 349 desc->length = sizeof(desc->name_length) + 350 sizeof(desc->value_length) + 351 strlen(xattr->name) + 352 xattr->value_len; 353 desc->name_length = strlen(xattr->name); 354 desc->value_length = xattr->value_len; 355 356 memcpy(desc->name, xattr->name, desc->name_length); 357 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 358 xattr->value, 359 desc->value_length); 360 361 return 0; 362 } 363 364 static void 365 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 366 uint64_t start_cluster, uint64_t *next_cluster, 367 uint8_t *buf, size_t buf_sz) 368 { 369 struct spdk_blob_md_descriptor_extent *desc; 370 size_t cur_sz; 371 uint64_t i, extent_idx; 372 uint32_t lba, lba_per_cluster, lba_count; 373 374 /* The buffer must have room for at least one extent */ 375 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 376 if (buf_sz < cur_sz) { 377 *next_cluster = start_cluster; 378 return; 379 } 380 381 desc = (struct spdk_blob_md_descriptor_extent *)buf; 382 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 383 384 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 385 386 lba = blob->active.clusters[start_cluster]; 387 lba_count = lba_per_cluster; 388 extent_idx = 0; 389 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 390 if ((lba + lba_count) == blob->active.clusters[i]) { 391 lba_count += lba_per_cluster; 392 continue; 393 } 394 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 395 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 396 extent_idx++; 397 398 cur_sz += sizeof(desc->extents[extent_idx]); 399 400 if (buf_sz < cur_sz) { 401 /* If we ran out of buffer space, return */ 402 desc->length = sizeof(desc->extents[0]) * extent_idx; 403 *next_cluster = i; 404 return; 405 } 406 407 lba = blob->active.clusters[i]; 408 lba_count = lba_per_cluster; 409 } 410 411 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 412 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 413 extent_idx++; 414 415 desc->length = sizeof(desc->extents[0]) * extent_idx; 416 *next_cluster = blob->active.num_clusters; 417 418 return; 419 } 420 421 static int 422 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 423 uint32_t *page_count) 424 { 425 struct spdk_blob_md_page *cur_page; 426 const struct spdk_xattr *xattr; 427 int rc; 428 uint8_t *buf; 429 size_t remaining_sz; 430 431 assert(pages != NULL); 432 assert(page_count != NULL); 433 assert(blob != NULL); 434 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 435 436 *pages = NULL; 437 *page_count = 0; 438 439 /* A blob always has at least 1 page, even if it has no descriptors */ 440 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 441 if (rc < 0) { 442 return rc; 443 } 444 445 buf = (uint8_t *)cur_page->descriptors; 446 remaining_sz = sizeof(cur_page->descriptors); 447 448 /* Serialize xattrs */ 449 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 450 size_t required_sz = 0; 451 rc = _spdk_blob_serialize_xattr(xattr, 452 buf, remaining_sz, 453 &required_sz); 454 if (rc < 0) { 455 /* Need to add a new page to the chain */ 456 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 457 &cur_page); 458 if (rc < 0) { 459 spdk_free(*pages); 460 *pages = NULL; 461 *page_count = 0; 462 return rc; 463 } 464 465 buf = (uint8_t *)cur_page->descriptors; 466 remaining_sz = sizeof(cur_page->descriptors); 467 468 /* Try again */ 469 required_sz = 0; 470 rc = _spdk_blob_serialize_xattr(xattr, 471 buf, remaining_sz, 472 &required_sz); 473 474 if (rc < 0) { 475 spdk_free(*pages); 476 *pages = NULL; 477 *page_count = 0; 478 return -1; 479 } 480 } 481 482 remaining_sz -= required_sz; 483 buf += required_sz; 484 } 485 486 /* Serialize extents */ 487 uint64_t last_cluster = 0; 488 while (last_cluster < blob->active.num_clusters) { 489 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 490 buf, remaining_sz); 491 492 if (last_cluster == blob->active.num_clusters) { 493 break; 494 } 495 496 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 497 &cur_page); 498 if (rc < 0) { 499 return rc; 500 } 501 502 buf = (uint8_t *)cur_page->descriptors; 503 remaining_sz = sizeof(cur_page->descriptors); 504 } 505 506 return 0; 507 } 508 509 struct spdk_blob_load_ctx { 510 struct spdk_blob *blob; 511 512 struct spdk_blob_md_page *pages; 513 uint32_t num_pages; 514 515 spdk_bs_sequence_cpl cb_fn; 516 void *cb_arg; 517 }; 518 519 static void 520 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 521 { 522 struct spdk_blob_load_ctx *ctx = cb_arg; 523 struct spdk_blob *blob = ctx->blob; 524 struct spdk_blob_md_page *page; 525 int rc; 526 527 page = &ctx->pages[ctx->num_pages - 1]; 528 529 if (page->next != SPDK_INVALID_MD_PAGE) { 530 uint32_t next_page = page->next; 531 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 532 533 534 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 535 536 /* Read the next page */ 537 ctx->num_pages++; 538 ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 539 sizeof(*page), NULL); 540 if (ctx->pages == NULL) { 541 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 542 free(ctx); 543 return; 544 } 545 546 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 547 next_lba, 548 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 549 _spdk_blob_load_cpl, ctx); 550 return; 551 } 552 553 /* Parse the pages */ 554 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 555 556 _spdk_blob_mark_clean(blob); 557 558 ctx->cb_fn(seq, ctx->cb_arg, rc); 559 560 /* Free the memory */ 561 spdk_free(ctx->pages); 562 free(ctx); 563 } 564 565 /* Load a blob from disk given a blobid */ 566 static void 567 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 568 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 569 { 570 struct spdk_blob_load_ctx *ctx; 571 struct spdk_blob_store *bs; 572 uint32_t page_num; 573 uint64_t lba; 574 575 assert(blob != NULL); 576 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 577 blob->state == SPDK_BLOB_STATE_DIRTY); 578 579 bs = blob->bs; 580 581 ctx = calloc(1, sizeof(*ctx)); 582 if (!ctx) { 583 cb_fn(seq, cb_arg, -ENOMEM); 584 return; 585 } 586 587 ctx->blob = blob; 588 ctx->pages = spdk_realloc(ctx->pages, sizeof(struct spdk_blob_md_page), 589 sizeof(struct spdk_blob_md_page), NULL); 590 if (!ctx->pages) { 591 free(ctx); 592 cb_fn(seq, cb_arg, -ENOMEM); 593 return; 594 } 595 ctx->num_pages = 1; 596 ctx->cb_fn = cb_fn; 597 ctx->cb_arg = cb_arg; 598 599 page_num = _spdk_bs_blobid_to_page(blob->id); 600 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 601 602 blob->state = SPDK_BLOB_STATE_LOADING; 603 604 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 605 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)), 606 _spdk_blob_load_cpl, ctx); 607 } 608 609 struct spdk_blob_persist_ctx { 610 struct spdk_blob *blob; 611 612 struct spdk_blob_md_page *pages; 613 614 uint64_t idx; 615 616 spdk_bs_sequence_cpl cb_fn; 617 void *cb_arg; 618 }; 619 620 static void 621 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 622 { 623 struct spdk_blob_persist_ctx *ctx = cb_arg; 624 struct spdk_blob *blob = ctx->blob; 625 626 if (bserrno == 0) { 627 _spdk_blob_mark_clean(blob); 628 } 629 630 /* Call user callback */ 631 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 632 633 /* Free the memory */ 634 spdk_free(ctx->pages); 635 free(ctx); 636 } 637 638 static void 639 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 640 { 641 struct spdk_blob_persist_ctx *ctx = cb_arg; 642 struct spdk_blob *blob = ctx->blob; 643 struct spdk_blob_store *bs = blob->bs; 644 void *tmp; 645 size_t i; 646 647 /* Release all clusters that were truncated */ 648 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 649 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 650 651 _spdk_bs_release_cluster(bs, cluster_num); 652 } 653 654 if (blob->active.num_clusters == 0) { 655 free(blob->active.clusters); 656 blob->active.clusters = NULL; 657 blob->active.cluster_array_size = 0; 658 } else { 659 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 660 assert(tmp != NULL); 661 blob->active.clusters = tmp; 662 blob->active.cluster_array_size = blob->active.num_clusters; 663 } 664 665 _spdk_blob_persist_complete(seq, ctx, bserrno); 666 } 667 668 static void 669 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 670 { 671 struct spdk_blob_persist_ctx *ctx = cb_arg; 672 struct spdk_blob *blob = ctx->blob; 673 struct spdk_blob_store *bs = blob->bs; 674 spdk_bs_batch_t *batch; 675 size_t i; 676 677 /* Clusters don't move around in blobs. The list shrinks or grows 678 * at the end, but no changes ever occur in the middle of the list. 679 */ 680 681 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 682 683 /* Unmap all clusters that were truncated */ 684 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 685 uint64_t lba = blob->active.clusters[i]; 686 uint32_t lba_count = _spdk_bs_cluster_to_lba(bs, 1); 687 688 spdk_bs_batch_unmap(batch, lba, lba_count); 689 } 690 691 spdk_bs_batch_close(batch); 692 } 693 694 static void 695 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 696 { 697 struct spdk_blob_persist_ctx *ctx = cb_arg; 698 struct spdk_blob *blob = ctx->blob; 699 struct spdk_blob_store *bs = blob->bs; 700 size_t i; 701 702 /* This loop starts at 1 because the first page is special and handled 703 * below. The pages (except the first) are never written in place, 704 * so any pages in the clean list must be unmapped. 705 */ 706 for (i = 1; i < blob->clean.num_pages; i++) { 707 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 708 } 709 710 if (blob->active.num_pages == 0) { 711 uint32_t page_num; 712 713 page_num = _spdk_bs_blobid_to_page(blob->id); 714 spdk_bit_array_clear(bs->used_md_pages, page_num); 715 } 716 717 /* Move on to unmapping clusters */ 718 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 719 } 720 721 static void 722 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 723 { 724 struct spdk_blob_persist_ctx *ctx = cb_arg; 725 struct spdk_blob *blob = ctx->blob; 726 struct spdk_blob_store *bs = blob->bs; 727 uint64_t lba; 728 uint32_t lba_count; 729 spdk_bs_batch_t *batch; 730 size_t i; 731 732 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx); 733 734 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)); 735 736 /* This loop starts at 1 because the first page is special and handled 737 * below. The pages (except the first) are never written in place, 738 * so any pages in the clean list must be unmapped. 739 */ 740 for (i = 1; i < blob->clean.num_pages; i++) { 741 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 742 743 spdk_bs_batch_unmap(batch, lba, lba_count); 744 } 745 746 /* The first page will only be unmapped if this is a delete. */ 747 if (blob->active.num_pages == 0) { 748 uint32_t page_num; 749 750 /* The first page in the metadata goes where the blobid indicates */ 751 page_num = _spdk_bs_blobid_to_page(blob->id); 752 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 753 754 spdk_bs_batch_unmap(batch, lba, lba_count); 755 } 756 757 spdk_bs_batch_close(batch); 758 } 759 760 static void 761 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 762 { 763 struct spdk_blob_persist_ctx *ctx = cb_arg; 764 struct spdk_blob *blob = ctx->blob; 765 struct spdk_blob_store *bs = blob->bs; 766 uint64_t lba; 767 uint32_t lba_count; 768 struct spdk_blob_md_page *page; 769 770 if (blob->active.num_pages == 0) { 771 /* Move on to the next step */ 772 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 773 return; 774 } 775 776 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 777 778 page = &ctx->pages[0]; 779 /* The first page in the metadata goes where the blobid indicates */ 780 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 781 782 spdk_bs_sequence_write(seq, page, lba, lba_count, 783 _spdk_blob_persist_unmap_pages, ctx); 784 } 785 786 static void 787 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 788 { 789 struct spdk_blob_persist_ctx *ctx = cb_arg; 790 struct spdk_blob *blob = ctx->blob; 791 struct spdk_blob_store *bs = blob->bs; 792 uint64_t lba; 793 uint32_t lba_count; 794 struct spdk_blob_md_page *page; 795 spdk_bs_batch_t *batch; 796 size_t i; 797 798 /* Clusters don't move around in blobs. The list shrinks or grows 799 * at the end, but no changes ever occur in the middle of the list. 800 */ 801 802 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 803 804 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 805 806 /* This starts at 1. The root page is not written until 807 * all of the others are finished 808 */ 809 for (i = 1; i < blob->active.num_pages; i++) { 810 page = &ctx->pages[i]; 811 assert(page->sequence_num == i); 812 813 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 814 815 spdk_bs_batch_write(batch, page, lba, lba_count); 816 } 817 818 spdk_bs_batch_close(batch); 819 } 820 821 static int 822 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 823 { 824 uint64_t i; 825 uint64_t *tmp; 826 uint64_t lfc; /* lowest free cluster */ 827 struct spdk_blob_store *bs; 828 829 bs = blob->bs; 830 831 assert(blob->state != SPDK_BLOB_STATE_LOADING && 832 blob->state != SPDK_BLOB_STATE_SYNCING); 833 834 if (blob->active.num_clusters == sz) { 835 return 0; 836 } 837 838 if (blob->active.num_clusters < blob->active.cluster_array_size) { 839 /* If this blob was resized to be larger, then smaller, then 840 * larger without syncing, then the cluster array already 841 * contains spare assigned clusters we can use. 842 */ 843 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 844 sz); 845 } 846 847 blob->state = SPDK_BLOB_STATE_DIRTY; 848 849 /* Do two passes - one to verify that we can obtain enough clusters 850 * and another to actually claim them. 851 */ 852 853 lfc = 0; 854 for (i = blob->active.num_clusters; i < sz; i++) { 855 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 856 if (lfc >= bs->total_clusters) { 857 /* No more free clusters. Cannot satisfy the request */ 858 assert(false); 859 return -1; 860 } 861 lfc++; 862 } 863 864 if (sz > blob->active.num_clusters) { 865 /* Expand the cluster array if necessary. 866 * We only shrink the array when persisting. 867 */ 868 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 869 if (sz > 0 && tmp == NULL) { 870 assert(false); 871 return -1; 872 } 873 blob->active.clusters = tmp; 874 blob->active.cluster_array_size = sz; 875 } 876 877 lfc = 0; 878 for (i = blob->active.num_clusters; i < sz; i++) { 879 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 880 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 881 _spdk_bs_claim_cluster(bs, lfc); 882 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 883 lfc++; 884 } 885 886 blob->active.num_clusters = sz; 887 888 return 0; 889 } 890 891 /* Write a blob to disk */ 892 static void 893 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 894 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 895 { 896 struct spdk_blob_persist_ctx *ctx; 897 int rc; 898 uint64_t i; 899 uint32_t page_num; 900 struct spdk_blob_store *bs; 901 902 assert(blob != NULL); 903 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 904 blob->state == SPDK_BLOB_STATE_DIRTY); 905 906 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 907 cb_fn(seq, cb_arg, 0); 908 return; 909 } 910 911 bs = blob->bs; 912 913 ctx = calloc(1, sizeof(*ctx)); 914 if (!ctx) { 915 cb_fn(seq, cb_arg, -ENOMEM); 916 return; 917 } 918 ctx->blob = blob; 919 ctx->cb_fn = cb_fn; 920 ctx->cb_arg = cb_arg; 921 922 blob->state = SPDK_BLOB_STATE_SYNCING; 923 924 if (blob->active.num_pages == 0) { 925 /* This is the signal that the blob should be deleted. 926 * Immediately jump to the clean up routine. */ 927 assert(blob->clean.num_pages > 0); 928 ctx->idx = blob->clean.num_pages - 1; 929 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 930 return; 931 932 } 933 934 /* Generate the new metadata */ 935 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 936 if (rc < 0) { 937 free(ctx); 938 cb_fn(seq, cb_arg, rc); 939 return; 940 } 941 942 assert(blob->active.num_pages >= 1); 943 944 /* Resize the cache of page indices */ 945 blob->active.pages = realloc(blob->active.pages, 946 blob->active.num_pages * sizeof(*blob->active.pages)); 947 if (!blob->active.pages) { 948 free(ctx); 949 cb_fn(seq, cb_arg, -ENOMEM); 950 return; 951 } 952 953 /* Assign this metadata to pages. This requires two passes - 954 * one to verify that there are enough pages and a second 955 * to actually claim them. */ 956 page_num = 0; 957 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 958 for (i = 1; i < blob->active.num_pages; i++) { 959 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 960 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 961 spdk_free(ctx->pages); 962 free(ctx); 963 blob->state = SPDK_BLOB_STATE_DIRTY; 964 cb_fn(seq, cb_arg, -ENOMEM); 965 return; 966 } 967 page_num++; 968 } 969 970 page_num = 0; 971 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 972 for (i = 1; i < blob->active.num_pages; i++) { 973 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 974 ctx->pages[i - 1].next = page_num; 975 blob->active.pages[i] = page_num; 976 spdk_bit_array_set(bs->used_md_pages, page_num); 977 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 978 page_num++; 979 } 980 981 /* Start writing the metadata from last page to first */ 982 ctx->idx = blob->active.num_pages - 1; 983 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 984 } 985 986 static void 987 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel, 988 void *payload, uint64_t offset, uint64_t length, 989 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 990 { 991 spdk_bs_batch_t *batch; 992 struct spdk_bs_cpl cpl; 993 uint64_t lba; 994 uint32_t lba_count; 995 uint8_t *buf; 996 uint64_t page; 997 998 assert(blob != NULL); 999 1000 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1001 cb_fn(cb_arg, -EINVAL); 1002 return; 1003 } 1004 1005 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1006 cpl.u.blob_basic.cb_fn = cb_fn; 1007 cpl.u.blob_basic.cb_arg = cb_arg; 1008 1009 batch = spdk_bs_batch_open(_channel, &cpl); 1010 if (!batch) { 1011 cb_fn(cb_arg, -ENOMEM); 1012 return; 1013 } 1014 1015 length = _spdk_bs_page_to_lba(blob->bs, length); 1016 page = offset; 1017 buf = payload; 1018 while (length > 0) { 1019 lba = _spdk_bs_blob_page_to_lba(blob, page); 1020 lba_count = spdk_min(length, 1021 _spdk_bs_page_to_lba(blob->bs, 1022 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1023 1024 if (read) { 1025 spdk_bs_batch_read(batch, buf, lba, lba_count); 1026 } else { 1027 spdk_bs_batch_write(batch, buf, lba, lba_count); 1028 } 1029 1030 length -= lba_count; 1031 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1032 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1033 } 1034 1035 spdk_bs_batch_close(batch); 1036 } 1037 1038 static struct spdk_blob * 1039 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1040 { 1041 struct spdk_blob *blob; 1042 1043 TAILQ_FOREACH(blob, &bs->blobs, link) { 1044 if (blob->id == blobid) { 1045 return blob; 1046 } 1047 } 1048 1049 return NULL; 1050 } 1051 1052 static int 1053 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1054 uint32_t max_ops) 1055 { 1056 struct spdk_bs_dev *dev; 1057 uint32_t i; 1058 1059 dev = bs->dev; 1060 1061 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1062 if (!channel->req_mem) { 1063 return -1; 1064 } 1065 1066 TAILQ_INIT(&channel->reqs); 1067 1068 for (i = 0; i < max_ops; i++) { 1069 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1070 } 1071 1072 channel->bs = bs; 1073 channel->dev = dev; 1074 channel->dev_channel = dev->create_channel(dev); 1075 1076 return 0; 1077 } 1078 1079 static int 1080 _spdk_bs_md_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 1081 { 1082 struct spdk_blob_store *bs; 1083 struct spdk_bs_channel *channel = ctx_buf; 1084 1085 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1086 1087 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1088 } 1089 1090 static int 1091 _spdk_bs_io_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 1092 { 1093 struct spdk_blob_store *bs; 1094 struct spdk_bs_channel *channel = ctx_buf; 1095 1096 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1097 1098 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1099 } 1100 1101 1102 static void 1103 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1104 { 1105 struct spdk_bs_channel *channel = ctx_buf; 1106 1107 free(channel->req_mem); 1108 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1109 } 1110 1111 static void 1112 _spdk_bs_free(struct spdk_blob_store *bs) 1113 { 1114 struct spdk_blob *blob, *blob_tmp; 1115 1116 spdk_bs_unregister_md_thread(bs); 1117 spdk_io_device_unregister(&bs->io_target); 1118 spdk_io_device_unregister(&bs->md_target); 1119 1120 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1121 TAILQ_REMOVE(&bs->blobs, blob, link); 1122 _spdk_blob_free(blob); 1123 } 1124 1125 spdk_bit_array_free(&bs->used_md_pages); 1126 spdk_bit_array_free(&bs->used_clusters); 1127 1128 bs->dev->destroy(bs->dev); 1129 free(bs); 1130 } 1131 1132 void 1133 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1134 { 1135 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1136 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1137 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1138 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1139 } 1140 1141 static struct spdk_blob_store * 1142 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1143 { 1144 struct spdk_blob_store *bs; 1145 1146 bs = calloc(1, sizeof(struct spdk_blob_store)); 1147 if (!bs) { 1148 return NULL; 1149 } 1150 1151 TAILQ_INIT(&bs->blobs); 1152 bs->dev = dev; 1153 1154 /* 1155 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1156 * even multiple of the cluster size. 1157 */ 1158 bs->cluster_sz = opts->cluster_sz; 1159 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1160 bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1161 bs->num_free_clusters = bs->total_clusters; 1162 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1163 if (bs->used_clusters == NULL) { 1164 _spdk_bs_free(bs); 1165 return NULL; 1166 } 1167 1168 bs->md_target.max_md_ops = opts->max_md_ops; 1169 bs->io_target.max_channel_ops = opts->max_channel_ops; 1170 bs->super_blob = SPDK_BLOBID_INVALID; 1171 1172 /* The metadata is assumed to be at least 1 page */ 1173 bs->used_md_pages = spdk_bit_array_create(1); 1174 1175 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1176 sizeof(struct spdk_bs_channel)); 1177 spdk_bs_register_md_thread(bs); 1178 1179 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1180 sizeof(struct spdk_bs_channel)); 1181 1182 return bs; 1183 } 1184 1185 /* START spdk_bs_load */ 1186 1187 struct spdk_bs_load_ctx { 1188 struct spdk_blob_store *bs; 1189 struct spdk_bs_super_block *super; 1190 1191 struct spdk_bs_md_mask *mask; 1192 }; 1193 1194 static void 1195 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1196 { 1197 struct spdk_bs_load_ctx *ctx = cb_arg; 1198 uint32_t i, j; 1199 int rc; 1200 1201 /* The type must be correct */ 1202 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1203 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1204 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1205 struct spdk_blob_md_page) * 8)); 1206 /* The length of the mask must be exactly equal to the total number of clusters */ 1207 assert(ctx->mask->length == ctx->bs->total_clusters); 1208 1209 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1210 if (rc < 0) { 1211 spdk_free(ctx->super); 1212 spdk_free(ctx->mask); 1213 _spdk_bs_free(ctx->bs); 1214 free(ctx); 1215 spdk_bs_sequence_finish(seq, -ENOMEM); 1216 return; 1217 } 1218 1219 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1220 for (i = 0; i < ctx->mask->length / 8; i++) { 1221 uint8_t segment = ctx->mask->mask[i]; 1222 for (j = 0; segment && (j < 8); j++) { 1223 if (segment & 1U) { 1224 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1225 assert(ctx->bs->num_free_clusters > 0); 1226 ctx->bs->num_free_clusters--; 1227 } 1228 segment >>= 1U; 1229 } 1230 } 1231 1232 spdk_free(ctx->super); 1233 spdk_free(ctx->mask); 1234 free(ctx); 1235 1236 spdk_bs_sequence_finish(seq, bserrno); 1237 } 1238 1239 static void 1240 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1241 { 1242 struct spdk_bs_load_ctx *ctx = cb_arg; 1243 uint64_t lba, lba_count; 1244 uint32_t i, j; 1245 int rc; 1246 1247 /* The type must be correct */ 1248 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1249 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1250 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) * 1251 8)); 1252 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1253 assert(ctx->mask->length == ctx->super->md_len); 1254 1255 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1256 if (rc < 0) { 1257 spdk_free(ctx->super); 1258 spdk_free(ctx->mask); 1259 _spdk_bs_free(ctx->bs); 1260 free(ctx); 1261 spdk_bs_sequence_finish(seq, -ENOMEM); 1262 return; 1263 } 1264 1265 for (i = 0; i < ctx->mask->length / 8; i++) { 1266 uint8_t segment = ctx->mask->mask[i]; 1267 for (j = 0; segment && (j < 8); j++) { 1268 if (segment & 1U) { 1269 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1270 } 1271 segment >>= 1U; 1272 } 1273 } 1274 spdk_free(ctx->mask); 1275 1276 /* Read the used clusters mask */ 1277 ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page), 1278 0x1000, NULL); 1279 if (!ctx->mask) { 1280 spdk_free(ctx->super); 1281 _spdk_bs_free(ctx->bs); 1282 free(ctx); 1283 spdk_bs_sequence_finish(seq, -ENOMEM); 1284 return; 1285 } 1286 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1287 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1288 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1289 _spdk_bs_load_used_clusters_cpl, ctx); 1290 } 1291 1292 static void 1293 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1294 { 1295 struct spdk_bs_load_ctx *ctx = cb_arg; 1296 uint64_t lba, lba_count; 1297 1298 if (ctx->super->version != SPDK_BS_VERSION) { 1299 spdk_free(ctx->super); 1300 _spdk_bs_free(ctx->bs); 1301 free(ctx); 1302 spdk_bs_sequence_finish(seq, -EILSEQ); 1303 return; 1304 } 1305 1306 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1307 sizeof(ctx->super->signature)) != 0) { 1308 spdk_free(ctx->super); 1309 _spdk_bs_free(ctx->bs); 1310 free(ctx); 1311 spdk_bs_sequence_finish(seq, -EILSEQ); 1312 return; 1313 } 1314 1315 if (ctx->super->clean != 1) { 1316 /* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED. 1317 * All of the necessary data to recover is available 1318 * on disk - the code just has not been written yet. 1319 */ 1320 assert(false); 1321 spdk_free(ctx->super); 1322 _spdk_bs_free(ctx->bs); 1323 free(ctx); 1324 spdk_bs_sequence_finish(seq, -EILSEQ); 1325 return; 1326 } 1327 ctx->super->clean = 0; 1328 1329 /* Parse the super block */ 1330 ctx->bs->cluster_sz = ctx->super->cluster_size; 1331 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1332 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1333 ctx->bs->md_start = ctx->super->md_start; 1334 ctx->bs->md_len = ctx->super->md_len; 1335 1336 /* Read the used pages mask */ 1337 ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 0x1000, 1338 NULL); 1339 if (!ctx->mask) { 1340 spdk_free(ctx->super); 1341 _spdk_bs_free(ctx->bs); 1342 free(ctx); 1343 spdk_bs_sequence_finish(seq, -ENOMEM); 1344 return; 1345 } 1346 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1347 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1348 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1349 _spdk_bs_load_used_pages_cpl, ctx); 1350 } 1351 1352 void 1353 spdk_bs_load(struct spdk_bs_dev *dev, 1354 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1355 { 1356 struct spdk_blob_store *bs; 1357 struct spdk_bs_cpl cpl; 1358 spdk_bs_sequence_t *seq; 1359 struct spdk_bs_load_ctx *ctx; 1360 struct spdk_bs_opts opts = {}; 1361 1362 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1363 1364 spdk_bs_opts_init(&opts); 1365 1366 bs = _spdk_bs_alloc(dev, &opts); 1367 if (!bs) { 1368 cb_fn(cb_arg, NULL, -ENOMEM); 1369 return; 1370 } 1371 1372 ctx = calloc(1, sizeof(*ctx)); 1373 if (!ctx) { 1374 _spdk_bs_free(bs); 1375 cb_fn(cb_arg, NULL, -ENOMEM); 1376 return; 1377 } 1378 1379 ctx->bs = bs; 1380 1381 /* Allocate memory for the super block */ 1382 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1383 if (!ctx->super) { 1384 free(ctx); 1385 _spdk_bs_free(bs); 1386 return; 1387 } 1388 1389 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1390 cpl.u.bs_handle.cb_fn = cb_fn; 1391 cpl.u.bs_handle.cb_arg = cb_arg; 1392 cpl.u.bs_handle.bs = bs; 1393 1394 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1395 if (!seq) { 1396 spdk_free(ctx->super); 1397 free(ctx); 1398 _spdk_bs_free(bs); 1399 cb_fn(cb_arg, NULL, -ENOMEM); 1400 return; 1401 } 1402 1403 /* Read the super block */ 1404 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1405 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1406 _spdk_bs_load_super_cpl, ctx); 1407 } 1408 1409 /* END spdk_bs_load */ 1410 1411 /* START spdk_bs_init */ 1412 1413 struct spdk_bs_init_ctx { 1414 struct spdk_blob_store *bs; 1415 struct spdk_bs_super_block *super; 1416 }; 1417 1418 static void 1419 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1420 { 1421 struct spdk_bs_init_ctx *ctx = cb_arg; 1422 1423 spdk_free(ctx->super); 1424 free(ctx); 1425 1426 spdk_bs_sequence_finish(seq, bserrno); 1427 } 1428 1429 static void 1430 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1431 { 1432 struct spdk_bs_init_ctx *ctx = cb_arg; 1433 1434 /* Write super block */ 1435 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1436 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1437 _spdk_bs_init_persist_super_cpl, ctx); 1438 } 1439 1440 void 1441 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1442 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1443 { 1444 struct spdk_bs_init_ctx *ctx; 1445 struct spdk_blob_store *bs; 1446 struct spdk_bs_cpl cpl; 1447 spdk_bs_sequence_t *seq; 1448 uint64_t num_md_pages; 1449 uint32_t i; 1450 struct spdk_bs_opts opts = {}; 1451 int rc; 1452 1453 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 1454 1455 if (o) { 1456 opts = *o; 1457 } else { 1458 spdk_bs_opts_init(&opts); 1459 } 1460 1461 bs = _spdk_bs_alloc(dev, &opts); 1462 if (!bs) { 1463 cb_fn(cb_arg, NULL, -ENOMEM); 1464 return; 1465 } 1466 1467 if (opts.num_md_pages == UINT32_MAX) { 1468 /* By default, allocate 1 page per cluster. 1469 * Technically, this over-allocates metadata 1470 * because more metadata will reduce the number 1471 * of usable clusters. This can be addressed with 1472 * more complex math in the future. 1473 */ 1474 bs->md_len = bs->total_clusters; 1475 } else { 1476 bs->md_len = opts.num_md_pages; 1477 } 1478 1479 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 1480 if (rc < 0) { 1481 _spdk_bs_free(bs); 1482 cb_fn(cb_arg, NULL, -ENOMEM); 1483 return; 1484 } 1485 1486 ctx = calloc(1, sizeof(*ctx)); 1487 if (!ctx) { 1488 _spdk_bs_free(bs); 1489 cb_fn(cb_arg, NULL, -ENOMEM); 1490 return; 1491 } 1492 1493 ctx->bs = bs; 1494 1495 /* Allocate memory for the super block */ 1496 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1497 if (!ctx->super) { 1498 free(ctx); 1499 _spdk_bs_free(bs); 1500 return; 1501 } 1502 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1503 sizeof(ctx->super->signature)); 1504 ctx->super->version = SPDK_BS_VERSION; 1505 ctx->super->length = sizeof(*ctx->super); 1506 ctx->super->super_blob = bs->super_blob; 1507 ctx->super->clean = 0; 1508 ctx->super->cluster_size = bs->cluster_sz; 1509 1510 /* Calculate how many pages the metadata consumes at the front 1511 * of the disk. 1512 */ 1513 1514 /* The super block uses 1 page */ 1515 num_md_pages = 1; 1516 1517 /* The used_md_pages mask requires 1 bit per metadata page, rounded 1518 * up to the nearest page, plus a header. 1519 */ 1520 ctx->super->used_page_mask_start = num_md_pages; 1521 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1522 divide_round_up(bs->md_len, 8), 1523 sizeof(struct spdk_blob_md_page)); 1524 num_md_pages += ctx->super->used_page_mask_len; 1525 1526 /* The used_clusters mask requires 1 bit per cluster, rounded 1527 * up to the nearest page, plus a header. 1528 */ 1529 ctx->super->used_cluster_mask_start = num_md_pages; 1530 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1531 divide_round_up(bs->total_clusters, 8), 1532 sizeof(struct spdk_blob_md_page)); 1533 num_md_pages += ctx->super->used_cluster_mask_len; 1534 1535 /* The metadata region size was chosen above */ 1536 ctx->super->md_start = bs->md_start = num_md_pages; 1537 ctx->super->md_len = bs->md_len; 1538 num_md_pages += bs->md_len; 1539 1540 /* Claim all of the clusters used by the metadata */ 1541 for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) { 1542 _spdk_bs_claim_cluster(bs, i); 1543 } 1544 1545 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1546 cpl.u.bs_handle.cb_fn = cb_fn; 1547 cpl.u.bs_handle.cb_arg = cb_arg; 1548 cpl.u.bs_handle.bs = bs; 1549 1550 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1551 if (!seq) { 1552 spdk_free(ctx->super); 1553 free(ctx); 1554 _spdk_bs_free(bs); 1555 cb_fn(cb_arg, NULL, -ENOMEM); 1556 return; 1557 } 1558 1559 /* TRIM the entire device */ 1560 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx); 1561 } 1562 1563 /* END spdk_bs_init */ 1564 1565 /* START spdk_bs_unload */ 1566 1567 struct spdk_bs_unload_ctx { 1568 struct spdk_blob_store *bs; 1569 struct spdk_bs_super_block *super; 1570 1571 struct spdk_bs_md_mask *mask; 1572 }; 1573 1574 static void 1575 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1576 { 1577 struct spdk_bs_unload_ctx *ctx = cb_arg; 1578 1579 spdk_free(ctx->super); 1580 1581 spdk_bs_sequence_finish(seq, bserrno); 1582 1583 _spdk_bs_free(ctx->bs); 1584 free(ctx); 1585 } 1586 1587 static void 1588 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1589 { 1590 struct spdk_bs_unload_ctx *ctx = cb_arg; 1591 1592 spdk_free(ctx->mask); 1593 1594 /* Update the values in the super block */ 1595 ctx->super->super_blob = ctx->bs->super_blob; 1596 ctx->super->clean = 1; 1597 1598 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1599 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1600 _spdk_bs_unload_write_super_cpl, ctx); 1601 } 1602 1603 static void 1604 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1605 { 1606 struct spdk_bs_unload_ctx *ctx = cb_arg; 1607 uint32_t i; 1608 uint64_t lba, lba_count; 1609 1610 spdk_free(ctx->mask); 1611 1612 /* Write out the used clusters mask */ 1613 ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page), 1614 0x1000, NULL); 1615 if (!ctx->mask) { 1616 spdk_free(ctx->super); 1617 free(ctx); 1618 spdk_bs_sequence_finish(seq, -ENOMEM); 1619 return; 1620 } 1621 1622 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1623 ctx->mask->length = ctx->bs->total_clusters; 1624 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1625 1626 i = 0; 1627 while (true) { 1628 i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i); 1629 if (i > ctx->mask->length) { 1630 break; 1631 } 1632 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1633 i++; 1634 } 1635 1636 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1637 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1638 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1639 _spdk_bs_unload_write_used_clusters_cpl, ctx); 1640 } 1641 1642 static void 1643 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1644 { 1645 struct spdk_bs_unload_ctx *ctx = cb_arg; 1646 uint32_t i; 1647 uint64_t lba, lba_count; 1648 1649 /* Write out the used page mask */ 1650 ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 1651 0x1000, NULL); 1652 if (!ctx->mask) { 1653 spdk_free(ctx->super); 1654 free(ctx); 1655 spdk_bs_sequence_finish(seq, -ENOMEM); 1656 return; 1657 } 1658 1659 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1660 ctx->mask->length = ctx->super->md_len; 1661 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1662 1663 i = 0; 1664 while (true) { 1665 i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i); 1666 if (i > ctx->mask->length) { 1667 break; 1668 } 1669 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1670 i++; 1671 } 1672 1673 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1674 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1675 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1676 _spdk_bs_unload_write_used_pages_cpl, ctx); 1677 } 1678 1679 void 1680 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 1681 { 1682 struct spdk_bs_cpl cpl; 1683 spdk_bs_sequence_t *seq; 1684 struct spdk_bs_unload_ctx *ctx; 1685 1686 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 1687 1688 ctx = calloc(1, sizeof(*ctx)); 1689 if (!ctx) { 1690 cb_fn(cb_arg, -ENOMEM); 1691 return; 1692 } 1693 1694 ctx->bs = bs; 1695 1696 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1697 if (!ctx->super) { 1698 free(ctx); 1699 cb_fn(cb_arg, -ENOMEM); 1700 return; 1701 } 1702 1703 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 1704 cpl.u.bs_basic.cb_fn = cb_fn; 1705 cpl.u.bs_basic.cb_arg = cb_arg; 1706 1707 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1708 if (!seq) { 1709 spdk_free(ctx->super); 1710 free(ctx); 1711 cb_fn(cb_arg, -ENOMEM); 1712 return; 1713 } 1714 1715 assert(TAILQ_EMPTY(&bs->blobs)); 1716 1717 /* Read super block */ 1718 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1719 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1720 _spdk_bs_unload_read_super_cpl, ctx); 1721 } 1722 1723 /* END spdk_bs_unload */ 1724 1725 void 1726 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 1727 spdk_bs_op_complete cb_fn, void *cb_arg) 1728 { 1729 bs->super_blob = blobid; 1730 cb_fn(cb_arg, 0); 1731 } 1732 1733 void 1734 spdk_bs_get_super(struct spdk_blob_store *bs, 1735 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1736 { 1737 if (bs->super_blob == SPDK_BLOBID_INVALID) { 1738 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 1739 } else { 1740 cb_fn(cb_arg, bs->super_blob, 0); 1741 } 1742 } 1743 1744 uint64_t 1745 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 1746 { 1747 return bs->cluster_sz; 1748 } 1749 1750 uint64_t 1751 spdk_bs_get_page_size(struct spdk_blob_store *bs) 1752 { 1753 return sizeof(struct spdk_blob_md_page); 1754 } 1755 1756 uint64_t 1757 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 1758 { 1759 return bs->num_free_clusters; 1760 } 1761 1762 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 1763 { 1764 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target, SPDK_IO_PRIORITY_DEFAULT, 1765 false, NULL); 1766 1767 return 0; 1768 } 1769 1770 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 1771 { 1772 spdk_put_io_channel(bs->md_target.md_channel); 1773 1774 return 0; 1775 } 1776 1777 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 1778 { 1779 assert(blob != NULL); 1780 1781 return blob->id; 1782 } 1783 1784 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 1785 { 1786 assert(blob != NULL); 1787 1788 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 1789 } 1790 1791 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 1792 { 1793 assert(blob != NULL); 1794 1795 return blob->active.num_clusters; 1796 } 1797 1798 /* START spdk_bs_md_create_blob */ 1799 1800 static void 1801 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1802 { 1803 struct spdk_blob *blob = cb_arg; 1804 1805 _spdk_blob_free(blob); 1806 1807 spdk_bs_sequence_finish(seq, bserrno); 1808 } 1809 1810 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 1811 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1812 { 1813 struct spdk_blob *blob; 1814 uint32_t page_idx; 1815 struct spdk_bs_cpl cpl; 1816 spdk_bs_sequence_t *seq; 1817 spdk_blob_id id; 1818 1819 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 1820 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 1821 cb_fn(cb_arg, 0, -ENOMEM); 1822 return; 1823 } 1824 spdk_bit_array_set(bs->used_md_pages, page_idx); 1825 1826 /* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper 1827 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the 1828 * code assumes blob id == page_idx. 1829 */ 1830 id = (1ULL << 32) | page_idx; 1831 1832 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 1833 1834 blob = _spdk_blob_alloc(bs, id); 1835 if (!blob) { 1836 cb_fn(cb_arg, 0, -ENOMEM); 1837 return; 1838 } 1839 1840 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 1841 cpl.u.blobid.cb_fn = cb_fn; 1842 cpl.u.blobid.cb_arg = cb_arg; 1843 cpl.u.blobid.blobid = blob->id; 1844 1845 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1846 if (!seq) { 1847 _spdk_blob_free(blob); 1848 cb_fn(cb_arg, 0, -ENOMEM); 1849 return; 1850 } 1851 1852 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 1853 } 1854 1855 /* END spdk_bs_md_create_blob */ 1856 1857 /* START spdk_bs_md_resize_blob */ 1858 int 1859 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 1860 { 1861 int rc; 1862 1863 assert(blob != NULL); 1864 1865 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 1866 1867 if (sz == blob->active.num_clusters) { 1868 return 0; 1869 } 1870 1871 rc = _spdk_resize_blob(blob, sz); 1872 if (rc < 0) { 1873 return rc; 1874 } 1875 1876 return 0; 1877 } 1878 1879 /* END spdk_bs_md_resize_blob */ 1880 1881 1882 /* START spdk_bs_md_delete_blob */ 1883 1884 static void 1885 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1886 { 1887 struct spdk_blob *blob = cb_arg; 1888 1889 _spdk_blob_free(blob); 1890 1891 spdk_bs_sequence_finish(seq, bserrno); 1892 } 1893 1894 static void 1895 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1896 { 1897 struct spdk_blob *blob = cb_arg; 1898 1899 blob->state = SPDK_BLOB_STATE_DIRTY; 1900 blob->active.num_pages = 0; 1901 _spdk_resize_blob(blob, 0); 1902 1903 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 1904 } 1905 1906 void 1907 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1908 spdk_blob_op_complete cb_fn, void *cb_arg) 1909 { 1910 struct spdk_blob *blob; 1911 struct spdk_bs_cpl cpl; 1912 spdk_bs_sequence_t *seq; 1913 1914 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 1915 1916 blob = _spdk_blob_lookup(bs, blobid); 1917 if (blob) { 1918 assert(blob->open_ref > 0); 1919 cb_fn(cb_arg, -EINVAL); 1920 return; 1921 } 1922 1923 blob = _spdk_blob_alloc(bs, blobid); 1924 if (!blob) { 1925 cb_fn(cb_arg, -ENOMEM); 1926 return; 1927 } 1928 1929 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1930 cpl.u.blob_basic.cb_fn = cb_fn; 1931 cpl.u.blob_basic.cb_arg = cb_arg; 1932 1933 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1934 if (!seq) { 1935 _spdk_blob_free(blob); 1936 cb_fn(cb_arg, -ENOMEM); 1937 return; 1938 } 1939 1940 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 1941 } 1942 1943 /* END spdk_bs_md_delete_blob */ 1944 1945 /* START spdk_bs_md_open_blob */ 1946 1947 static void 1948 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1949 { 1950 struct spdk_blob *blob = cb_arg; 1951 1952 blob->open_ref++; 1953 1954 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 1955 1956 spdk_bs_sequence_finish(seq, bserrno); 1957 } 1958 1959 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1960 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 1961 { 1962 struct spdk_blob *blob; 1963 struct spdk_bs_cpl cpl; 1964 spdk_bs_sequence_t *seq; 1965 uint32_t page_num; 1966 1967 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 1968 1969 blob = _spdk_blob_lookup(bs, blobid); 1970 if (blob) { 1971 blob->open_ref++; 1972 cb_fn(cb_arg, blob, 0); 1973 return; 1974 } 1975 1976 page_num = _spdk_bs_blobid_to_page(blobid); 1977 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 1978 /* Invalid blobid */ 1979 cb_fn(cb_arg, NULL, -ENOENT); 1980 return; 1981 } 1982 1983 blob = _spdk_blob_alloc(bs, blobid); 1984 if (!blob) { 1985 cb_fn(cb_arg, NULL, -ENOMEM); 1986 return; 1987 } 1988 1989 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 1990 cpl.u.blob_handle.cb_fn = cb_fn; 1991 cpl.u.blob_handle.cb_arg = cb_arg; 1992 cpl.u.blob_handle.blob = blob; 1993 1994 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 1995 if (!seq) { 1996 _spdk_blob_free(blob); 1997 cb_fn(cb_arg, NULL, -ENOMEM); 1998 return; 1999 } 2000 2001 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 2002 } 2003 2004 /* START spdk_bs_md_sync_blob */ 2005 static void 2006 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2007 { 2008 spdk_bs_sequence_finish(seq, bserrno); 2009 } 2010 2011 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 2012 spdk_blob_op_complete cb_fn, void *cb_arg) 2013 { 2014 struct spdk_bs_cpl cpl; 2015 spdk_bs_sequence_t *seq; 2016 2017 assert(blob != NULL); 2018 2019 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 2020 2021 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2022 blob->state != SPDK_BLOB_STATE_SYNCING); 2023 2024 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2025 cb_fn(cb_arg, 0); 2026 return; 2027 } 2028 2029 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2030 cpl.u.blob_basic.cb_fn = cb_fn; 2031 cpl.u.blob_basic.cb_arg = cb_arg; 2032 2033 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2034 if (!seq) { 2035 cb_fn(cb_arg, -ENOMEM); 2036 return; 2037 } 2038 2039 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2040 } 2041 2042 /* END spdk_bs_md_sync_blob */ 2043 2044 /* START spdk_bs_md_close_blob */ 2045 2046 static void 2047 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2048 { 2049 struct spdk_blob **blob = cb_arg; 2050 2051 if ((*blob)->open_ref == 0) { 2052 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2053 _spdk_blob_free((*blob)); 2054 } 2055 2056 *blob = NULL; 2057 2058 spdk_bs_sequence_finish(seq, bserrno); 2059 } 2060 2061 void spdk_bs_md_close_blob(struct spdk_blob **b, 2062 spdk_blob_op_complete cb_fn, void *cb_arg) 2063 { 2064 struct spdk_bs_cpl cpl; 2065 struct spdk_blob *blob; 2066 spdk_bs_sequence_t *seq; 2067 2068 assert(b != NULL); 2069 blob = *b; 2070 assert(blob != NULL); 2071 2072 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2073 2074 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2075 blob->state != SPDK_BLOB_STATE_SYNCING); 2076 2077 if (blob->open_ref == 0) { 2078 cb_fn(cb_arg, -EBADF); 2079 return; 2080 } 2081 2082 blob->open_ref--; 2083 2084 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2085 cpl.u.blob_basic.cb_fn = cb_fn; 2086 cpl.u.blob_basic.cb_arg = cb_arg; 2087 2088 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2089 if (!seq) { 2090 cb_fn(cb_arg, -ENOMEM); 2091 return; 2092 } 2093 2094 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2095 _spdk_blob_close_cpl(seq, b, 0); 2096 return; 2097 } 2098 2099 /* Sync metadata */ 2100 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2101 } 2102 2103 /* END spdk_bs_md_close_blob */ 2104 2105 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs, 2106 uint32_t priority) 2107 { 2108 return spdk_get_io_channel(&bs->io_target, priority, false, NULL); 2109 } 2110 2111 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2112 { 2113 spdk_put_io_channel(channel); 2114 } 2115 2116 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2117 spdk_blob_op_complete cb_fn, void *cb_arg) 2118 { 2119 /* Flush is synchronous right now */ 2120 cb_fn(cb_arg, 0); 2121 } 2122 2123 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2124 void *payload, uint64_t offset, uint64_t length, 2125 spdk_blob_op_complete cb_fn, void *cb_arg) 2126 { 2127 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false); 2128 } 2129 2130 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2131 void *payload, uint64_t offset, uint64_t length, 2132 spdk_blob_op_complete cb_fn, void *cb_arg) 2133 { 2134 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true); 2135 } 2136 2137 struct spdk_bs_iter_ctx { 2138 int64_t page_num; 2139 struct spdk_blob_store *bs; 2140 2141 spdk_blob_op_with_handle_complete cb_fn; 2142 void *cb_arg; 2143 }; 2144 2145 static void 2146 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2147 { 2148 struct spdk_bs_iter_ctx *ctx = cb_arg; 2149 struct spdk_blob_store *bs = ctx->bs; 2150 spdk_blob_id id; 2151 2152 if (bserrno == 0) { 2153 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2154 free(ctx); 2155 return; 2156 } 2157 2158 ctx->page_num++; 2159 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2160 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2161 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2162 free(ctx); 2163 return; 2164 } 2165 2166 id = (1ULL << 32) | ctx->page_num; 2167 2168 blob = _spdk_blob_lookup(bs, id); 2169 if (blob) { 2170 blob->open_ref++; 2171 ctx->cb_fn(ctx->cb_arg, blob, 0); 2172 free(ctx); 2173 return; 2174 } 2175 2176 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2177 } 2178 2179 void 2180 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2181 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2182 { 2183 struct spdk_bs_iter_ctx *ctx; 2184 2185 ctx = calloc(1, sizeof(*ctx)); 2186 if (!ctx) { 2187 cb_fn(cb_arg, NULL, -ENOMEM); 2188 return; 2189 } 2190 2191 ctx->page_num = -1; 2192 ctx->bs = bs; 2193 ctx->cb_fn = cb_fn; 2194 ctx->cb_arg = cb_arg; 2195 2196 _spdk_bs_iter_cpl(ctx, NULL, -1); 2197 } 2198 2199 static void 2200 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2201 { 2202 struct spdk_bs_iter_ctx *ctx = cb_arg; 2203 2204 _spdk_bs_iter_cpl(ctx, NULL, -1); 2205 } 2206 2207 void 2208 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2209 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2210 { 2211 struct spdk_bs_iter_ctx *ctx; 2212 struct spdk_blob *blob; 2213 2214 assert(b != NULL); 2215 blob = *b; 2216 assert(blob != NULL); 2217 2218 ctx = calloc(1, sizeof(*ctx)); 2219 if (!ctx) { 2220 cb_fn(cb_arg, NULL, -ENOMEM); 2221 return; 2222 } 2223 2224 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2225 ctx->bs = bs; 2226 ctx->cb_fn = cb_fn; 2227 ctx->cb_arg = cb_arg; 2228 2229 /* Close the existing blob */ 2230 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2231 } 2232 2233 int 2234 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2235 uint16_t value_len) 2236 { 2237 struct spdk_xattr *xattr; 2238 2239 assert(blob != NULL); 2240 2241 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2242 blob->state != SPDK_BLOB_STATE_SYNCING); 2243 2244 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2245 if (!strcmp(name, xattr->name)) { 2246 free(xattr->value); 2247 xattr->value_len = value_len; 2248 xattr->value = malloc(value_len); 2249 memcpy(xattr->value, value, value_len); 2250 2251 blob->state = SPDK_BLOB_STATE_DIRTY; 2252 2253 return 0; 2254 } 2255 } 2256 2257 xattr = calloc(1, sizeof(*xattr)); 2258 if (!xattr) { 2259 return -1; 2260 } 2261 xattr->name = strdup(name); 2262 xattr->value_len = value_len; 2263 xattr->value = malloc(value_len); 2264 memcpy(xattr->value, value, value_len); 2265 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2266 2267 blob->state = SPDK_BLOB_STATE_DIRTY; 2268 2269 return 0; 2270 } 2271 2272 int 2273 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 2274 { 2275 struct spdk_xattr *xattr; 2276 2277 assert(blob != NULL); 2278 2279 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2280 blob->state != SPDK_BLOB_STATE_SYNCING); 2281 2282 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2283 if (!strcmp(name, xattr->name)) { 2284 TAILQ_REMOVE(&blob->xattrs, xattr, link); 2285 free(xattr->value); 2286 free(xattr->name); 2287 free(xattr); 2288 2289 blob->state = SPDK_BLOB_STATE_DIRTY; 2290 2291 return 0; 2292 } 2293 } 2294 2295 return -ENOENT; 2296 } 2297 2298 int 2299 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 2300 const void **value, size_t *value_len) 2301 { 2302 struct spdk_xattr *xattr; 2303 2304 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2305 if (!strcmp(name, xattr->name)) { 2306 *value = xattr->value; 2307 *value_len = xattr->value_len; 2308 return 0; 2309 } 2310 } 2311 2312 return -ENOENT; 2313 } 2314 2315 struct spdk_xattr_names { 2316 uint32_t count; 2317 const char *names[0]; 2318 }; 2319 2320 int 2321 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 2322 struct spdk_xattr_names **names) 2323 { 2324 struct spdk_xattr *xattr; 2325 int count = 0; 2326 2327 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2328 count++; 2329 } 2330 2331 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 2332 if (*names == NULL) { 2333 return -ENOMEM; 2334 } 2335 2336 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2337 (*names)->names[(*names)->count++] = xattr->name; 2338 } 2339 2340 return 0; 2341 } 2342 2343 uint32_t 2344 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 2345 { 2346 assert(names != NULL); 2347 2348 return names->count; 2349 } 2350 2351 const char * 2352 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 2353 { 2354 if (index >= names->count) { 2355 return NULL; 2356 } 2357 2358 return names->names[index]; 2359 } 2360 2361 void 2362 spdk_xattr_names_free(struct spdk_xattr_names *names) 2363 { 2364 free(names); 2365 } 2366 2367 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 2368