1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include <stdbool.h> 35 #include <assert.h> 36 #include <errno.h> 37 #include <limits.h> 38 #include <stdlib.h> 39 #include <string.h> 40 41 #include "spdk/blob.h" 42 #include "spdk/env.h" 43 #include "spdk/queue.h" 44 #include "spdk/io_channel.h" 45 #include "spdk/bit_array.h" 46 47 #include "spdk_internal/log.h" 48 49 #include "blobstore.h" 50 #include "request.h" 51 52 static inline size_t 53 divide_round_up(size_t num, size_t divisor) 54 { 55 return (num + divisor - 1) / divisor; 56 } 57 58 static void 59 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 60 { 61 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 62 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 63 assert(bs->num_free_clusters > 0); 64 65 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 66 67 spdk_bit_array_set(bs->used_clusters, cluster_num); 68 bs->num_free_clusters--; 69 } 70 71 static void 72 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 73 { 74 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 75 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 76 assert(bs->num_free_clusters < bs->total_clusters); 77 78 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 79 80 spdk_bit_array_clear(bs->used_clusters, cluster_num); 81 bs->num_free_clusters++; 82 } 83 84 static struct spdk_blob * 85 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 86 { 87 struct spdk_blob *blob; 88 89 blob = calloc(1, sizeof(*blob)); 90 if (!blob) { 91 return NULL; 92 } 93 94 blob->id = id; 95 blob->bs = bs; 96 97 blob->state = SPDK_BLOB_STATE_DIRTY; 98 blob->active.num_pages = 1; 99 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 100 if (!blob->active.pages) { 101 free(blob); 102 return NULL; 103 } 104 105 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 106 107 TAILQ_INIT(&blob->xattrs); 108 109 return blob; 110 } 111 112 static void 113 _spdk_blob_free(struct spdk_blob *blob) 114 { 115 struct spdk_xattr *xattr, *xattr_tmp; 116 117 assert(blob != NULL); 118 119 free(blob->active.clusters); 120 free(blob->clean.clusters); 121 free(blob->active.pages); 122 free(blob->clean.pages); 123 124 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 125 TAILQ_REMOVE(&blob->xattrs, xattr, link); 126 free(xattr->name); 127 free(xattr->value); 128 free(xattr); 129 } 130 131 free(blob); 132 } 133 134 static int 135 _spdk_blob_mark_clean(struct spdk_blob *blob) 136 { 137 uint64_t *clusters = NULL; 138 uint32_t *pages = NULL; 139 140 assert(blob != NULL); 141 assert(blob->state == SPDK_BLOB_STATE_LOADING || 142 blob->state == SPDK_BLOB_STATE_SYNCING); 143 144 if (blob->active.num_clusters) { 145 assert(blob->active.clusters); 146 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 147 if (!clusters) { 148 return -1; 149 } 150 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 151 } 152 153 if (blob->active.num_pages) { 154 assert(blob->active.pages); 155 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 156 if (!pages) { 157 free(clusters); 158 return -1; 159 } 160 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 161 } 162 163 free(blob->clean.clusters); 164 free(blob->clean.pages); 165 166 blob->clean.num_clusters = blob->active.num_clusters; 167 blob->clean.clusters = blob->active.clusters; 168 blob->clean.num_pages = blob->active.num_pages; 169 blob->clean.pages = blob->active.pages; 170 171 blob->active.clusters = clusters; 172 blob->active.pages = pages; 173 174 blob->state = SPDK_BLOB_STATE_CLEAN; 175 176 return 0; 177 } 178 179 static void 180 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 181 { 182 struct spdk_blob_md_descriptor *desc; 183 size_t cur_desc = 0; 184 void *tmp; 185 186 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 187 while (cur_desc < sizeof(page->descriptors)) { 188 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 189 if (desc->length == 0) { 190 /* If padding and length are 0, this terminates the page */ 191 break; 192 } 193 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 194 struct spdk_blob_md_descriptor_extent *desc_extent; 195 unsigned int i, j; 196 unsigned int cluster_count = blob->active.num_clusters; 197 198 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 199 200 assert(desc_extent->length > 0); 201 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 202 203 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 204 for (j = 0; j < desc_extent->extents[i].length; j++) { 205 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 206 cluster_count++; 207 } 208 } 209 210 assert(cluster_count > 0); 211 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 212 assert(tmp != NULL); 213 blob->active.clusters = tmp; 214 blob->active.cluster_array_size = cluster_count; 215 216 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 217 for (j = 0; j < desc_extent->extents[i].length; j++) { 218 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 219 desc_extent->extents[i].cluster_idx + j); 220 } 221 } 222 223 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 224 struct spdk_blob_md_descriptor_xattr *desc_xattr; 225 struct spdk_xattr *xattr; 226 227 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 228 229 xattr = calloc(1, sizeof(*xattr)); 230 assert(xattr != NULL); 231 232 xattr->name = malloc(desc_xattr->name_length + 1); 233 assert(xattr->name); 234 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 235 xattr->name[desc_xattr->name_length] = '\0'; 236 237 xattr->value = malloc(desc_xattr->value_length); 238 assert(xattr->value != NULL); 239 xattr->value_len = desc_xattr->value_length; 240 memcpy(xattr->value, 241 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 242 desc_xattr->value_length); 243 244 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 245 } else { 246 /* Error */ 247 break; 248 } 249 250 /* Advance to the next descriptor */ 251 cur_desc += sizeof(*desc) + desc->length; 252 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 253 break; 254 } 255 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 256 } 257 } 258 259 static int 260 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 261 struct spdk_blob *blob) 262 { 263 const struct spdk_blob_md_page *page; 264 uint32_t i; 265 266 assert(page_count > 0); 267 assert(pages[0].sequence_num == 0); 268 assert(blob != NULL); 269 assert(blob->state == SPDK_BLOB_STATE_LOADING); 270 assert(blob->active.clusters == NULL); 271 assert(blob->id == pages[0].id); 272 assert(blob->state == SPDK_BLOB_STATE_LOADING); 273 274 for (i = 0; i < page_count; i++) { 275 page = &pages[i]; 276 277 assert(page->id == blob->id); 278 assert(page->sequence_num == i); 279 280 _spdk_blob_parse_page(page, blob); 281 } 282 283 return 0; 284 } 285 286 static int 287 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 288 struct spdk_blob_md_page **pages, 289 uint32_t *page_count, 290 struct spdk_blob_md_page **last_page) 291 { 292 struct spdk_blob_md_page *page; 293 294 assert(pages != NULL); 295 assert(page_count != NULL); 296 297 if (*page_count == 0) { 298 assert(*pages == NULL); 299 *page_count = 1; 300 *pages = spdk_malloc(sizeof(struct spdk_blob_md_page), 301 sizeof(struct spdk_blob_md_page), 302 NULL); 303 } else { 304 assert(*pages != NULL); 305 (*page_count)++; 306 *pages = spdk_realloc(*pages, 307 sizeof(struct spdk_blob_md_page) * (*page_count), 308 sizeof(struct spdk_blob_md_page), 309 NULL); 310 } 311 312 if (*pages == NULL) { 313 *page_count = 0; 314 *last_page = NULL; 315 return -ENOMEM; 316 } 317 318 page = &(*pages)[*page_count - 1]; 319 memset(page, 0, sizeof(*page)); 320 page->id = blob->id; 321 page->sequence_num = *page_count - 1; 322 page->next = SPDK_INVALID_MD_PAGE; 323 *last_page = page; 324 325 return 0; 326 } 327 328 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 329 * Update required_sz on both success and failure. 330 * 331 */ 332 static int 333 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 334 uint8_t *buf, size_t buf_sz, 335 size_t *required_sz) 336 { 337 struct spdk_blob_md_descriptor_xattr *desc; 338 339 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 340 strlen(xattr->name) + 341 xattr->value_len; 342 343 if (buf_sz < *required_sz) { 344 return -1; 345 } 346 347 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 348 349 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 350 desc->length = sizeof(desc->name_length) + 351 sizeof(desc->value_length) + 352 strlen(xattr->name) + 353 xattr->value_len; 354 desc->name_length = strlen(xattr->name); 355 desc->value_length = xattr->value_len; 356 357 memcpy(desc->name, xattr->name, desc->name_length); 358 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 359 xattr->value, 360 desc->value_length); 361 362 return 0; 363 } 364 365 static void 366 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 367 uint64_t start_cluster, uint64_t *next_cluster, 368 uint8_t *buf, size_t buf_sz) 369 { 370 struct spdk_blob_md_descriptor_extent *desc; 371 size_t cur_sz; 372 uint64_t i, extent_idx; 373 uint32_t lba, lba_per_cluster, lba_count; 374 375 /* The buffer must have room for at least one extent */ 376 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 377 if (buf_sz < cur_sz) { 378 *next_cluster = start_cluster; 379 return; 380 } 381 382 desc = (struct spdk_blob_md_descriptor_extent *)buf; 383 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 384 385 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 386 387 lba = blob->active.clusters[start_cluster]; 388 lba_count = lba_per_cluster; 389 extent_idx = 0; 390 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 391 if ((lba + lba_count) == blob->active.clusters[i]) { 392 lba_count += lba_per_cluster; 393 continue; 394 } 395 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 396 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 397 extent_idx++; 398 399 cur_sz += sizeof(desc->extents[extent_idx]); 400 401 if (buf_sz < cur_sz) { 402 /* If we ran out of buffer space, return */ 403 desc->length = sizeof(desc->extents[0]) * extent_idx; 404 *next_cluster = i; 405 return; 406 } 407 408 lba = blob->active.clusters[i]; 409 lba_count = lba_per_cluster; 410 } 411 412 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 413 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 414 extent_idx++; 415 416 desc->length = sizeof(desc->extents[0]) * extent_idx; 417 *next_cluster = blob->active.num_clusters; 418 419 return; 420 } 421 422 static int 423 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 424 uint32_t *page_count) 425 { 426 struct spdk_blob_md_page *cur_page; 427 const struct spdk_xattr *xattr; 428 int rc; 429 uint8_t *buf; 430 size_t remaining_sz; 431 432 assert(pages != NULL); 433 assert(page_count != NULL); 434 assert(blob != NULL); 435 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 436 437 *pages = NULL; 438 *page_count = 0; 439 440 /* A blob always has at least 1 page, even if it has no descriptors */ 441 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 442 if (rc < 0) { 443 return rc; 444 } 445 446 buf = (uint8_t *)cur_page->descriptors; 447 remaining_sz = sizeof(cur_page->descriptors); 448 449 /* Serialize xattrs */ 450 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 451 size_t required_sz = 0; 452 rc = _spdk_blob_serialize_xattr(xattr, 453 buf, remaining_sz, 454 &required_sz); 455 if (rc < 0) { 456 /* Need to add a new page to the chain */ 457 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 458 &cur_page); 459 if (rc < 0) { 460 spdk_free(*pages); 461 *pages = NULL; 462 *page_count = 0; 463 return rc; 464 } 465 466 buf = (uint8_t *)cur_page->descriptors; 467 remaining_sz = sizeof(cur_page->descriptors); 468 469 /* Try again */ 470 required_sz = 0; 471 rc = _spdk_blob_serialize_xattr(xattr, 472 buf, remaining_sz, 473 &required_sz); 474 475 if (rc < 0) { 476 spdk_free(*pages); 477 *pages = NULL; 478 *page_count = 0; 479 return -1; 480 } 481 } 482 483 remaining_sz -= required_sz; 484 buf += required_sz; 485 } 486 487 /* Serialize extents */ 488 uint64_t last_cluster = 0; 489 while (last_cluster < blob->active.num_clusters) { 490 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 491 buf, remaining_sz); 492 493 if (last_cluster == blob->active.num_clusters) { 494 break; 495 } 496 497 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 498 &cur_page); 499 if (rc < 0) { 500 return rc; 501 } 502 503 buf = (uint8_t *)cur_page->descriptors; 504 remaining_sz = sizeof(cur_page->descriptors); 505 } 506 507 return 0; 508 } 509 510 struct spdk_blob_load_ctx { 511 struct spdk_blob *blob; 512 513 struct spdk_blob_md_page *pages; 514 uint32_t num_pages; 515 516 spdk_bs_sequence_cpl cb_fn; 517 void *cb_arg; 518 }; 519 520 static void 521 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 522 { 523 struct spdk_blob_load_ctx *ctx = cb_arg; 524 struct spdk_blob *blob = ctx->blob; 525 struct spdk_blob_md_page *page; 526 int rc; 527 528 page = &ctx->pages[ctx->num_pages - 1]; 529 530 if (page->next != SPDK_INVALID_MD_PAGE) { 531 uint32_t next_page = page->next; 532 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 533 534 535 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 536 537 /* Read the next page */ 538 ctx->num_pages++; 539 ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 540 sizeof(*page), NULL); 541 if (ctx->pages == NULL) { 542 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 543 free(ctx); 544 return; 545 } 546 547 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 548 next_lba, 549 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 550 _spdk_blob_load_cpl, ctx); 551 return; 552 } 553 554 /* Parse the pages */ 555 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 556 557 _spdk_blob_mark_clean(blob); 558 559 ctx->cb_fn(seq, ctx->cb_arg, rc); 560 561 /* Free the memory */ 562 spdk_free(ctx->pages); 563 free(ctx); 564 } 565 566 /* Load a blob from disk given a blobid */ 567 static void 568 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 569 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 570 { 571 struct spdk_blob_load_ctx *ctx; 572 struct spdk_blob_store *bs; 573 uint32_t page_num; 574 uint64_t lba; 575 576 assert(blob != NULL); 577 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 578 blob->state == SPDK_BLOB_STATE_DIRTY); 579 580 bs = blob->bs; 581 582 ctx = calloc(1, sizeof(*ctx)); 583 if (!ctx) { 584 cb_fn(seq, cb_arg, -ENOMEM); 585 return; 586 } 587 588 ctx->blob = blob; 589 ctx->pages = spdk_realloc(ctx->pages, sizeof(struct spdk_blob_md_page), 590 sizeof(struct spdk_blob_md_page), NULL); 591 if (!ctx->pages) { 592 free(ctx); 593 cb_fn(seq, cb_arg, -ENOMEM); 594 return; 595 } 596 ctx->num_pages = 1; 597 ctx->cb_fn = cb_fn; 598 ctx->cb_arg = cb_arg; 599 600 page_num = _spdk_bs_blobid_to_page(blob->id); 601 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 602 603 blob->state = SPDK_BLOB_STATE_LOADING; 604 605 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 606 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)), 607 _spdk_blob_load_cpl, ctx); 608 } 609 610 struct spdk_blob_persist_ctx { 611 struct spdk_blob *blob; 612 613 struct spdk_blob_md_page *pages; 614 615 uint64_t idx; 616 617 spdk_bs_sequence_cpl cb_fn; 618 void *cb_arg; 619 }; 620 621 static void 622 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 623 { 624 struct spdk_blob_persist_ctx *ctx = cb_arg; 625 struct spdk_blob *blob = ctx->blob; 626 627 if (bserrno == 0) { 628 _spdk_blob_mark_clean(blob); 629 } 630 631 /* Call user callback */ 632 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 633 634 /* Free the memory */ 635 spdk_free(ctx->pages); 636 free(ctx); 637 } 638 639 static void 640 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 641 { 642 struct spdk_blob_persist_ctx *ctx = cb_arg; 643 struct spdk_blob *blob = ctx->blob; 644 struct spdk_blob_store *bs = blob->bs; 645 void *tmp; 646 size_t i; 647 648 /* Release all clusters that were truncated */ 649 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 650 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 651 652 _spdk_bs_release_cluster(bs, cluster_num); 653 } 654 655 if (blob->active.num_clusters == 0) { 656 free(blob->active.clusters); 657 blob->active.clusters = NULL; 658 blob->active.cluster_array_size = 0; 659 } else { 660 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 661 assert(tmp != NULL); 662 blob->active.clusters = tmp; 663 blob->active.cluster_array_size = blob->active.num_clusters; 664 } 665 666 _spdk_blob_persist_complete(seq, ctx, bserrno); 667 } 668 669 static void 670 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 671 { 672 struct spdk_blob_persist_ctx *ctx = cb_arg; 673 struct spdk_blob *blob = ctx->blob; 674 struct spdk_blob_store *bs = blob->bs; 675 spdk_bs_batch_t *batch; 676 size_t i; 677 678 /* Clusters don't move around in blobs. The list shrinks or grows 679 * at the end, but no changes ever occur in the middle of the list. 680 */ 681 682 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 683 684 /* Unmap all clusters that were truncated */ 685 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 686 uint64_t lba = blob->active.clusters[i]; 687 uint32_t lba_count = _spdk_bs_cluster_to_lba(bs, 1); 688 689 spdk_bs_batch_unmap(batch, lba, lba_count); 690 } 691 692 spdk_bs_batch_close(batch); 693 } 694 695 static void 696 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 697 { 698 struct spdk_blob_persist_ctx *ctx = cb_arg; 699 struct spdk_blob *blob = ctx->blob; 700 struct spdk_blob_store *bs = blob->bs; 701 size_t i; 702 703 /* This loop starts at 1 because the first page is special and handled 704 * below. The pages (except the first) are never written in place, 705 * so any pages in the clean list must be unmapped. 706 */ 707 for (i = 1; i < blob->clean.num_pages; i++) { 708 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 709 } 710 711 if (blob->active.num_pages == 0) { 712 uint32_t page_num; 713 714 page_num = _spdk_bs_blobid_to_page(blob->id); 715 spdk_bit_array_clear(bs->used_md_pages, page_num); 716 } 717 718 /* Move on to unmapping clusters */ 719 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 720 } 721 722 static void 723 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 724 { 725 struct spdk_blob_persist_ctx *ctx = cb_arg; 726 struct spdk_blob *blob = ctx->blob; 727 struct spdk_blob_store *bs = blob->bs; 728 uint64_t lba; 729 uint32_t lba_count; 730 spdk_bs_batch_t *batch; 731 size_t i; 732 733 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx); 734 735 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)); 736 737 /* This loop starts at 1 because the first page is special and handled 738 * below. The pages (except the first) are never written in place, 739 * so any pages in the clean list must be unmapped. 740 */ 741 for (i = 1; i < blob->clean.num_pages; i++) { 742 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 743 744 spdk_bs_batch_unmap(batch, lba, lba_count); 745 } 746 747 /* The first page will only be unmapped if this is a delete. */ 748 if (blob->active.num_pages == 0) { 749 uint32_t page_num; 750 751 /* The first page in the metadata goes where the blobid indicates */ 752 page_num = _spdk_bs_blobid_to_page(blob->id); 753 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 754 755 spdk_bs_batch_unmap(batch, lba, lba_count); 756 } 757 758 spdk_bs_batch_close(batch); 759 } 760 761 static void 762 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 763 { 764 struct spdk_blob_persist_ctx *ctx = cb_arg; 765 struct spdk_blob *blob = ctx->blob; 766 struct spdk_blob_store *bs = blob->bs; 767 uint64_t lba; 768 uint32_t lba_count; 769 struct spdk_blob_md_page *page; 770 771 if (blob->active.num_pages == 0) { 772 /* Move on to the next step */ 773 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 774 return; 775 } 776 777 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 778 779 page = &ctx->pages[0]; 780 /* The first page in the metadata goes where the blobid indicates */ 781 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 782 783 spdk_bs_sequence_write(seq, page, lba, lba_count, 784 _spdk_blob_persist_unmap_pages, ctx); 785 } 786 787 static void 788 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 789 { 790 struct spdk_blob_persist_ctx *ctx = cb_arg; 791 struct spdk_blob *blob = ctx->blob; 792 struct spdk_blob_store *bs = blob->bs; 793 uint64_t lba; 794 uint32_t lba_count; 795 struct spdk_blob_md_page *page; 796 spdk_bs_batch_t *batch; 797 size_t i; 798 799 /* Clusters don't move around in blobs. The list shrinks or grows 800 * at the end, but no changes ever occur in the middle of the list. 801 */ 802 803 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 804 805 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 806 807 /* This starts at 1. The root page is not written until 808 * all of the others are finished 809 */ 810 for (i = 1; i < blob->active.num_pages; i++) { 811 page = &ctx->pages[i]; 812 assert(page->sequence_num == i); 813 814 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 815 816 spdk_bs_batch_write(batch, page, lba, lba_count); 817 } 818 819 spdk_bs_batch_close(batch); 820 } 821 822 static int 823 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 824 { 825 uint64_t i; 826 uint64_t *tmp; 827 uint64_t lfc; /* lowest free cluster */ 828 struct spdk_blob_store *bs; 829 830 bs = blob->bs; 831 832 assert(blob->state != SPDK_BLOB_STATE_LOADING && 833 blob->state != SPDK_BLOB_STATE_SYNCING); 834 835 if (blob->active.num_clusters == sz) { 836 return 0; 837 } 838 839 if (blob->active.num_clusters < blob->active.cluster_array_size) { 840 /* If this blob was resized to be larger, then smaller, then 841 * larger without syncing, then the cluster array already 842 * contains spare assigned clusters we can use. 843 */ 844 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 845 sz); 846 } 847 848 blob->state = SPDK_BLOB_STATE_DIRTY; 849 850 /* Do two passes - one to verify that we can obtain enough clusters 851 * and another to actually claim them. 852 */ 853 854 lfc = 0; 855 for (i = blob->active.num_clusters; i < sz; i++) { 856 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 857 if (lfc >= bs->total_clusters) { 858 /* No more free clusters. Cannot satisfy the request */ 859 assert(false); 860 return -1; 861 } 862 lfc++; 863 } 864 865 if (sz > blob->active.num_clusters) { 866 /* Expand the cluster array if necessary. 867 * We only shrink the array when persisting. 868 */ 869 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 870 if (sz > 0 && tmp == NULL) { 871 assert(false); 872 return -1; 873 } 874 blob->active.clusters = tmp; 875 blob->active.cluster_array_size = sz; 876 } 877 878 lfc = 0; 879 for (i = blob->active.num_clusters; i < sz; i++) { 880 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 881 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 882 _spdk_bs_claim_cluster(bs, lfc); 883 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 884 lfc++; 885 } 886 887 blob->active.num_clusters = sz; 888 889 return 0; 890 } 891 892 /* Write a blob to disk */ 893 static void 894 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 895 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 896 { 897 struct spdk_blob_persist_ctx *ctx; 898 int rc; 899 uint64_t i; 900 uint32_t page_num; 901 struct spdk_blob_store *bs; 902 903 assert(blob != NULL); 904 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 905 blob->state == SPDK_BLOB_STATE_DIRTY); 906 907 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 908 cb_fn(seq, cb_arg, 0); 909 return; 910 } 911 912 bs = blob->bs; 913 914 ctx = calloc(1, sizeof(*ctx)); 915 if (!ctx) { 916 cb_fn(seq, cb_arg, -ENOMEM); 917 return; 918 } 919 ctx->blob = blob; 920 ctx->cb_fn = cb_fn; 921 ctx->cb_arg = cb_arg; 922 923 blob->state = SPDK_BLOB_STATE_SYNCING; 924 925 if (blob->active.num_pages == 0) { 926 /* This is the signal that the blob should be deleted. 927 * Immediately jump to the clean up routine. */ 928 assert(blob->clean.num_pages > 0); 929 ctx->idx = blob->clean.num_pages - 1; 930 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 931 return; 932 933 } 934 935 /* Generate the new metadata */ 936 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 937 if (rc < 0) { 938 free(ctx); 939 cb_fn(seq, cb_arg, rc); 940 return; 941 } 942 943 assert(blob->active.num_pages >= 1); 944 945 /* Resize the cache of page indices */ 946 blob->active.pages = realloc(blob->active.pages, 947 blob->active.num_pages * sizeof(*blob->active.pages)); 948 if (!blob->active.pages) { 949 free(ctx); 950 cb_fn(seq, cb_arg, -ENOMEM); 951 return; 952 } 953 954 /* Assign this metadata to pages. This requires two passes - 955 * one to verify that there are enough pages and a second 956 * to actually claim them. */ 957 page_num = 0; 958 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 959 for (i = 1; i < blob->active.num_pages; i++) { 960 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 961 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 962 spdk_free(ctx->pages); 963 free(ctx); 964 blob->state = SPDK_BLOB_STATE_DIRTY; 965 cb_fn(seq, cb_arg, -ENOMEM); 966 return; 967 } 968 page_num++; 969 } 970 971 page_num = 0; 972 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 973 for (i = 1; i < blob->active.num_pages; i++) { 974 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 975 ctx->pages[i - 1].next = page_num; 976 blob->active.pages[i] = page_num; 977 spdk_bit_array_set(bs->used_md_pages, page_num); 978 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 979 page_num++; 980 } 981 982 /* Start writing the metadata from last page to first */ 983 ctx->idx = blob->active.num_pages - 1; 984 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 985 } 986 987 static void 988 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel *_channel, 989 void *payload, uint64_t offset, uint64_t length, 990 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 991 { 992 spdk_bs_batch_t *batch; 993 struct spdk_bs_cpl cpl; 994 uint64_t lba; 995 uint32_t lba_count; 996 uint8_t *buf; 997 uint64_t page; 998 999 assert(blob != NULL); 1000 1001 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1002 cb_fn(cb_arg, -EINVAL); 1003 return; 1004 } 1005 1006 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1007 cpl.u.blob_basic.cb_fn = cb_fn; 1008 cpl.u.blob_basic.cb_arg = cb_arg; 1009 1010 batch = spdk_bs_batch_open(_channel, &cpl); 1011 if (!batch) { 1012 cb_fn(cb_arg, -ENOMEM); 1013 return; 1014 } 1015 1016 length = _spdk_bs_page_to_lba(blob->bs, length); 1017 page = offset; 1018 buf = payload; 1019 while (length > 0) { 1020 lba = _spdk_bs_blob_page_to_lba(blob, page); 1021 lba_count = spdk_min(length, 1022 _spdk_bs_page_to_lba(blob->bs, 1023 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1024 1025 if (read) { 1026 spdk_bs_batch_read(batch, buf, lba, lba_count); 1027 } else { 1028 spdk_bs_batch_write(batch, buf, lba, lba_count); 1029 } 1030 1031 length -= lba_count; 1032 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1033 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1034 } 1035 1036 spdk_bs_batch_close(batch); 1037 } 1038 1039 static struct spdk_blob * 1040 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1041 { 1042 struct spdk_blob *blob; 1043 1044 TAILQ_FOREACH(blob, &bs->blobs, link) { 1045 if (blob->id == blobid) { 1046 return blob; 1047 } 1048 } 1049 1050 return NULL; 1051 } 1052 1053 static int 1054 _spdk_bs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 1055 { 1056 struct spdk_blob_store *bs = io_device; 1057 struct spdk_bs_dev *dev = bs->dev; 1058 struct spdk_bs_channel *channel = ctx_buf; 1059 uint32_t max_ops = *(uint32_t *)unique_ctx; 1060 uint32_t i; 1061 1062 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1063 if (!channel->req_mem) { 1064 free(channel); 1065 return -1; 1066 } 1067 1068 TAILQ_INIT(&channel->reqs); 1069 1070 for (i = 0; i < max_ops; i++) { 1071 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1072 } 1073 1074 channel->bs = bs; 1075 channel->dev = dev; 1076 channel->dev_channel = dev->create_channel(dev); 1077 1078 return 0; 1079 } 1080 1081 static void 1082 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1083 { 1084 struct spdk_bs_channel *channel = ctx_buf; 1085 1086 free(channel->req_mem); 1087 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1088 } 1089 1090 static void 1091 _spdk_bs_free(struct spdk_blob_store *bs) 1092 { 1093 struct spdk_blob *blob, *blob_tmp; 1094 1095 spdk_bs_unregister_md_thread(bs); 1096 spdk_io_device_unregister(bs); 1097 1098 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1099 TAILQ_REMOVE(&bs->blobs, blob, link); 1100 _spdk_blob_free(blob); 1101 } 1102 1103 spdk_bit_array_free(&bs->used_md_pages); 1104 spdk_bit_array_free(&bs->used_clusters); 1105 1106 bs->dev->destroy(bs->dev); 1107 free(bs); 1108 } 1109 1110 void 1111 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1112 { 1113 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1114 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1115 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1116 } 1117 1118 static struct spdk_blob_store * 1119 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1120 { 1121 struct spdk_blob_store *bs; 1122 1123 bs = calloc(1, sizeof(struct spdk_blob_store)); 1124 if (!bs) { 1125 return NULL; 1126 } 1127 1128 TAILQ_INIT(&bs->blobs); 1129 bs->dev = dev; 1130 1131 /* 1132 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1133 * even multiple of the cluster size. 1134 */ 1135 bs->cluster_sz = opts->cluster_sz; 1136 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1137 bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1138 bs->num_free_clusters = bs->total_clusters; 1139 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1140 if (bs->used_clusters == NULL) { 1141 _spdk_bs_free(bs); 1142 return NULL; 1143 } 1144 1145 bs->max_md_ops = opts->max_md_ops; 1146 bs->super_blob = SPDK_BLOBID_INVALID; 1147 1148 /* The metadata is assumed to be at least 1 page */ 1149 bs->used_md_pages = spdk_bit_array_create(1); 1150 1151 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy, 1152 sizeof(struct spdk_bs_channel)); 1153 spdk_bs_register_md_thread(bs); 1154 1155 return bs; 1156 } 1157 1158 /* START spdk_bs_load */ 1159 1160 struct spdk_bs_load_ctx { 1161 struct spdk_blob_store *bs; 1162 struct spdk_bs_super_block *super; 1163 1164 struct spdk_bs_md_mask *mask; 1165 }; 1166 1167 static void 1168 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1169 { 1170 struct spdk_bs_load_ctx *ctx = cb_arg; 1171 uint32_t i, j; 1172 int rc; 1173 1174 /* The type must be correct */ 1175 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1176 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1177 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1178 struct spdk_blob_md_page) * 8)); 1179 /* The length of the mask must be exactly equal to the total number of clusters */ 1180 assert(ctx->mask->length == ctx->bs->total_clusters); 1181 1182 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1183 if (rc < 0) { 1184 spdk_free(ctx->super); 1185 spdk_free(ctx->mask); 1186 _spdk_bs_free(ctx->bs); 1187 free(ctx); 1188 spdk_bs_sequence_finish(seq, -ENOMEM); 1189 return; 1190 } 1191 1192 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1193 for (i = 0; i < ctx->mask->length / 8; i++) { 1194 uint8_t segment = ctx->mask->mask[i]; 1195 for (j = 0; segment && (j < 8); j++) { 1196 if (segment & 1U) { 1197 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1198 assert(ctx->bs->num_free_clusters > 0); 1199 ctx->bs->num_free_clusters--; 1200 } 1201 segment >>= 1U; 1202 } 1203 } 1204 1205 spdk_free(ctx->super); 1206 spdk_free(ctx->mask); 1207 free(ctx); 1208 1209 spdk_bs_sequence_finish(seq, bserrno); 1210 } 1211 1212 static void 1213 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1214 { 1215 struct spdk_bs_load_ctx *ctx = cb_arg; 1216 uint64_t lba, lba_count; 1217 uint32_t i, j; 1218 int rc; 1219 1220 /* The type must be correct */ 1221 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1222 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1223 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) * 1224 8)); 1225 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1226 assert(ctx->mask->length == ctx->super->md_len); 1227 1228 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1229 if (rc < 0) { 1230 spdk_free(ctx->super); 1231 spdk_free(ctx->mask); 1232 _spdk_bs_free(ctx->bs); 1233 free(ctx); 1234 spdk_bs_sequence_finish(seq, -ENOMEM); 1235 return; 1236 } 1237 1238 for (i = 0; i < ctx->mask->length / 8; i++) { 1239 uint8_t segment = ctx->mask->mask[i]; 1240 for (j = 0; segment && (j < 8); j++) { 1241 if (segment & 1U) { 1242 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1243 } 1244 segment >>= 1U; 1245 } 1246 } 1247 spdk_free(ctx->mask); 1248 1249 /* Read the used clusters mask */ 1250 ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page), 1251 0x1000, NULL); 1252 if (!ctx->mask) { 1253 spdk_free(ctx->super); 1254 _spdk_bs_free(ctx->bs); 1255 free(ctx); 1256 spdk_bs_sequence_finish(seq, -ENOMEM); 1257 return; 1258 } 1259 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1260 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1261 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1262 _spdk_bs_load_used_clusters_cpl, ctx); 1263 } 1264 1265 static void 1266 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1267 { 1268 struct spdk_bs_load_ctx *ctx = cb_arg; 1269 uint64_t lba, lba_count; 1270 1271 if (ctx->super->version != SPDK_BS_VERSION) { 1272 spdk_free(ctx->super); 1273 _spdk_bs_free(ctx->bs); 1274 free(ctx); 1275 spdk_bs_sequence_finish(seq, -EILSEQ); 1276 return; 1277 } 1278 1279 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1280 sizeof(ctx->super->signature)) != 0) { 1281 spdk_free(ctx->super); 1282 _spdk_bs_free(ctx->bs); 1283 free(ctx); 1284 spdk_bs_sequence_finish(seq, -EILSEQ); 1285 return; 1286 } 1287 1288 if (ctx->super->clean != 1) { 1289 /* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED. 1290 * All of the necessary data to recover is available 1291 * on disk - the code just has not been written yet. 1292 */ 1293 assert(false); 1294 spdk_free(ctx->super); 1295 _spdk_bs_free(ctx->bs); 1296 free(ctx); 1297 spdk_bs_sequence_finish(seq, -EILSEQ); 1298 return; 1299 } 1300 ctx->super->clean = 0; 1301 1302 /* Parse the super block */ 1303 ctx->bs->cluster_sz = ctx->super->cluster_size; 1304 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1305 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1306 ctx->bs->md_start = ctx->super->md_start; 1307 ctx->bs->md_len = ctx->super->md_len; 1308 1309 /* Read the used pages mask */ 1310 ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 0x1000, 1311 NULL); 1312 if (!ctx->mask) { 1313 spdk_free(ctx->super); 1314 _spdk_bs_free(ctx->bs); 1315 free(ctx); 1316 spdk_bs_sequence_finish(seq, -ENOMEM); 1317 return; 1318 } 1319 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1320 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1321 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1322 _spdk_bs_load_used_pages_cpl, ctx); 1323 } 1324 1325 void 1326 spdk_bs_load(struct spdk_bs_dev *dev, 1327 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1328 { 1329 struct spdk_blob_store *bs; 1330 struct spdk_bs_cpl cpl; 1331 spdk_bs_sequence_t *seq; 1332 struct spdk_bs_load_ctx *ctx; 1333 struct spdk_bs_opts opts = {}; 1334 1335 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1336 1337 spdk_bs_opts_init(&opts); 1338 1339 bs = _spdk_bs_alloc(dev, &opts); 1340 if (!bs) { 1341 cb_fn(cb_arg, NULL, -ENOMEM); 1342 return; 1343 } 1344 1345 ctx = calloc(1, sizeof(*ctx)); 1346 if (!ctx) { 1347 _spdk_bs_free(bs); 1348 cb_fn(cb_arg, NULL, -ENOMEM); 1349 return; 1350 } 1351 1352 ctx->bs = bs; 1353 1354 /* Allocate memory for the super block */ 1355 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1356 if (!ctx->super) { 1357 free(ctx); 1358 _spdk_bs_free(bs); 1359 return; 1360 } 1361 1362 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1363 cpl.u.bs_handle.cb_fn = cb_fn; 1364 cpl.u.bs_handle.cb_arg = cb_arg; 1365 cpl.u.bs_handle.bs = bs; 1366 1367 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1368 if (!seq) { 1369 spdk_free(ctx->super); 1370 free(ctx); 1371 _spdk_bs_free(bs); 1372 cb_fn(cb_arg, NULL, -ENOMEM); 1373 return; 1374 } 1375 1376 /* Read the super block */ 1377 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1378 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1379 _spdk_bs_load_super_cpl, ctx); 1380 } 1381 1382 /* END spdk_bs_load */ 1383 1384 /* START spdk_bs_init */ 1385 1386 struct spdk_bs_init_ctx { 1387 struct spdk_blob_store *bs; 1388 struct spdk_bs_super_block *super; 1389 }; 1390 1391 static void 1392 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1393 { 1394 struct spdk_bs_init_ctx *ctx = cb_arg; 1395 1396 spdk_free(ctx->super); 1397 free(ctx); 1398 1399 spdk_bs_sequence_finish(seq, bserrno); 1400 } 1401 1402 static void 1403 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1404 { 1405 struct spdk_bs_init_ctx *ctx = cb_arg; 1406 1407 /* Write super block */ 1408 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1409 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1410 _spdk_bs_init_persist_super_cpl, ctx); 1411 } 1412 1413 void 1414 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1415 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1416 { 1417 struct spdk_bs_init_ctx *ctx; 1418 struct spdk_blob_store *bs; 1419 struct spdk_bs_cpl cpl; 1420 spdk_bs_sequence_t *seq; 1421 uint64_t num_md_pages; 1422 uint32_t i; 1423 struct spdk_bs_opts opts = {}; 1424 int rc; 1425 1426 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 1427 1428 if (o) { 1429 opts = *o; 1430 } else { 1431 spdk_bs_opts_init(&opts); 1432 } 1433 1434 bs = _spdk_bs_alloc(dev, &opts); 1435 if (!bs) { 1436 cb_fn(cb_arg, NULL, -ENOMEM); 1437 return; 1438 } 1439 1440 if (opts.num_md_pages == UINT32_MAX) { 1441 /* By default, allocate 1 page per cluster. 1442 * Technically, this over-allocates metadata 1443 * because more metadata will reduce the number 1444 * of usable clusters. This can be addressed with 1445 * more complex math in the future. 1446 */ 1447 bs->md_len = bs->total_clusters; 1448 } else { 1449 bs->md_len = opts.num_md_pages; 1450 } 1451 1452 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 1453 if (rc < 0) { 1454 _spdk_bs_free(bs); 1455 cb_fn(cb_arg, NULL, -ENOMEM); 1456 return; 1457 } 1458 1459 ctx = calloc(1, sizeof(*ctx)); 1460 if (!ctx) { 1461 _spdk_bs_free(bs); 1462 cb_fn(cb_arg, NULL, -ENOMEM); 1463 return; 1464 } 1465 1466 ctx->bs = bs; 1467 1468 /* Allocate memory for the super block */ 1469 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1470 if (!ctx->super) { 1471 free(ctx); 1472 _spdk_bs_free(bs); 1473 return; 1474 } 1475 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1476 sizeof(ctx->super->signature)); 1477 ctx->super->version = SPDK_BS_VERSION; 1478 ctx->super->length = sizeof(*ctx->super); 1479 ctx->super->super_blob = bs->super_blob; 1480 ctx->super->clean = 0; 1481 ctx->super->cluster_size = bs->cluster_sz; 1482 1483 /* Calculate how many pages the metadata consumes at the front 1484 * of the disk. 1485 */ 1486 1487 /* The super block uses 1 page */ 1488 num_md_pages = 1; 1489 1490 /* The used_md_pages mask requires 1 bit per metadata page, rounded 1491 * up to the nearest page, plus a header. 1492 */ 1493 ctx->super->used_page_mask_start = num_md_pages; 1494 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1495 divide_round_up(bs->md_len, 8), 1496 sizeof(struct spdk_blob_md_page)); 1497 num_md_pages += ctx->super->used_page_mask_len; 1498 1499 /* The used_clusters mask requires 1 bit per cluster, rounded 1500 * up to the nearest page, plus a header. 1501 */ 1502 ctx->super->used_cluster_mask_start = num_md_pages; 1503 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1504 divide_round_up(bs->total_clusters, 8), 1505 sizeof(struct spdk_blob_md_page)); 1506 num_md_pages += ctx->super->used_cluster_mask_len; 1507 1508 /* The metadata region size was chosen above */ 1509 ctx->super->md_start = bs->md_start = num_md_pages; 1510 ctx->super->md_len = bs->md_len; 1511 num_md_pages += bs->md_len; 1512 1513 /* Claim all of the clusters used by the metadata */ 1514 for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) { 1515 _spdk_bs_claim_cluster(bs, i); 1516 } 1517 1518 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1519 cpl.u.bs_handle.cb_fn = cb_fn; 1520 cpl.u.bs_handle.cb_arg = cb_arg; 1521 cpl.u.bs_handle.bs = bs; 1522 1523 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1524 if (!seq) { 1525 spdk_free(ctx->super); 1526 free(ctx); 1527 _spdk_bs_free(bs); 1528 cb_fn(cb_arg, NULL, -ENOMEM); 1529 return; 1530 } 1531 1532 /* TRIM the entire device */ 1533 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx); 1534 } 1535 1536 /* END spdk_bs_init */ 1537 1538 /* START spdk_bs_unload */ 1539 1540 struct spdk_bs_unload_ctx { 1541 struct spdk_blob_store *bs; 1542 struct spdk_bs_super_block *super; 1543 1544 struct spdk_bs_md_mask *mask; 1545 }; 1546 1547 static void 1548 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1549 { 1550 struct spdk_bs_unload_ctx *ctx = cb_arg; 1551 1552 spdk_free(ctx->super); 1553 1554 spdk_bs_sequence_finish(seq, bserrno); 1555 1556 _spdk_bs_free(ctx->bs); 1557 free(ctx); 1558 } 1559 1560 static void 1561 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1562 { 1563 struct spdk_bs_unload_ctx *ctx = cb_arg; 1564 1565 spdk_free(ctx->mask); 1566 1567 /* Update the values in the super block */ 1568 ctx->super->super_blob = ctx->bs->super_blob; 1569 ctx->super->clean = 1; 1570 1571 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1572 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1573 _spdk_bs_unload_write_super_cpl, ctx); 1574 } 1575 1576 static void 1577 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1578 { 1579 struct spdk_bs_unload_ctx *ctx = cb_arg; 1580 uint32_t i; 1581 uint64_t lba, lba_count; 1582 1583 spdk_free(ctx->mask); 1584 1585 /* Write out the used clusters mask */ 1586 ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page), 1587 0x1000, NULL); 1588 if (!ctx->mask) { 1589 spdk_free(ctx->super); 1590 free(ctx); 1591 spdk_bs_sequence_finish(seq, -ENOMEM); 1592 return; 1593 } 1594 1595 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1596 ctx->mask->length = ctx->bs->total_clusters; 1597 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1598 1599 i = 0; 1600 while (true) { 1601 i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i); 1602 if (i > ctx->mask->length) { 1603 break; 1604 } 1605 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1606 i++; 1607 } 1608 1609 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1610 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1611 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1612 _spdk_bs_unload_write_used_clusters_cpl, ctx); 1613 } 1614 1615 static void 1616 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1617 { 1618 struct spdk_bs_unload_ctx *ctx = cb_arg; 1619 uint32_t i; 1620 uint64_t lba, lba_count; 1621 1622 /* Write out the used page mask */ 1623 ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 1624 0x1000, NULL); 1625 if (!ctx->mask) { 1626 spdk_free(ctx->super); 1627 free(ctx); 1628 spdk_bs_sequence_finish(seq, -ENOMEM); 1629 return; 1630 } 1631 1632 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1633 ctx->mask->length = ctx->super->md_len; 1634 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1635 1636 i = 0; 1637 while (true) { 1638 i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i); 1639 if (i > ctx->mask->length) { 1640 break; 1641 } 1642 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1643 i++; 1644 } 1645 1646 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1647 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1648 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1649 _spdk_bs_unload_write_used_pages_cpl, ctx); 1650 } 1651 1652 void 1653 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 1654 { 1655 struct spdk_bs_cpl cpl; 1656 spdk_bs_sequence_t *seq; 1657 struct spdk_bs_unload_ctx *ctx; 1658 1659 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 1660 1661 ctx = calloc(1, sizeof(*ctx)); 1662 if (!ctx) { 1663 cb_fn(cb_arg, -ENOMEM); 1664 return; 1665 } 1666 1667 ctx->bs = bs; 1668 1669 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1670 if (!ctx->super) { 1671 free(ctx); 1672 cb_fn(cb_arg, -ENOMEM); 1673 return; 1674 } 1675 1676 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 1677 cpl.u.bs_basic.cb_fn = cb_fn; 1678 cpl.u.bs_basic.cb_arg = cb_arg; 1679 1680 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1681 if (!seq) { 1682 spdk_free(ctx->super); 1683 free(ctx); 1684 cb_fn(cb_arg, -ENOMEM); 1685 return; 1686 } 1687 1688 assert(TAILQ_EMPTY(&bs->blobs)); 1689 1690 /* Read super block */ 1691 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1692 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1693 _spdk_bs_unload_read_super_cpl, ctx); 1694 } 1695 1696 /* END spdk_bs_unload */ 1697 1698 void 1699 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 1700 spdk_bs_op_complete cb_fn, void *cb_arg) 1701 { 1702 bs->super_blob = blobid; 1703 cb_fn(cb_arg, 0); 1704 } 1705 1706 void 1707 spdk_bs_get_super(struct spdk_blob_store *bs, 1708 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1709 { 1710 if (bs->super_blob == SPDK_BLOBID_INVALID) { 1711 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 1712 } else { 1713 cb_fn(cb_arg, bs->super_blob, 0); 1714 } 1715 } 1716 1717 uint64_t 1718 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 1719 { 1720 return bs->cluster_sz; 1721 } 1722 1723 uint64_t 1724 spdk_bs_get_page_size(struct spdk_blob_store *bs) 1725 { 1726 return sizeof(struct spdk_blob_md_page); 1727 } 1728 1729 uint64_t 1730 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 1731 { 1732 return bs->num_free_clusters; 1733 } 1734 1735 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 1736 { 1737 bs->md_channel = spdk_get_io_channel(bs, SPDK_IO_PRIORITY_DEFAULT, true, 1738 (void *)&bs->max_md_ops); 1739 1740 return 0; 1741 } 1742 1743 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 1744 { 1745 spdk_put_io_channel(bs->md_channel); 1746 1747 return 0; 1748 } 1749 1750 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 1751 { 1752 assert(blob != NULL); 1753 1754 return blob->id; 1755 } 1756 1757 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 1758 { 1759 assert(blob != NULL); 1760 1761 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 1762 } 1763 1764 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 1765 { 1766 assert(blob != NULL); 1767 1768 return blob->active.num_clusters; 1769 } 1770 1771 /* START spdk_bs_md_create_blob */ 1772 1773 static void 1774 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1775 { 1776 struct spdk_blob *blob = cb_arg; 1777 1778 _spdk_blob_free(blob); 1779 1780 spdk_bs_sequence_finish(seq, bserrno); 1781 } 1782 1783 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 1784 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1785 { 1786 struct spdk_blob *blob; 1787 uint32_t page_idx; 1788 struct spdk_bs_cpl cpl; 1789 spdk_bs_sequence_t *seq; 1790 spdk_blob_id id; 1791 1792 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 1793 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 1794 cb_fn(cb_arg, 0, -ENOMEM); 1795 return; 1796 } 1797 spdk_bit_array_set(bs->used_md_pages, page_idx); 1798 1799 /* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper 1800 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the 1801 * code assumes blob id == page_idx. 1802 */ 1803 id = (1ULL << 32) | page_idx; 1804 1805 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 1806 1807 blob = _spdk_blob_alloc(bs, id); 1808 if (!blob) { 1809 cb_fn(cb_arg, 0, -ENOMEM); 1810 return; 1811 } 1812 1813 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 1814 cpl.u.blobid.cb_fn = cb_fn; 1815 cpl.u.blobid.cb_arg = cb_arg; 1816 cpl.u.blobid.blobid = blob->id; 1817 1818 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1819 if (!seq) { 1820 _spdk_blob_free(blob); 1821 cb_fn(cb_arg, 0, -ENOMEM); 1822 return; 1823 } 1824 1825 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 1826 } 1827 1828 /* END spdk_bs_md_create_blob */ 1829 1830 /* START spdk_bs_md_resize_blob */ 1831 int 1832 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 1833 { 1834 int rc; 1835 1836 assert(blob != NULL); 1837 1838 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 1839 1840 if (sz == blob->active.num_clusters) { 1841 return 0; 1842 } 1843 1844 rc = _spdk_resize_blob(blob, sz); 1845 if (rc < 0) { 1846 return rc; 1847 } 1848 1849 return 0; 1850 } 1851 1852 /* END spdk_bs_md_resize_blob */ 1853 1854 1855 /* START spdk_bs_md_delete_blob */ 1856 1857 static void 1858 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1859 { 1860 struct spdk_blob *blob = cb_arg; 1861 1862 _spdk_blob_free(blob); 1863 1864 spdk_bs_sequence_finish(seq, bserrno); 1865 } 1866 1867 static void 1868 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1869 { 1870 struct spdk_blob *blob = cb_arg; 1871 1872 blob->state = SPDK_BLOB_STATE_DIRTY; 1873 blob->active.num_pages = 0; 1874 _spdk_resize_blob(blob, 0); 1875 1876 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 1877 } 1878 1879 void 1880 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1881 spdk_blob_op_complete cb_fn, void *cb_arg) 1882 { 1883 struct spdk_blob *blob; 1884 struct spdk_bs_cpl cpl; 1885 spdk_bs_sequence_t *seq; 1886 1887 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 1888 1889 blob = _spdk_blob_lookup(bs, blobid); 1890 if (blob) { 1891 assert(blob->open_ref > 0); 1892 cb_fn(cb_arg, -EINVAL); 1893 return; 1894 } 1895 1896 blob = _spdk_blob_alloc(bs, blobid); 1897 if (!blob) { 1898 cb_fn(cb_arg, -ENOMEM); 1899 return; 1900 } 1901 1902 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1903 cpl.u.blob_basic.cb_fn = cb_fn; 1904 cpl.u.blob_basic.cb_arg = cb_arg; 1905 1906 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1907 if (!seq) { 1908 _spdk_blob_free(blob); 1909 cb_fn(cb_arg, -ENOMEM); 1910 return; 1911 } 1912 1913 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 1914 } 1915 1916 /* END spdk_bs_md_delete_blob */ 1917 1918 /* START spdk_bs_md_open_blob */ 1919 1920 static void 1921 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1922 { 1923 struct spdk_blob *blob = cb_arg; 1924 1925 blob->open_ref++; 1926 1927 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 1928 1929 spdk_bs_sequence_finish(seq, bserrno); 1930 } 1931 1932 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1933 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 1934 { 1935 struct spdk_blob *blob; 1936 struct spdk_bs_cpl cpl; 1937 spdk_bs_sequence_t *seq; 1938 uint32_t page_num; 1939 1940 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 1941 1942 blob = _spdk_blob_lookup(bs, blobid); 1943 if (blob) { 1944 blob->open_ref++; 1945 cb_fn(cb_arg, blob, 0); 1946 return; 1947 } 1948 1949 page_num = _spdk_bs_blobid_to_page(blobid); 1950 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 1951 /* Invalid blobid */ 1952 cb_fn(cb_arg, NULL, -ENOENT); 1953 return; 1954 } 1955 1956 blob = _spdk_blob_alloc(bs, blobid); 1957 if (!blob) { 1958 cb_fn(cb_arg, NULL, -ENOMEM); 1959 return; 1960 } 1961 1962 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 1963 cpl.u.blob_handle.cb_fn = cb_fn; 1964 cpl.u.blob_handle.cb_arg = cb_arg; 1965 cpl.u.blob_handle.blob = blob; 1966 1967 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1968 if (!seq) { 1969 _spdk_blob_free(blob); 1970 cb_fn(cb_arg, NULL, -ENOMEM); 1971 return; 1972 } 1973 1974 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 1975 } 1976 1977 /* START spdk_bs_md_sync_blob */ 1978 static void 1979 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1980 { 1981 spdk_bs_sequence_finish(seq, bserrno); 1982 } 1983 1984 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 1985 spdk_blob_op_complete cb_fn, void *cb_arg) 1986 { 1987 struct spdk_bs_cpl cpl; 1988 spdk_bs_sequence_t *seq; 1989 1990 assert(blob != NULL); 1991 1992 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 1993 1994 assert(blob->state != SPDK_BLOB_STATE_LOADING && 1995 blob->state != SPDK_BLOB_STATE_SYNCING); 1996 1997 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 1998 cb_fn(cb_arg, 0); 1999 return; 2000 } 2001 2002 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2003 cpl.u.blob_basic.cb_fn = cb_fn; 2004 cpl.u.blob_basic.cb_arg = cb_arg; 2005 2006 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 2007 if (!seq) { 2008 cb_fn(cb_arg, -ENOMEM); 2009 return; 2010 } 2011 2012 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2013 } 2014 2015 /* END spdk_bs_md_sync_blob */ 2016 2017 /* START spdk_bs_md_close_blob */ 2018 2019 static void 2020 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2021 { 2022 struct spdk_blob **blob = cb_arg; 2023 2024 if ((*blob)->open_ref == 0) { 2025 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2026 _spdk_blob_free((*blob)); 2027 } 2028 2029 *blob = NULL; 2030 2031 spdk_bs_sequence_finish(seq, bserrno); 2032 } 2033 2034 void spdk_bs_md_close_blob(struct spdk_blob **b, 2035 spdk_blob_op_complete cb_fn, void *cb_arg) 2036 { 2037 struct spdk_bs_cpl cpl; 2038 struct spdk_blob *blob; 2039 spdk_bs_sequence_t *seq; 2040 2041 assert(b != NULL); 2042 blob = *b; 2043 assert(blob != NULL); 2044 2045 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2046 2047 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2048 blob->state != SPDK_BLOB_STATE_SYNCING); 2049 2050 if (blob->open_ref == 0) { 2051 cb_fn(cb_arg, -EBADF); 2052 return; 2053 } 2054 2055 blob->open_ref--; 2056 2057 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2058 cpl.u.blob_basic.cb_fn = cb_fn; 2059 cpl.u.blob_basic.cb_arg = cb_arg; 2060 2061 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 2062 if (!seq) { 2063 cb_fn(cb_arg, -ENOMEM); 2064 return; 2065 } 2066 2067 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2068 _spdk_blob_close_cpl(seq, b, 0); 2069 return; 2070 } 2071 2072 /* Sync metadata */ 2073 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2074 } 2075 2076 /* END spdk_bs_md_close_blob */ 2077 2078 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs, 2079 uint32_t priority, uint32_t max_ops) 2080 { 2081 return spdk_get_io_channel(bs, priority, true, (void *)&max_ops); 2082 } 2083 2084 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2085 { 2086 spdk_put_io_channel(channel); 2087 } 2088 2089 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2090 spdk_blob_op_complete cb_fn, void *cb_arg) 2091 { 2092 /* Flush is synchronous right now */ 2093 cb_fn(cb_arg, 0); 2094 } 2095 2096 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2097 void *payload, uint64_t offset, uint64_t length, 2098 spdk_blob_op_complete cb_fn, void *cb_arg) 2099 { 2100 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false); 2101 } 2102 2103 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2104 void *payload, uint64_t offset, uint64_t length, 2105 spdk_blob_op_complete cb_fn, void *cb_arg) 2106 { 2107 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true); 2108 } 2109 2110 struct spdk_bs_iter_ctx { 2111 int64_t page_num; 2112 struct spdk_blob_store *bs; 2113 2114 spdk_blob_op_with_handle_complete cb_fn; 2115 void *cb_arg; 2116 }; 2117 2118 static void 2119 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2120 { 2121 struct spdk_bs_iter_ctx *ctx = cb_arg; 2122 struct spdk_blob_store *bs = ctx->bs; 2123 spdk_blob_id id; 2124 2125 if (bserrno == 0) { 2126 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2127 free(ctx); 2128 return; 2129 } 2130 2131 ctx->page_num++; 2132 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2133 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2134 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2135 free(ctx); 2136 return; 2137 } 2138 2139 id = (1ULL << 32) | ctx->page_num; 2140 2141 blob = _spdk_blob_lookup(bs, id); 2142 if (blob) { 2143 blob->open_ref++; 2144 ctx->cb_fn(ctx->cb_arg, blob, 0); 2145 free(ctx); 2146 return; 2147 } 2148 2149 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2150 } 2151 2152 void 2153 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2154 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2155 { 2156 struct spdk_bs_iter_ctx *ctx; 2157 2158 ctx = calloc(1, sizeof(*ctx)); 2159 if (!ctx) { 2160 cb_fn(cb_arg, NULL, -ENOMEM); 2161 return; 2162 } 2163 2164 ctx->page_num = -1; 2165 ctx->bs = bs; 2166 ctx->cb_fn = cb_fn; 2167 ctx->cb_arg = cb_arg; 2168 2169 _spdk_bs_iter_cpl(ctx, NULL, -1); 2170 } 2171 2172 static void 2173 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2174 { 2175 struct spdk_bs_iter_ctx *ctx = cb_arg; 2176 2177 _spdk_bs_iter_cpl(ctx, NULL, -1); 2178 } 2179 2180 void 2181 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2182 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2183 { 2184 struct spdk_bs_iter_ctx *ctx; 2185 struct spdk_blob *blob; 2186 2187 assert(b != NULL); 2188 blob = *b; 2189 assert(blob != NULL); 2190 2191 ctx = calloc(1, sizeof(*ctx)); 2192 if (!ctx) { 2193 cb_fn(cb_arg, NULL, -ENOMEM); 2194 return; 2195 } 2196 2197 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2198 ctx->bs = bs; 2199 ctx->cb_fn = cb_fn; 2200 ctx->cb_arg = cb_arg; 2201 2202 /* Close the existing blob */ 2203 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2204 } 2205 2206 int 2207 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2208 uint16_t value_len) 2209 { 2210 struct spdk_xattr *xattr; 2211 2212 assert(blob != NULL); 2213 2214 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2215 blob->state != SPDK_BLOB_STATE_SYNCING); 2216 2217 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2218 if (!strcmp(name, xattr->name)) { 2219 free(xattr->value); 2220 xattr->value_len = value_len; 2221 xattr->value = malloc(value_len); 2222 memcpy(xattr->value, value, value_len); 2223 2224 blob->state = SPDK_BLOB_STATE_DIRTY; 2225 2226 return 0; 2227 } 2228 } 2229 2230 xattr = calloc(1, sizeof(*xattr)); 2231 if (!xattr) { 2232 return -1; 2233 } 2234 xattr->name = strdup(name); 2235 xattr->value_len = value_len; 2236 xattr->value = malloc(value_len); 2237 memcpy(xattr->value, value, value_len); 2238 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2239 2240 blob->state = SPDK_BLOB_STATE_DIRTY; 2241 2242 return 0; 2243 } 2244 2245 int 2246 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 2247 { 2248 struct spdk_xattr *xattr; 2249 2250 assert(blob != NULL); 2251 2252 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2253 blob->state != SPDK_BLOB_STATE_SYNCING); 2254 2255 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2256 if (!strcmp(name, xattr->name)) { 2257 TAILQ_REMOVE(&blob->xattrs, xattr, link); 2258 free(xattr->value); 2259 free(xattr->name); 2260 free(xattr); 2261 2262 blob->state = SPDK_BLOB_STATE_DIRTY; 2263 2264 return 0; 2265 } 2266 } 2267 2268 return -ENOENT; 2269 } 2270 2271 int 2272 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 2273 const void **value, size_t *value_len) 2274 { 2275 struct spdk_xattr *xattr; 2276 2277 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2278 if (!strcmp(name, xattr->name)) { 2279 *value = xattr->value; 2280 *value_len = xattr->value_len; 2281 return 0; 2282 } 2283 } 2284 2285 return -ENOENT; 2286 } 2287 2288 struct spdk_xattr_names { 2289 uint32_t count; 2290 const char *names[0]; 2291 }; 2292 2293 int 2294 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 2295 struct spdk_xattr_names **names) 2296 { 2297 struct spdk_xattr *xattr; 2298 int count = 0; 2299 2300 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2301 count++; 2302 } 2303 2304 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 2305 if (*names == NULL) { 2306 return -ENOMEM; 2307 } 2308 2309 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2310 (*names)->names[(*names)->count++] = xattr->name; 2311 } 2312 2313 return 0; 2314 } 2315 2316 uint32_t 2317 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 2318 { 2319 assert(names != NULL); 2320 2321 return names->count; 2322 } 2323 2324 const char * 2325 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 2326 { 2327 if (index >= names->count) { 2328 return NULL; 2329 } 2330 2331 return names->names[index]; 2332 } 2333 2334 void 2335 spdk_xattr_names_free(struct spdk_xattr_names *names) 2336 { 2337 free(names); 2338 } 2339 2340 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 2341