/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */ 33 34 #include <stdbool.h> 35 #include <assert.h> 36 #include <errno.h> 37 #include <limits.h> 38 #include <stdlib.h> 39 #include <string.h> 40 41 #include "spdk/blob.h" 42 #include "spdk/env.h" 43 #include "spdk/queue.h" 44 #include "spdk/io_channel.h" 45 #include "spdk/bit_array.h" 46 47 #include "spdk_internal/log.h" 48 49 #include "blobstore.h" 50 #include "request.h" 51 52 static inline size_t 53 divide_round_up(size_t num, size_t divisor) 54 { 55 return (num + divisor - 1) / divisor; 56 } 57 58 static void 59 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 60 { 61 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 62 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 63 assert(bs->num_free_clusters > 0); 64 65 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 66 67 spdk_bit_array_set(bs->used_clusters, cluster_num); 68 bs->num_free_clusters--; 69 } 70 71 static void 72 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 73 { 74 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 75 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 76 assert(bs->num_free_clusters < bs->total_clusters); 77 78 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 79 80 spdk_bit_array_clear(bs->used_clusters, cluster_num); 81 bs->num_free_clusters++; 82 } 83 84 static struct spdk_blob * 85 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 86 { 87 struct spdk_blob *blob; 88 89 blob = calloc(1, sizeof(*blob)); 90 if (!blob) { 91 return NULL; 92 } 93 94 blob->id = id; 95 blob->bs = bs; 96 97 blob->state = SPDK_BLOB_STATE_DIRTY; 98 blob->active.num_pages = 1; 99 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 100 if (!blob->active.pages) { 101 free(blob); 102 return NULL; 103 } 104 105 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 106 107 TAILQ_INIT(&blob->xattrs); 108 109 return blob; 110 } 111 112 
static void 113 _spdk_blob_free(struct spdk_blob *blob) 114 { 115 struct spdk_xattr *xattr, *xattr_tmp; 116 117 assert(blob != NULL); 118 assert(blob->state == SPDK_BLOB_STATE_CLEAN); 119 120 free(blob->active.clusters); 121 free(blob->clean.clusters); 122 free(blob->active.pages); 123 free(blob->clean.pages); 124 125 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 126 TAILQ_REMOVE(&blob->xattrs, xattr, link); 127 free(xattr->name); 128 free(xattr->value); 129 free(xattr); 130 } 131 132 free(blob); 133 } 134 135 static int 136 _spdk_blob_mark_clean(struct spdk_blob *blob) 137 { 138 uint64_t *clusters = NULL; 139 uint32_t *pages = NULL; 140 141 assert(blob != NULL); 142 assert(blob->state == SPDK_BLOB_STATE_LOADING || 143 blob->state == SPDK_BLOB_STATE_SYNCING); 144 145 if (blob->active.num_clusters) { 146 assert(blob->active.clusters); 147 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 148 if (!clusters) { 149 return -1; 150 } 151 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 152 } 153 154 if (blob->active.num_pages) { 155 assert(blob->active.pages); 156 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 157 if (!pages) { 158 free(clusters); 159 return -1; 160 } 161 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 162 } 163 164 free(blob->clean.clusters); 165 free(blob->clean.pages); 166 167 blob->clean.num_clusters = blob->active.num_clusters; 168 blob->clean.clusters = blob->active.clusters; 169 blob->clean.num_pages = blob->active.num_pages; 170 blob->clean.pages = blob->active.pages; 171 172 blob->active.clusters = clusters; 173 blob->active.pages = pages; 174 175 blob->state = SPDK_BLOB_STATE_CLEAN; 176 177 return 0; 178 } 179 180 static void 181 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 182 { 183 struct spdk_blob_md_descriptor *desc; 184 size_t cur_desc = 0; 185 void *tmp; 186 187 desc = 
(struct spdk_blob_md_descriptor *)page->descriptors; 188 while (cur_desc < sizeof(page->descriptors)) { 189 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 190 if (desc->length == 0) { 191 /* If padding and length are 0, this terminates the page */ 192 break; 193 } 194 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 195 struct spdk_blob_md_descriptor_extent *desc_extent; 196 unsigned int i, j; 197 unsigned int cluster_count = blob->active.num_clusters; 198 199 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 200 201 assert(desc_extent->length > 0); 202 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 203 204 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 205 for (j = 0; j < desc_extent->extents[i].length; j++) { 206 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 207 cluster_count++; 208 } 209 } 210 211 assert(cluster_count > 0); 212 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 213 assert(tmp != NULL); 214 blob->active.clusters = tmp; 215 blob->active.cluster_array_size = cluster_count; 216 217 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 218 for (j = 0; j < desc_extent->extents[i].length; j++) { 219 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 220 desc_extent->extents[i].cluster_idx + j); 221 } 222 } 223 224 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 225 struct spdk_blob_md_descriptor_xattr *desc_xattr; 226 struct spdk_xattr *xattr; 227 228 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 229 230 xattr = calloc(1, sizeof(*xattr)); 231 assert(xattr != NULL); 232 233 xattr->name = malloc(desc_xattr->name_length + 1); 234 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 235 xattr->name[desc_xattr->name_length] = '\0'; 236 237 xattr->value = malloc(desc_xattr->value_length); 238 assert(xattr->value != 
NULL); 239 xattr->value_len = desc_xattr->value_length; 240 memcpy(xattr->value, 241 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 242 desc_xattr->value_length); 243 244 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 245 } else { 246 /* Error */ 247 break; 248 } 249 250 /* Advance to the next descriptor */ 251 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)desc + sizeof(*desc) + desc->length); 252 cur_desc += sizeof(*desc) + desc->length; 253 } 254 } 255 256 static int 257 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 258 struct spdk_blob *blob) 259 { 260 const struct spdk_blob_md_page *page; 261 uint32_t i; 262 263 assert(page_count > 0); 264 assert(pages[0].sequence_num == 0); 265 assert(blob != NULL); 266 assert(blob->state == SPDK_BLOB_STATE_LOADING); 267 assert(blob->active.clusters == NULL); 268 assert(blob->id == pages[0].id); 269 assert(blob->state == SPDK_BLOB_STATE_LOADING); 270 271 for (i = 0; i < page_count; i++) { 272 page = &pages[i]; 273 274 assert(page->id == blob->id); 275 assert(page->sequence_num == i); 276 277 _spdk_blob_parse_page(page, blob); 278 } 279 280 return 0; 281 } 282 283 static int 284 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 285 struct spdk_blob_md_page **pages, 286 uint32_t *page_count, 287 struct spdk_blob_md_page **last_page) 288 { 289 struct spdk_blob_md_page *page; 290 291 assert(pages != NULL); 292 assert(page_count != NULL); 293 294 if (*page_count == 0) { 295 assert(*pages == NULL); 296 *page_count = 1; 297 *pages = spdk_zmalloc(sizeof(struct spdk_blob_md_page), 298 sizeof(struct spdk_blob_md_page), 299 NULL); 300 } else { 301 assert(*pages != NULL); 302 (*page_count)++; 303 *pages = spdk_realloc(*pages, 304 sizeof(struct spdk_blob_md_page) * (*page_count), 305 sizeof(struct spdk_blob_md_page), 306 NULL); 307 } 308 309 if (*pages == NULL) { 310 *page_count = 0; 311 *last_page = NULL; 312 return -ENOMEM; 313 } 314 315 page = &(*pages)[*page_count - 1]; 
316 page->id = blob->id; 317 page->sequence_num = *page_count - 1; 318 page->next = SPDK_INVALID_MD_PAGE; 319 *last_page = page; 320 321 return 0; 322 } 323 324 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 325 * Update required_sz on both success and failure. 326 * 327 */ 328 static int 329 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 330 uint8_t *buf, size_t buf_sz, 331 size_t *required_sz) 332 { 333 struct spdk_blob_md_descriptor_xattr *desc; 334 335 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 336 strlen(xattr->name) + 337 xattr->value_len; 338 339 if (buf_sz < *required_sz) { 340 return -1; 341 } 342 343 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 344 345 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 346 desc->length = sizeof(desc->name_length) + 347 sizeof(desc->value_length) + 348 strlen(xattr->name) + 349 xattr->value_len; 350 desc->name_length = strlen(xattr->name); 351 desc->value_length = xattr->value_len; 352 353 memcpy(desc->name, xattr->name, desc->name_length); 354 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 355 xattr->value, 356 desc->value_length); 357 358 return 0; 359 } 360 361 static void 362 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 363 uint64_t start_cluster, uint64_t *next_cluster, 364 uint8_t *buf, size_t buf_sz) 365 { 366 struct spdk_blob_md_descriptor_extent *desc; 367 size_t cur_sz; 368 uint64_t i, extent_idx; 369 uint32_t lba, lba_per_cluster, lba_count; 370 371 /* The buffer must have room for at least one extent */ 372 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 373 if (buf_sz < cur_sz) { 374 *next_cluster = start_cluster; 375 return; 376 } 377 378 desc = (struct spdk_blob_md_descriptor_extent *)buf; 379 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 380 381 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 382 383 lba = blob->active.clusters[start_cluster]; 384 lba_count = lba_per_cluster; 385 
extent_idx = 0; 386 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 387 if ((lba + lba_count) == blob->active.clusters[i]) { 388 lba_count += lba_per_cluster; 389 continue; 390 } 391 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 392 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 393 extent_idx++; 394 395 cur_sz += sizeof(desc->extents[extent_idx]); 396 397 if (buf_sz < cur_sz) { 398 /* If we ran out of buffer space, return */ 399 desc->length = sizeof(desc->extents[0]) * extent_idx; 400 *next_cluster = i; 401 return; 402 } 403 404 lba = blob->active.clusters[i]; 405 lba_count = lba_per_cluster; 406 } 407 408 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 409 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 410 extent_idx++; 411 412 desc->length = sizeof(desc->extents[0]) * extent_idx; 413 *next_cluster = blob->active.num_clusters; 414 415 return; 416 } 417 418 static int 419 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 420 uint32_t *page_count) 421 { 422 struct spdk_blob_md_page *cur_page; 423 const struct spdk_xattr *xattr; 424 int rc; 425 uint8_t *buf; 426 size_t remaining_sz; 427 428 assert(pages != NULL); 429 assert(page_count != NULL); 430 assert(blob != NULL); 431 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 432 433 *pages = NULL; 434 *page_count = 0; 435 436 /* A blob always has at least 1 page, even if it has no descriptors */ 437 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 438 if (rc < 0) { 439 return rc; 440 } 441 442 buf = (uint8_t *)cur_page->descriptors; 443 remaining_sz = sizeof(cur_page->descriptors); 444 445 /* Serialize xattrs */ 446 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 447 size_t required_sz = 0; 448 rc = _spdk_blob_serialize_xattr(xattr, 449 buf, remaining_sz, 450 &required_sz); 451 if (rc < 0) { 452 /* Need to add a new page to the chain */ 453 rc = _spdk_blob_serialize_add_page(blob, 
pages, page_count, 454 &cur_page); 455 if (rc < 0) { 456 spdk_free(*pages); 457 *pages = NULL; 458 *page_count = 0; 459 return rc; 460 } 461 462 buf = (uint8_t *)cur_page->descriptors; 463 remaining_sz = sizeof(cur_page->descriptors); 464 465 /* Try again */ 466 required_sz = 0; 467 rc = _spdk_blob_serialize_xattr(xattr, 468 buf, remaining_sz, 469 &required_sz); 470 471 if (rc < 0) { 472 spdk_free(*pages); 473 *pages = NULL; 474 *page_count = 0; 475 return -1; 476 } 477 } 478 479 remaining_sz -= required_sz; 480 buf += required_sz; 481 } 482 483 /* Serialize extents */ 484 uint64_t last_cluster = 0; 485 while (last_cluster < blob->active.num_clusters) { 486 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 487 buf, remaining_sz); 488 489 if (last_cluster == blob->active.num_clusters) { 490 break; 491 } 492 493 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 494 &cur_page); 495 if (rc < 0) { 496 return rc; 497 } 498 499 buf = (uint8_t *)cur_page->descriptors; 500 remaining_sz = sizeof(cur_page->descriptors); 501 } 502 503 return 0; 504 } 505 506 struct spdk_blob_load_ctx { 507 struct spdk_blob *blob; 508 509 struct spdk_blob_md_page *pages; 510 uint32_t num_pages; 511 512 spdk_bs_sequence_cpl cb_fn; 513 void *cb_arg; 514 }; 515 516 static void 517 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 518 { 519 struct spdk_blob_load_ctx *ctx = cb_arg; 520 struct spdk_blob *blob = ctx->blob; 521 struct spdk_blob_md_page *page; 522 int rc; 523 524 page = &ctx->pages[ctx->num_pages - 1]; 525 526 if (page->next != SPDK_INVALID_MD_PAGE) { 527 uint32_t next_page = page->next; 528 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 529 530 531 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 532 533 /* Read the next page */ 534 ctx->num_pages++; 535 ctx->pages = spdk_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 536 sizeof(*page), NULL); 537 if (ctx->pages == NULL) { 538 
ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 539 free(ctx); 540 return; 541 } 542 543 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 544 next_lba, 545 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 546 _spdk_blob_load_cpl, ctx); 547 return; 548 } 549 550 /* Parse the pages */ 551 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 552 553 _spdk_blob_mark_clean(blob); 554 555 ctx->cb_fn(seq, ctx->cb_arg, rc); 556 557 /* Free the memory */ 558 spdk_free(ctx->pages); 559 free(ctx); 560 } 561 562 /* Load a blob from disk given a blobid */ 563 static void 564 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 565 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 566 { 567 struct spdk_blob_load_ctx *ctx; 568 struct spdk_blob_store *bs; 569 uint32_t page_num; 570 uint64_t lba; 571 572 assert(blob != NULL); 573 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 574 blob->state == SPDK_BLOB_STATE_DIRTY); 575 576 bs = blob->bs; 577 578 ctx = calloc(1, sizeof(*ctx)); 579 if (!ctx) { 580 cb_fn(seq, cb_arg, -ENOMEM); 581 return; 582 } 583 584 ctx->blob = blob; 585 ctx->pages = spdk_realloc(ctx->pages, sizeof(struct spdk_blob_md_page), 586 sizeof(struct spdk_blob_md_page), NULL); 587 if (!ctx->pages) { 588 free(ctx); 589 cb_fn(seq, cb_arg, -ENOMEM); 590 return; 591 } 592 ctx->num_pages = 1; 593 ctx->cb_fn = cb_fn; 594 ctx->cb_arg = cb_arg; 595 596 page_num = _spdk_bs_blobid_to_page(blob->id); 597 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 598 599 blob->state = SPDK_BLOB_STATE_LOADING; 600 601 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 602 _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)), 603 _spdk_blob_load_cpl, ctx); 604 } 605 606 struct spdk_blob_persist_ctx { 607 struct spdk_blob *blob; 608 609 struct spdk_blob_md_page *pages; 610 611 uint64_t idx; 612 613 spdk_bs_sequence_cpl cb_fn; 614 void *cb_arg; 615 }; 616 617 static void 618 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 619 { 620 struct 
spdk_blob_persist_ctx *ctx = cb_arg; 621 struct spdk_blob *blob = ctx->blob; 622 623 if (bserrno == 0) { 624 _spdk_blob_mark_clean(blob); 625 } 626 627 /* Call user callback */ 628 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 629 630 /* Free the memory */ 631 spdk_free(ctx->pages); 632 free(ctx); 633 } 634 635 static void 636 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 637 { 638 struct spdk_blob_persist_ctx *ctx = cb_arg; 639 struct spdk_blob *blob = ctx->blob; 640 struct spdk_blob_store *bs = blob->bs; 641 void *tmp; 642 size_t i; 643 644 /* Release all clusters that were truncated */ 645 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 646 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 647 648 _spdk_bs_release_cluster(bs, cluster_num); 649 } 650 651 if (blob->active.num_clusters == 0) { 652 free(blob->active.clusters); 653 blob->active.clusters = NULL; 654 blob->active.cluster_array_size = 0; 655 } else { 656 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 657 assert(tmp != NULL); 658 blob->active.clusters = tmp; 659 blob->active.cluster_array_size = blob->active.num_clusters; 660 } 661 662 _spdk_blob_persist_complete(seq, ctx, bserrno); 663 } 664 665 static void 666 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 667 { 668 struct spdk_blob_persist_ctx *ctx = cb_arg; 669 struct spdk_blob *blob = ctx->blob; 670 struct spdk_blob_store *bs = blob->bs; 671 spdk_bs_batch_t *batch; 672 size_t i; 673 674 /* Clusters don't move around in blobs. The list shrinks or grows 675 * at the end, but no changes ever occur in the middle of the list. 
676 */ 677 678 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 679 680 /* Unmap all clusters that were truncated */ 681 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 682 uint64_t lba = blob->active.clusters[i]; 683 uint32_t lba_count = _spdk_bs_cluster_to_lba(bs, 1); 684 685 spdk_bs_batch_unmap(batch, lba, lba_count); 686 } 687 688 spdk_bs_batch_close(batch); 689 } 690 691 static void 692 _spdk_blob_persist_unmap_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 693 { 694 struct spdk_blob_persist_ctx *ctx = cb_arg; 695 struct spdk_blob *blob = ctx->blob; 696 struct spdk_blob_store *bs = blob->bs; 697 size_t i; 698 699 /* This loop starts at 1 because the first page is special and handled 700 * below. The pages (except the first) are never written in place, 701 * so any pages in the clean list must be unmapped. 702 */ 703 for (i = 1; i < blob->clean.num_pages; i++) { 704 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 705 } 706 707 if (blob->active.num_pages == 0) { 708 uint32_t page_num; 709 710 page_num = _spdk_bs_blobid_to_page(blob->id); 711 spdk_bit_array_clear(bs->used_md_pages, page_num); 712 } 713 714 /* Move on to unmapping clusters */ 715 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 716 } 717 718 static void 719 _spdk_blob_persist_unmap_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 720 { 721 struct spdk_blob_persist_ctx *ctx = cb_arg; 722 struct spdk_blob *blob = ctx->blob; 723 struct spdk_blob_store *bs = blob->bs; 724 uint64_t lba; 725 uint32_t lba_count; 726 spdk_bs_batch_t *batch; 727 size_t i; 728 729 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_pages_cpl, ctx); 730 731 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(struct spdk_blob_md_page)); 732 733 /* This loop starts at 1 because the first page is special and handled 734 * below. 
The pages (except the first) are never written in place, 735 * so any pages in the clean list must be unmapped. 736 */ 737 for (i = 1; i < blob->clean.num_pages; i++) { 738 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 739 740 spdk_bs_batch_unmap(batch, lba, lba_count); 741 } 742 743 /* The first page will only be unmapped if this is a delete. */ 744 if (blob->active.num_pages == 0) { 745 uint32_t page_num; 746 747 /* The first page in the metadata goes where the blobid indicates */ 748 page_num = _spdk_bs_blobid_to_page(blob->id); 749 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 750 751 spdk_bs_batch_unmap(batch, lba, lba_count); 752 } 753 754 spdk_bs_batch_close(batch); 755 } 756 757 static void 758 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 759 { 760 struct spdk_blob_persist_ctx *ctx = cb_arg; 761 struct spdk_blob *blob = ctx->blob; 762 struct spdk_blob_store *bs = blob->bs; 763 uint64_t lba; 764 uint32_t lba_count; 765 struct spdk_blob_md_page *page; 766 767 if (blob->active.num_pages == 0) { 768 /* Move on to the next step */ 769 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 770 return; 771 } 772 773 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 774 775 page = &ctx->pages[0]; 776 /* The first page in the metadata goes where the blobid indicates */ 777 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 778 779 spdk_bs_sequence_write(seq, page, lba, lba_count, 780 _spdk_blob_persist_unmap_pages, ctx); 781 } 782 783 static void 784 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 785 { 786 struct spdk_blob_persist_ctx *ctx = cb_arg; 787 struct spdk_blob *blob = ctx->blob; 788 struct spdk_blob_store *bs = blob->bs; 789 uint64_t lba; 790 uint32_t lba_count; 791 struct spdk_blob_md_page *page; 792 spdk_bs_batch_t *batch; 793 size_t i; 794 795 /* Clusters don't move around in blobs. 
The list shrinks or grows 796 * at the end, but no changes ever occur in the middle of the list. 797 */ 798 799 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 800 801 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 802 803 /* This starts at 1. The root page is not written until 804 * all of the others are finished 805 */ 806 for (i = 1; i < blob->active.num_pages; i++) { 807 page = &ctx->pages[i]; 808 assert(page->sequence_num == i); 809 810 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 811 812 spdk_bs_batch_write(batch, page, lba, lba_count); 813 } 814 815 spdk_bs_batch_close(batch); 816 } 817 818 static int 819 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 820 { 821 uint64_t i; 822 uint64_t *tmp; 823 uint64_t lfc; /* lowest free cluster */ 824 struct spdk_blob_store *bs; 825 826 bs = blob->bs; 827 828 assert(blob->state != SPDK_BLOB_STATE_LOADING && 829 blob->state != SPDK_BLOB_STATE_SYNCING); 830 831 if (blob->active.num_clusters == sz) { 832 return 0; 833 } 834 835 if (blob->active.num_clusters < blob->active.cluster_array_size) { 836 /* If this blob was resized to be larger, then smaller, then 837 * larger without syncing, then the cluster array already 838 * contains spare assigned clusters we can use. 839 */ 840 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 841 sz); 842 } 843 844 blob->state = SPDK_BLOB_STATE_DIRTY; 845 846 /* Do two passes - one to verify that we can obtain enough clusters 847 * and another to actually claim them. 848 */ 849 850 lfc = 0; 851 for (i = blob->active.num_clusters; i < sz; i++) { 852 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 853 if (lfc >= bs->total_clusters) { 854 /* No more free clusters. Cannot satisfy the request */ 855 assert(false); 856 return -1; 857 } 858 lfc++; 859 } 860 861 if (sz > blob->active.num_clusters) { 862 /* Expand the cluster array if necessary. 863 * We only shrink the array when persisting. 
864 */ 865 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 866 if (sz > 0 && tmp == NULL) { 867 assert(false); 868 return -1; 869 } 870 blob->active.clusters = tmp; 871 blob->active.cluster_array_size = sz; 872 } 873 874 lfc = 0; 875 for (i = blob->active.num_clusters; i < sz; i++) { 876 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 877 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 878 _spdk_bs_claim_cluster(bs, lfc); 879 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 880 lfc++; 881 } 882 883 blob->active.num_clusters = sz; 884 885 return 0; 886 } 887 888 /* Write a blob to disk */ 889 static void 890 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 891 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 892 { 893 struct spdk_blob_persist_ctx *ctx; 894 int rc; 895 uint64_t i; 896 uint32_t page_num; 897 struct spdk_blob_store *bs; 898 899 assert(blob != NULL); 900 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 901 blob->state == SPDK_BLOB_STATE_DIRTY); 902 903 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 904 cb_fn(seq, cb_arg, 0); 905 return; 906 } 907 908 bs = blob->bs; 909 910 ctx = calloc(1, sizeof(*ctx)); 911 if (!ctx) { 912 cb_fn(seq, cb_arg, -ENOMEM); 913 return; 914 } 915 ctx->blob = blob; 916 ctx->cb_fn = cb_fn; 917 ctx->cb_arg = cb_arg; 918 919 blob->state = SPDK_BLOB_STATE_SYNCING; 920 921 if (blob->active.num_pages == 0) { 922 /* This is the signal that the blob should be deleted. 923 * Immediately jump to the clean up routine. 
*/ 924 assert(blob->clean.num_pages > 0); 925 ctx->idx = blob->clean.num_pages - 1; 926 _spdk_blob_persist_unmap_pages(seq, ctx, 0); 927 return; 928 929 } 930 931 /* Generate the new metadata */ 932 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 933 if (rc < 0) { 934 free(ctx); 935 cb_fn(seq, cb_arg, rc); 936 return; 937 } 938 939 assert(blob->active.num_pages >= 1); 940 941 /* Resize the cache of page indices */ 942 blob->active.pages = realloc(blob->active.pages, 943 blob->active.num_pages * sizeof(*blob->active.pages)); 944 if (!blob->active.pages) { 945 free(ctx); 946 cb_fn(seq, cb_arg, -ENOMEM); 947 return; 948 } 949 950 /* Assign this metadata to pages. This requires two passes - 951 * one to verify that there are enough pages and a second 952 * to actually claim them. */ 953 page_num = 0; 954 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 955 for (i = 1; i < blob->active.num_pages; i++) { 956 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 957 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 958 spdk_free(ctx->pages); 959 free(ctx); 960 blob->state = SPDK_BLOB_STATE_DIRTY; 961 cb_fn(seq, cb_arg, -ENOMEM); 962 return; 963 } 964 page_num++; 965 } 966 967 page_num = 0; 968 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 969 for (i = 1; i < blob->active.num_pages; i++) { 970 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 971 ctx->pages[i - 1].next = page_num; 972 blob->active.pages[i] = page_num; 973 spdk_bit_array_set(bs->used_md_pages, page_num); 974 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 975 page_num++; 976 } 977 978 /* Start writing the metadata from last page to first */ 979 ctx->idx = blob->active.num_pages - 1; 980 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 981 } 982 983 static void 984 _spdk_blob_request_submit_rw(struct spdk_blob *blob, struct spdk_io_channel 
*_channel, 985 void *payload, uint64_t offset, uint64_t length, 986 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 987 { 988 spdk_bs_batch_t *batch; 989 struct spdk_bs_cpl cpl; 990 uint64_t lba; 991 uint32_t lba_count; 992 uint8_t *buf; 993 uint64_t page; 994 995 assert(blob != NULL); 996 997 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 998 cb_fn(cb_arg, -EINVAL); 999 return; 1000 } 1001 1002 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1003 cpl.u.blob_basic.cb_fn = cb_fn; 1004 cpl.u.blob_basic.cb_arg = cb_arg; 1005 1006 batch = spdk_bs_batch_open(_channel, &cpl); 1007 if (!batch) { 1008 cb_fn(cb_arg, -ENOMEM); 1009 return; 1010 } 1011 1012 length = _spdk_bs_page_to_lba(blob->bs, length); 1013 page = offset; 1014 buf = payload; 1015 while (length > 0) { 1016 lba = _spdk_bs_blob_page_to_lba(blob, page); 1017 lba_count = spdk_min(length, 1018 _spdk_bs_page_to_lba(blob->bs, 1019 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1020 1021 if (read) { 1022 spdk_bs_batch_read(batch, buf, lba, lba_count); 1023 } else { 1024 spdk_bs_batch_write(batch, buf, lba, lba_count); 1025 } 1026 1027 length -= lba_count; 1028 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1029 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1030 } 1031 1032 spdk_bs_batch_close(batch); 1033 } 1034 1035 static struct spdk_blob * 1036 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1037 { 1038 struct spdk_blob *blob; 1039 1040 TAILQ_FOREACH(blob, &bs->blobs, link) { 1041 if (blob->id == blobid) { 1042 return blob; 1043 } 1044 } 1045 1046 return NULL; 1047 } 1048 1049 static int 1050 _spdk_bs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 1051 { 1052 struct spdk_blob_store *bs = io_device; 1053 struct spdk_bs_dev *dev = bs->dev; 1054 struct spdk_bs_channel *channel = ctx_buf; 1055 uint32_t max_ops = *(uint32_t *)unique_ctx; 1056 uint32_t i; 1057 1058 channel->req_mem = calloc(max_ops, sizeof(struct 
spdk_bs_request_set)); 1059 if (!channel->req_mem) { 1060 free(channel); 1061 return -1; 1062 } 1063 1064 TAILQ_INIT(&channel->reqs); 1065 1066 for (i = 0; i < max_ops; i++) { 1067 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1068 } 1069 1070 channel->bs = bs; 1071 channel->dev = dev; 1072 channel->dev_channel = dev->create_channel(dev); 1073 1074 return 0; 1075 } 1076 1077 static void 1078 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1079 { 1080 struct spdk_bs_channel *channel = ctx_buf; 1081 1082 free(channel->req_mem); 1083 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1084 } 1085 1086 static void 1087 _spdk_bs_free(struct spdk_blob_store *bs) 1088 { 1089 struct spdk_blob *blob, *blob_tmp; 1090 1091 spdk_bs_unregister_md_thread(bs); 1092 spdk_io_device_unregister(bs); 1093 1094 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1095 TAILQ_REMOVE(&bs->blobs, blob, link); 1096 _spdk_blob_free(blob); 1097 } 1098 1099 spdk_bit_array_free(&bs->used_md_pages); 1100 spdk_bit_array_free(&bs->used_clusters); 1101 1102 bs->dev->destroy(bs->dev); 1103 free(bs); 1104 } 1105 1106 void 1107 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1108 { 1109 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1110 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1111 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1112 } 1113 1114 static struct spdk_blob_store * 1115 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1116 { 1117 struct spdk_blob_store *bs; 1118 1119 bs = calloc(1, sizeof(struct spdk_blob_store)); 1120 if (!bs) { 1121 return NULL; 1122 } 1123 1124 TAILQ_INIT(&bs->blobs); 1125 bs->dev = dev; 1126 1127 /* 1128 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1129 * even multiple of the cluster size. 
1130 */ 1131 bs->cluster_sz = opts->cluster_sz; 1132 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1133 bs->pages_per_cluster = bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1134 bs->num_free_clusters = bs->total_clusters; 1135 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1136 if (bs->used_clusters == NULL) { 1137 _spdk_bs_free(bs); 1138 return NULL; 1139 } 1140 1141 bs->max_md_ops = opts->max_md_ops; 1142 bs->super_blob = SPDK_BLOBID_INVALID; 1143 1144 /* The metadata is assumed to be at least 1 page */ 1145 bs->used_md_pages = spdk_bit_array_create(1); 1146 1147 spdk_io_device_register(bs, _spdk_bs_channel_create, _spdk_bs_channel_destroy, 1148 sizeof(struct spdk_bs_channel)); 1149 spdk_bs_register_md_thread(bs); 1150 1151 return bs; 1152 } 1153 1154 /* START spdk_bs_load */ 1155 1156 struct spdk_bs_load_ctx { 1157 struct spdk_blob_store *bs; 1158 struct spdk_bs_super_block *super; 1159 1160 struct spdk_bs_md_mask *mask; 1161 }; 1162 1163 static void 1164 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1165 { 1166 struct spdk_bs_load_ctx *ctx = cb_arg; 1167 uint32_t i, j; 1168 int rc; 1169 1170 /* The type must be correct */ 1171 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1172 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1173 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1174 struct spdk_blob_md_page) * 8)); 1175 /* The length of the mask must be exactly equal to the total number of clusters*/ 1176 assert(ctx->mask->length == ctx->bs->total_clusters); 1177 1178 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1179 if (rc < 0) { 1180 spdk_free(ctx->super); 1181 spdk_free(ctx->mask); 1182 _spdk_bs_free(ctx->bs); 1183 free(ctx); 1184 spdk_bs_sequence_finish(seq, -ENOMEM); 1185 return; 1186 } 1187 1188 ctx->bs->num_free_clusters = 
ctx->bs->total_clusters; 1189 for (i = 0; i < ctx->mask->length / 8; i++) { 1190 uint8_t segment = ctx->mask->mask[i]; 1191 for (j = 0; segment && (j < 8); j++) { 1192 if (segment & 1U) { 1193 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1194 assert(ctx->bs->num_free_clusters > 0); 1195 ctx->bs->num_free_clusters--; 1196 } 1197 segment >>= 1U; 1198 } 1199 } 1200 1201 spdk_free(ctx->super); 1202 spdk_free(ctx->mask); 1203 free(ctx); 1204 1205 spdk_bs_sequence_finish(seq, bserrno); 1206 } 1207 1208 static void 1209 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1210 { 1211 struct spdk_bs_load_ctx *ctx = cb_arg; 1212 uint64_t lba, lba_count; 1213 uint32_t i, j; 1214 int rc; 1215 1216 /* The type must be correct */ 1217 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1218 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1219 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page) * 1220 8)); 1221 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1222 assert(ctx->mask->length == ctx->super->md_len); 1223 1224 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1225 if (rc < 0) { 1226 spdk_free(ctx->super); 1227 spdk_free(ctx->mask); 1228 _spdk_bs_free(ctx->bs); 1229 free(ctx); 1230 spdk_bs_sequence_finish(seq, -ENOMEM); 1231 return; 1232 } 1233 1234 for (i = 0; i < ctx->mask->length / 8; i++) { 1235 uint8_t segment = ctx->mask->mask[i]; 1236 for (j = 0; segment && (j < 8); j++) { 1237 if (segment & 1U) { 1238 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1239 } 1240 segment >>= 1U; 1241 } 1242 } 1243 spdk_free(ctx->mask); 1244 1245 /* Read the used clusters mask */ 1246 ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page), 1247 0x1000, NULL); 1248 if (!ctx->mask) { 1249 spdk_free(ctx->super); 1250 
_spdk_bs_free(ctx->bs); 1251 free(ctx); 1252 spdk_bs_sequence_finish(seq, -ENOMEM); 1253 return; 1254 } 1255 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1256 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1257 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1258 _spdk_bs_load_used_clusters_cpl, ctx); 1259 } 1260 1261 static void 1262 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1263 { 1264 struct spdk_bs_load_ctx *ctx = cb_arg; 1265 uint64_t lba, lba_count; 1266 1267 if (ctx->super->version != SPDK_BS_VERSION) { 1268 spdk_free(ctx->super); 1269 _spdk_bs_free(ctx->bs); 1270 free(ctx); 1271 spdk_bs_sequence_finish(seq, -EILSEQ); 1272 return; 1273 } 1274 1275 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1276 sizeof(ctx->super->signature)) != 0) { 1277 spdk_free(ctx->super); 1278 _spdk_bs_free(ctx->bs); 1279 free(ctx); 1280 spdk_bs_sequence_finish(seq, -EILSEQ); 1281 return; 1282 } 1283 1284 if (ctx->super->clean != 1) { 1285 /* TODO: ONLY CLEAN SHUTDOWN IS CURRENTLY SUPPORTED. 1286 * All of the necessary data to recover is available 1287 * on disk - the code just has not been written yet. 
1288 */ 1289 assert(false); 1290 spdk_free(ctx->super); 1291 _spdk_bs_free(ctx->bs); 1292 free(ctx); 1293 spdk_bs_sequence_finish(seq, -EILSEQ); 1294 return; 1295 } 1296 ctx->super->clean = 0; 1297 1298 /* Parse the super block */ 1299 ctx->bs->cluster_sz = ctx->super->cluster_size; 1300 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1301 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / sizeof(struct spdk_blob_md_page); 1302 ctx->bs->md_start = ctx->super->md_start; 1303 ctx->bs->md_len = ctx->super->md_len; 1304 1305 /* Read the used pages mask */ 1306 ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 0x1000, 1307 NULL); 1308 if (!ctx->mask) { 1309 spdk_free(ctx->super); 1310 _spdk_bs_free(ctx->bs); 1311 free(ctx); 1312 spdk_bs_sequence_finish(seq, -ENOMEM); 1313 return; 1314 } 1315 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1316 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1317 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1318 _spdk_bs_load_used_pages_cpl, ctx); 1319 } 1320 1321 void 1322 spdk_bs_load(struct spdk_bs_dev *dev, 1323 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1324 { 1325 struct spdk_blob_store *bs; 1326 struct spdk_bs_cpl cpl; 1327 spdk_bs_sequence_t *seq; 1328 struct spdk_bs_load_ctx *ctx; 1329 struct spdk_bs_opts opts = {}; 1330 1331 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1332 1333 spdk_bs_opts_init(&opts); 1334 1335 bs = _spdk_bs_alloc(dev, &opts); 1336 if (!bs) { 1337 cb_fn(cb_arg, NULL, -ENOMEM); 1338 return; 1339 } 1340 1341 ctx = calloc(1, sizeof(*ctx)); 1342 if (!ctx) { 1343 _spdk_bs_free(bs); 1344 cb_fn(cb_arg, NULL, -ENOMEM); 1345 return; 1346 } 1347 1348 ctx->bs = bs; 1349 1350 /* Allocate memory for the super block */ 1351 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1352 if (!ctx->super) { 1353 free(ctx); 1354 
_spdk_bs_free(bs); 1355 return; 1356 } 1357 1358 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1359 cpl.u.bs_handle.cb_fn = cb_fn; 1360 cpl.u.bs_handle.cb_arg = cb_arg; 1361 cpl.u.bs_handle.bs = bs; 1362 1363 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1364 if (!seq) { 1365 spdk_free(ctx->super); 1366 free(ctx); 1367 _spdk_bs_free(bs); 1368 cb_fn(cb_arg, NULL, -ENOMEM); 1369 return; 1370 } 1371 1372 /* Read the super block */ 1373 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1374 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1375 _spdk_bs_load_super_cpl, ctx); 1376 } 1377 1378 /* END spdk_bs_load */ 1379 1380 /* START spdk_bs_init */ 1381 1382 struct spdk_bs_init_ctx { 1383 struct spdk_blob_store *bs; 1384 struct spdk_bs_super_block *super; 1385 }; 1386 1387 static void 1388 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1389 { 1390 struct spdk_bs_init_ctx *ctx = cb_arg; 1391 1392 spdk_free(ctx->super); 1393 free(ctx); 1394 1395 spdk_bs_sequence_finish(seq, bserrno); 1396 } 1397 1398 static void 1399 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1400 { 1401 struct spdk_bs_init_ctx *ctx = cb_arg; 1402 1403 /* Write super block */ 1404 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1405 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1406 _spdk_bs_init_persist_super_cpl, ctx); 1407 } 1408 1409 void 1410 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1411 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1412 { 1413 struct spdk_bs_init_ctx *ctx; 1414 struct spdk_blob_store *bs; 1415 struct spdk_bs_cpl cpl; 1416 spdk_bs_sequence_t *seq; 1417 uint64_t num_md_pages; 1418 uint32_t i; 1419 struct spdk_bs_opts opts = {}; 1420 int rc; 1421 1422 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 1423 1424 if (o) { 1425 opts = *o; 1426 } else { 1427 spdk_bs_opts_init(&opts); 1428 } 1429 1430 bs = _spdk_bs_alloc(dev, 
&opts); 1431 if (!bs) { 1432 cb_fn(cb_arg, NULL, -ENOMEM); 1433 return; 1434 } 1435 1436 if (opts.num_md_pages == UINT32_MAX) { 1437 /* By default, allocate 1 page per cluster. 1438 * Technically, this over-allocates metadata 1439 * because more metadata will reduce the number 1440 * of usable clusters. This can be addressed with 1441 * more complex math in the future. 1442 */ 1443 bs->md_len = bs->total_clusters; 1444 } else { 1445 bs->md_len = opts.num_md_pages; 1446 } 1447 1448 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 1449 if (rc < 0) { 1450 _spdk_bs_free(bs); 1451 cb_fn(cb_arg, NULL, -ENOMEM); 1452 return; 1453 } 1454 1455 ctx = calloc(1, sizeof(*ctx)); 1456 if (!ctx) { 1457 _spdk_bs_free(bs); 1458 cb_fn(cb_arg, NULL, -ENOMEM); 1459 return; 1460 } 1461 1462 ctx->bs = bs; 1463 1464 /* Allocate memory for the super block */ 1465 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1466 if (!ctx->super) { 1467 free(ctx); 1468 _spdk_bs_free(bs); 1469 return; 1470 } 1471 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1472 sizeof(ctx->super->signature)); 1473 ctx->super->version = SPDK_BS_VERSION; 1474 ctx->super->length = sizeof(*ctx->super); 1475 ctx->super->super_blob = bs->super_blob; 1476 ctx->super->clean = 0; 1477 ctx->super->cluster_size = bs->cluster_sz; 1478 1479 /* Calculate how many pages the metadata consumes at the front 1480 * of the disk. 1481 */ 1482 1483 /* The super block uses 1 page */ 1484 num_md_pages = 1; 1485 1486 /* The used_md_pages mask requires 1 bit per metadata page, rounded 1487 * up to the nearest page, plus a header. 
1488 */ 1489 ctx->super->used_page_mask_start = num_md_pages; 1490 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1491 divide_round_up(bs->md_len, 8), 1492 sizeof(struct spdk_blob_md_page)); 1493 num_md_pages += ctx->super->used_page_mask_len; 1494 1495 /* The used_clusters mask requires 1 bit per cluster, rounded 1496 * up to the nearest page, plus a header. 1497 */ 1498 ctx->super->used_cluster_mask_start = num_md_pages; 1499 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 1500 divide_round_up(bs->total_clusters, 8), 1501 sizeof(struct spdk_blob_md_page)); 1502 num_md_pages += ctx->super->used_cluster_mask_len; 1503 1504 /* The metadata region size was chosen above */ 1505 ctx->super->md_start = bs->md_start = num_md_pages; 1506 ctx->super->md_len = bs->md_len; 1507 num_md_pages += bs->md_len; 1508 1509 /* Claim all of the clusters used by the metadata */ 1510 for (i = 0; i < divide_round_up(num_md_pages, bs->pages_per_cluster); i++) { 1511 _spdk_bs_claim_cluster(bs, i); 1512 } 1513 1514 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 1515 cpl.u.bs_handle.cb_fn = cb_fn; 1516 cpl.u.bs_handle.cb_arg = cb_arg; 1517 cpl.u.bs_handle.bs = bs; 1518 1519 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1520 if (!seq) { 1521 spdk_free(ctx->super); 1522 free(ctx); 1523 _spdk_bs_free(bs); 1524 cb_fn(cb_arg, NULL, -ENOMEM); 1525 return; 1526 } 1527 1528 /* TRIM the entire device */ 1529 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_init_trim_cpl, ctx); 1530 } 1531 1532 /* END spdk_bs_init */ 1533 1534 /* START spdk_bs_unload */ 1535 1536 struct spdk_bs_unload_ctx { 1537 struct spdk_blob_store *bs; 1538 struct spdk_bs_super_block *super; 1539 1540 struct spdk_bs_md_mask *mask; 1541 }; 1542 1543 static void 1544 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1545 { 1546 struct spdk_bs_unload_ctx *ctx = cb_arg; 1547 1548 spdk_free(ctx->super); 1549 1550 
spdk_bs_sequence_finish(seq, bserrno); 1551 1552 _spdk_bs_free(ctx->bs); 1553 free(ctx); 1554 } 1555 1556 static void 1557 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1558 { 1559 struct spdk_bs_unload_ctx *ctx = cb_arg; 1560 1561 spdk_free(ctx->mask); 1562 1563 /* Update the values in the super block */ 1564 ctx->super->super_blob = ctx->bs->super_blob; 1565 ctx->super->clean = 1; 1566 1567 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 1568 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 1569 _spdk_bs_unload_write_super_cpl, ctx); 1570 } 1571 1572 static void 1573 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1574 { 1575 struct spdk_bs_unload_ctx *ctx = cb_arg; 1576 uint32_t i; 1577 uint64_t lba, lba_count; 1578 1579 spdk_free(ctx->mask); 1580 1581 /* Write out the used clusters mask */ 1582 ctx->mask = spdk_zmalloc(ctx->super->used_cluster_mask_len * sizeof(struct spdk_blob_md_page), 1583 0x1000, NULL); 1584 if (!ctx->mask) { 1585 spdk_free(ctx->super); 1586 free(ctx); 1587 spdk_bs_sequence_finish(seq, -ENOMEM); 1588 return; 1589 } 1590 1591 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1592 ctx->mask->length = ctx->bs->total_clusters; 1593 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1594 1595 i = 0; 1596 while (true) { 1597 i = spdk_bit_array_find_first_set(ctx->bs->used_clusters, i); 1598 if (i > ctx->mask->length) { 1599 break; 1600 } 1601 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1602 i++; 1603 } 1604 1605 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1606 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1607 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1608 _spdk_bs_unload_write_used_clusters_cpl, ctx); 1609 } 1610 1611 static void 1612 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1613 { 1614 
struct spdk_bs_unload_ctx *ctx = cb_arg; 1615 uint32_t i; 1616 uint64_t lba, lba_count; 1617 1618 /* Write out the used page mask */ 1619 ctx->mask = spdk_zmalloc(ctx->super->used_page_mask_len * sizeof(struct spdk_blob_md_page), 1620 0x1000, NULL); 1621 if (!ctx->mask) { 1622 spdk_free(ctx->super); 1623 free(ctx); 1624 spdk_bs_sequence_finish(seq, -ENOMEM); 1625 return; 1626 } 1627 1628 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1629 ctx->mask->length = ctx->super->md_len; 1630 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1631 1632 i = 0; 1633 while (true) { 1634 i = spdk_bit_array_find_first_set(ctx->bs->used_md_pages, i); 1635 if (i > ctx->mask->length) { 1636 break; 1637 } 1638 ctx->mask->mask[i / 8] |= 1U << (i % 8); 1639 i++; 1640 } 1641 1642 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1643 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1644 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, 1645 _spdk_bs_unload_write_used_pages_cpl, ctx); 1646 } 1647 1648 void 1649 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 1650 { 1651 struct spdk_bs_cpl cpl; 1652 spdk_bs_sequence_t *seq; 1653 struct spdk_bs_unload_ctx *ctx; 1654 1655 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 1656 1657 ctx = calloc(1, sizeof(*ctx)); 1658 if (!ctx) { 1659 cb_fn(cb_arg, -ENOMEM); 1660 return; 1661 } 1662 1663 ctx->bs = bs; 1664 1665 ctx->super = spdk_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 1666 if (!ctx->super) { 1667 free(ctx); 1668 cb_fn(cb_arg, -ENOMEM); 1669 return; 1670 } 1671 1672 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 1673 cpl.u.bs_basic.cb_fn = cb_fn; 1674 cpl.u.bs_basic.cb_arg = cb_arg; 1675 1676 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1677 if (!seq) { 1678 spdk_free(ctx->super); 1679 free(ctx); 1680 cb_fn(cb_arg, -ENOMEM); 1681 return; 1682 } 1683 1684 assert(TAILQ_EMPTY(&bs->blobs)); 1685 1686 /* Read super block */ 
1687 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 1688 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 1689 _spdk_bs_unload_read_super_cpl, ctx); 1690 } 1691 1692 /* END spdk_bs_unload */ 1693 1694 void 1695 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 1696 spdk_bs_op_complete cb_fn, void *cb_arg) 1697 { 1698 bs->super_blob = blobid; 1699 cb_fn(cb_arg, 0); 1700 } 1701 1702 void 1703 spdk_bs_get_super(struct spdk_blob_store *bs, 1704 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1705 { 1706 if (bs->super_blob == SPDK_BLOBID_INVALID) { 1707 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 1708 } else { 1709 cb_fn(cb_arg, bs->super_blob, 0); 1710 } 1711 } 1712 1713 uint64_t 1714 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 1715 { 1716 return bs->cluster_sz; 1717 } 1718 1719 uint64_t 1720 spdk_bs_get_page_size(struct spdk_blob_store *bs) 1721 { 1722 return sizeof(struct spdk_blob_md_page); 1723 } 1724 1725 uint64_t 1726 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 1727 { 1728 return bs->num_free_clusters; 1729 } 1730 1731 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 1732 { 1733 bs->md_channel = spdk_get_io_channel(bs, SPDK_IO_PRIORITY_DEFAULT, true, 1734 (void *)&bs->max_md_ops); 1735 1736 return 0; 1737 } 1738 1739 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 1740 { 1741 spdk_put_io_channel(bs->md_channel); 1742 1743 return 0; 1744 } 1745 1746 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 1747 { 1748 assert(blob != NULL); 1749 1750 return blob->id; 1751 } 1752 1753 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 1754 { 1755 assert(blob != NULL); 1756 1757 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 1758 } 1759 1760 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 1761 { 1762 assert(blob != NULL); 1763 1764 return blob->active.num_clusters; 1765 } 1766 1767 /* START spdk_bs_md_create_blob */ 1768 1769 static void 1770 
_spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1771 { 1772 struct spdk_blob *blob = cb_arg; 1773 1774 _spdk_blob_free(blob); 1775 1776 spdk_bs_sequence_finish(seq, bserrno); 1777 } 1778 1779 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 1780 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 1781 { 1782 struct spdk_blob *blob; 1783 uint32_t page_idx; 1784 struct spdk_bs_cpl cpl; 1785 spdk_bs_sequence_t *seq; 1786 spdk_blob_id id; 1787 1788 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 1789 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 1790 cb_fn(cb_arg, 0, -ENOMEM); 1791 return; 1792 } 1793 spdk_bit_array_set(bs->used_md_pages, page_idx); 1794 1795 /* The blob id is a 64 bit number. The lower 32 bits are the page_idx. The upper 1796 * 32 bits are not currently used. Stick a 1 there just to catch bugs where the 1797 * code assumes blob id == page_idx. 1798 */ 1799 id = (1ULL << 32) | page_idx; 1800 1801 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 1802 1803 blob = _spdk_blob_alloc(bs, id); 1804 if (!blob) { 1805 cb_fn(cb_arg, 0, -ENOMEM); 1806 return; 1807 } 1808 1809 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 1810 cpl.u.blobid.cb_fn = cb_fn; 1811 cpl.u.blobid.cb_arg = cb_arg; 1812 cpl.u.blobid.blobid = blob->id; 1813 1814 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1815 if (!seq) { 1816 free(blob); 1817 cb_fn(cb_arg, 0, -ENOMEM); 1818 return; 1819 } 1820 1821 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 1822 } 1823 1824 /* END spdk_bs_md_create_blob */ 1825 1826 /* START spdk_bs_md_resize_blob */ 1827 int 1828 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 1829 { 1830 int rc; 1831 1832 assert(blob != NULL); 1833 1834 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 1835 1836 if (sz == blob->active.num_clusters) { 1837 return 0; 1838 } 1839 1840 rc = _spdk_resize_blob(blob, 
sz); 1841 if (rc < 0) { 1842 return rc; 1843 } 1844 1845 return 0; 1846 } 1847 1848 /* END spdk_bs_md_resize_blob */ 1849 1850 1851 /* START spdk_bs_md_delete_blob */ 1852 1853 static void 1854 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1855 { 1856 struct spdk_blob *blob = cb_arg; 1857 1858 _spdk_blob_free(blob); 1859 1860 spdk_bs_sequence_finish(seq, bserrno); 1861 } 1862 1863 static void 1864 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1865 { 1866 struct spdk_blob *blob = cb_arg; 1867 1868 blob->state = SPDK_BLOB_STATE_DIRTY; 1869 blob->active.num_pages = 0; 1870 _spdk_resize_blob(blob, 0); 1871 1872 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 1873 } 1874 1875 void 1876 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1877 spdk_blob_op_complete cb_fn, void *cb_arg) 1878 { 1879 struct spdk_blob *blob; 1880 struct spdk_bs_cpl cpl; 1881 spdk_bs_sequence_t *seq; 1882 1883 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 1884 1885 blob = _spdk_blob_lookup(bs, blobid); 1886 if (blob) { 1887 assert(blob->open_ref > 0); 1888 cb_fn(cb_arg, -EINVAL); 1889 return; 1890 } 1891 1892 blob = _spdk_blob_alloc(bs, blobid); 1893 if (!blob) { 1894 cb_fn(cb_arg, -ENOMEM); 1895 return; 1896 } 1897 1898 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1899 cpl.u.blob_basic.cb_fn = cb_fn; 1900 cpl.u.blob_basic.cb_arg = cb_arg; 1901 1902 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1903 if (!seq) { 1904 cb_fn(cb_arg, -ENOMEM); 1905 return; 1906 } 1907 1908 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 1909 } 1910 1911 /* END spdk_bs_md_delete_blob */ 1912 1913 /* START spdk_bs_md_open_blob */ 1914 1915 static void 1916 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1917 { 1918 struct spdk_blob *blob = cb_arg; 1919 1920 blob->open_ref++; 1921 1922 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 1923 1924 
spdk_bs_sequence_finish(seq, bserrno); 1925 } 1926 1927 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 1928 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 1929 { 1930 struct spdk_blob *blob; 1931 struct spdk_bs_cpl cpl; 1932 spdk_bs_sequence_t *seq; 1933 uint32_t page_num; 1934 1935 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 1936 1937 blob = _spdk_blob_lookup(bs, blobid); 1938 if (blob) { 1939 blob->open_ref++; 1940 cb_fn(cb_arg, blob, 0); 1941 return; 1942 } 1943 1944 page_num = _spdk_bs_blobid_to_page(blobid); 1945 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 1946 /* Invalid blobid */ 1947 cb_fn(cb_arg, NULL, -ENOENT); 1948 return; 1949 } 1950 1951 blob = _spdk_blob_alloc(bs, blobid); 1952 if (!blob) { 1953 cb_fn(cb_arg, NULL, -ENOMEM); 1954 return; 1955 } 1956 1957 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 1958 cpl.u.blob_handle.cb_fn = cb_fn; 1959 cpl.u.blob_handle.cb_arg = cb_arg; 1960 cpl.u.blob_handle.blob = blob; 1961 1962 seq = spdk_bs_sequence_start(bs->md_channel, &cpl); 1963 if (!seq) { 1964 cb_fn(cb_arg, NULL, -ENOMEM); 1965 return; 1966 } 1967 1968 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 1969 } 1970 1971 /* START spdk_bs_md_sync_blob */ 1972 static void 1973 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1974 { 1975 spdk_bs_sequence_finish(seq, bserrno); 1976 } 1977 1978 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 1979 spdk_blob_op_complete cb_fn, void *cb_arg) 1980 { 1981 struct spdk_bs_cpl cpl; 1982 spdk_bs_sequence_t *seq; 1983 1984 assert(blob != NULL); 1985 1986 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 1987 1988 assert(blob->state != SPDK_BLOB_STATE_LOADING && 1989 blob->state != SPDK_BLOB_STATE_SYNCING); 1990 1991 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 1992 cb_fn(cb_arg, 0); 1993 return; 1994 } 1995 1996 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1997 cpl.u.blob_basic.cb_fn = cb_fn; 1998 
cpl.u.blob_basic.cb_arg = cb_arg; 1999 2000 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 2001 if (!seq) { 2002 cb_fn(cb_arg, -ENOMEM); 2003 return; 2004 } 2005 2006 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2007 } 2008 2009 /* END spdk_bs_md_sync_blob */ 2010 2011 /* START spdk_bs_md_close_blob */ 2012 2013 static void 2014 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2015 { 2016 struct spdk_blob **blob = cb_arg; 2017 2018 if ((*blob)->open_ref == 0) { 2019 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2020 _spdk_blob_free((*blob)); 2021 } 2022 2023 *blob = NULL; 2024 2025 spdk_bs_sequence_finish(seq, bserrno); 2026 } 2027 2028 void spdk_bs_md_close_blob(struct spdk_blob **b, 2029 spdk_blob_op_complete cb_fn, void *cb_arg) 2030 { 2031 struct spdk_bs_cpl cpl; 2032 struct spdk_blob *blob; 2033 spdk_bs_sequence_t *seq; 2034 2035 assert(b != NULL); 2036 blob = *b; 2037 assert(blob != NULL); 2038 2039 SPDK_TRACELOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2040 2041 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2042 blob->state != SPDK_BLOB_STATE_SYNCING); 2043 2044 if (blob->open_ref == 0) { 2045 cb_fn(cb_arg, -EBADF); 2046 return; 2047 } 2048 2049 blob->open_ref--; 2050 2051 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2052 cpl.u.blob_basic.cb_fn = cb_fn; 2053 cpl.u.blob_basic.cb_arg = cb_arg; 2054 2055 seq = spdk_bs_sequence_start(blob->bs->md_channel, &cpl); 2056 if (!seq) { 2057 cb_fn(cb_arg, -ENOMEM); 2058 return; 2059 } 2060 2061 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2062 _spdk_blob_close_cpl(seq, b, 0); 2063 return; 2064 } 2065 2066 /* Sync metadata */ 2067 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2068 } 2069 2070 /* END spdk_bs_md_close_blob */ 2071 2072 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs, 2073 uint32_t priority, uint32_t max_ops) 2074 { 2075 return spdk_get_io_channel(bs, priority, true, (void *)&max_ops); 2076 } 2077 2078 void 
spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2079 { 2080 spdk_put_io_channel(channel); 2081 } 2082 2083 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2084 spdk_blob_op_complete cb_fn, void *cb_arg) 2085 { 2086 /* Flush is synchronous right now */ 2087 cb_fn(cb_arg, 0); 2088 } 2089 2090 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2091 void *payload, uint64_t offset, uint64_t length, 2092 spdk_blob_op_complete cb_fn, void *cb_arg) 2093 { 2094 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, false); 2095 } 2096 2097 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2098 void *payload, uint64_t offset, uint64_t length, 2099 spdk_blob_op_complete cb_fn, void *cb_arg) 2100 { 2101 _spdk_blob_request_submit_rw(blob, channel, payload, offset, length, cb_fn, cb_arg, true); 2102 } 2103 2104 struct spdk_bs_iter_ctx { 2105 int64_t page_num; 2106 struct spdk_blob_store *bs; 2107 2108 spdk_blob_op_with_handle_complete cb_fn; 2109 void *cb_arg; 2110 }; 2111 2112 static void 2113 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2114 { 2115 struct spdk_bs_iter_ctx *ctx = cb_arg; 2116 struct spdk_blob_store *bs = ctx->bs; 2117 spdk_blob_id id; 2118 2119 if (bserrno == 0) { 2120 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2121 free(ctx); 2122 return; 2123 } 2124 2125 ctx->page_num++; 2126 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2127 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2128 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2129 free(ctx); 2130 return; 2131 } 2132 2133 id = (1ULL << 32) | ctx->page_num; 2134 2135 blob = _spdk_blob_lookup(bs, id); 2136 if (blob) { 2137 blob->open_ref++; 2138 ctx->cb_fn(ctx->cb_arg, blob, 0); 2139 free(ctx); 2140 return; 2141 } 2142 2143 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2144 } 2145 2146 void 2147 
spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2148 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2149 { 2150 struct spdk_bs_iter_ctx *ctx; 2151 2152 ctx = calloc(1, sizeof(*ctx)); 2153 if (!ctx) { 2154 cb_fn(cb_arg, NULL, -ENOMEM); 2155 return; 2156 } 2157 2158 ctx->page_num = -1; 2159 ctx->bs = bs; 2160 ctx->cb_fn = cb_fn; 2161 ctx->cb_arg = cb_arg; 2162 2163 _spdk_bs_iter_cpl(ctx, NULL, -1); 2164 } 2165 2166 static void 2167 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2168 { 2169 struct spdk_bs_iter_ctx *ctx = cb_arg; 2170 2171 _spdk_bs_iter_cpl(ctx, NULL, -1); 2172 } 2173 2174 void 2175 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2176 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2177 { 2178 struct spdk_bs_iter_ctx *ctx; 2179 struct spdk_blob *blob; 2180 2181 assert(b != NULL); 2182 blob = *b; 2183 assert(blob != NULL); 2184 2185 ctx = calloc(1, sizeof(*ctx)); 2186 if (!ctx) { 2187 cb_fn(cb_arg, NULL, -ENOMEM); 2188 return; 2189 } 2190 2191 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2192 ctx->bs = bs; 2193 ctx->cb_fn = cb_fn; 2194 ctx->cb_arg = cb_arg; 2195 2196 /* Close the existing blob */ 2197 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2198 } 2199 2200 int 2201 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2202 uint16_t value_len) 2203 { 2204 struct spdk_xattr *xattr; 2205 2206 assert(blob != NULL); 2207 2208 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2209 blob->state != SPDK_BLOB_STATE_SYNCING); 2210 2211 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2212 if (!strcmp(name, xattr->name)) { 2213 free(xattr->value); 2214 xattr->value_len = value_len; 2215 xattr->value = malloc(value_len); 2216 memcpy(xattr->value, value, value_len); 2217 2218 blob->state = SPDK_BLOB_STATE_DIRTY; 2219 2220 return 0; 2221 } 2222 } 2223 2224 /* 2225 * This is probably all going to rewritten, so do not bother checking for failed 2226 * allocations for now. 
2227 */ 2228 xattr = calloc(1, sizeof(*xattr)); 2229 xattr->name = strdup(name); 2230 xattr->value_len = value_len; 2231 xattr->value = malloc(value_len); 2232 memcpy(xattr->value, value, value_len); 2233 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2234 2235 blob->state = SPDK_BLOB_STATE_DIRTY; 2236 2237 return 0; 2238 } 2239 2240 int 2241 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 2242 { 2243 struct spdk_xattr *xattr; 2244 2245 assert(blob != NULL); 2246 2247 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2248 blob->state != SPDK_BLOB_STATE_SYNCING); 2249 2250 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2251 if (!strcmp(name, xattr->name)) { 2252 TAILQ_REMOVE(&blob->xattrs, xattr, link); 2253 free(xattr->value); 2254 free(xattr->name); 2255 free(xattr); 2256 2257 blob->state = SPDK_BLOB_STATE_DIRTY; 2258 2259 return 0; 2260 } 2261 } 2262 2263 return -ENOENT; 2264 } 2265 2266 int 2267 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 2268 const void **value, size_t *value_len) 2269 { 2270 struct spdk_xattr *xattr; 2271 2272 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2273 if (!strcmp(name, xattr->name)) { 2274 *value = xattr->value; 2275 *value_len = xattr->value_len; 2276 return 0; 2277 } 2278 } 2279 2280 return -ENOENT; 2281 } 2282 2283 struct spdk_xattr_names { 2284 uint32_t count; 2285 const char *names[0]; 2286 }; 2287 2288 int 2289 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 2290 struct spdk_xattr_names **names) 2291 { 2292 struct spdk_xattr *xattr; 2293 int count = 0; 2294 2295 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2296 count++; 2297 } 2298 2299 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 2300 if (*names == NULL) { 2301 return -ENOMEM; 2302 } 2303 2304 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2305 (*names)->names[(*names)->count++] = xattr->name; 2306 } 2307 2308 return 0; 2309 } 2310 2311 uint32_t 2312 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 
2313 { 2314 assert(names != NULL); 2315 2316 return names->count; 2317 } 2318 2319 const char * 2320 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 2321 { 2322 if (index >= names->count) { 2323 return NULL; 2324 } 2325 2326 return names->names[index]; 2327 } 2328 2329 void 2330 spdk_xattr_names_free(struct spdk_xattr_names *names) 2331 { 2332 free(names); 2333 } 2334 2335 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 2336