1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blob.h" 37 #include "spdk/crc32.h" 38 #include "spdk/env.h" 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/bit_array.h" 42 #include "spdk/likely.h" 43 44 #include "spdk_internal/log.h" 45 46 #include "blobstore.h" 47 48 #define BLOB_CRC32C_INITIAL 0xffffffffUL 49 50 static inline size_t 51 divide_round_up(size_t num, size_t divisor) 52 { 53 return (num + divisor - 1) / divisor; 54 } 55 56 static void 57 _spdk_bs_claim_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 58 { 59 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 60 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == false); 61 assert(bs->num_free_clusters > 0); 62 63 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %u\n", cluster_num); 64 65 spdk_bit_array_set(bs->used_clusters, cluster_num); 66 bs->num_free_clusters--; 67 } 68 69 static void 70 _spdk_bs_release_cluster(struct spdk_blob_store *bs, uint32_t cluster_num) 71 { 72 assert(cluster_num < spdk_bit_array_capacity(bs->used_clusters)); 73 assert(spdk_bit_array_get(bs->used_clusters, cluster_num) == true); 74 assert(bs->num_free_clusters < bs->total_clusters); 75 76 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Releasing cluster %u\n", cluster_num); 77 78 spdk_bit_array_clear(bs->used_clusters, cluster_num); 79 bs->num_free_clusters++; 80 } 81 82 static struct spdk_blob * 83 _spdk_blob_alloc(struct spdk_blob_store *bs, spdk_blob_id id) 84 { 85 struct spdk_blob *blob; 86 87 blob = calloc(1, sizeof(*blob)); 88 if (!blob) { 89 return NULL; 90 } 91 92 blob->id = id; 93 blob->bs = bs; 94 95 blob->state = SPDK_BLOB_STATE_DIRTY; 96 blob->active.num_pages = 1; 97 blob->active.pages = calloc(1, sizeof(*blob->active.pages)); 98 if (!blob->active.pages) { 99 free(blob); 100 return NULL; 101 } 102 103 blob->active.pages[0] = _spdk_bs_blobid_to_page(id); 104 105 TAILQ_INIT(&blob->xattrs); 106 107 return blob; 108 } 109 110 static void 111 _spdk_blob_free(struct spdk_blob *blob) 112 { 113 struct spdk_xattr *xattr, *xattr_tmp; 114 115 assert(blob != NULL); 116 117 free(blob->active.clusters); 118 free(blob->clean.clusters); 119 free(blob->active.pages); 120 free(blob->clean.pages); 121 122 TAILQ_FOREACH_SAFE(xattr, &blob->xattrs, link, xattr_tmp) { 123 TAILQ_REMOVE(&blob->xattrs, xattr, link); 124 free(xattr->name); 125 free(xattr->value); 126 free(xattr); 127 } 128 129 free(blob); 130 } 131 132 static int 133 _spdk_blob_mark_clean(struct spdk_blob *blob) 134 { 135 uint64_t *clusters = NULL; 136 uint32_t *pages = NULL; 137 138 assert(blob != NULL); 139 assert(blob->state == SPDK_BLOB_STATE_LOADING || 140 blob->state == SPDK_BLOB_STATE_SYNCING); 141 142 if (blob->active.num_clusters) { 143 assert(blob->active.clusters); 144 clusters = calloc(blob->active.num_clusters, sizeof(*blob->active.clusters)); 145 if (!clusters) { 146 return -1; 147 } 148 memcpy(clusters, blob->active.clusters, blob->active.num_clusters * sizeof(*clusters)); 149 } 150 151 if (blob->active.num_pages) { 152 assert(blob->active.pages); 153 pages = calloc(blob->active.num_pages, sizeof(*blob->active.pages)); 154 if (!pages) { 155 free(clusters); 156 return -1; 157 } 158 memcpy(pages, blob->active.pages, blob->active.num_pages * sizeof(*pages)); 159 } 160 161 free(blob->clean.clusters); 162 free(blob->clean.pages); 163 164 blob->clean.num_clusters = blob->active.num_clusters; 165 blob->clean.clusters = blob->active.clusters; 166 blob->clean.num_pages = blob->active.num_pages; 167 blob->clean.pages = blob->active.pages; 168 169 blob->active.clusters = clusters; 170 blob->active.pages = pages; 171 172 blob->state = SPDK_BLOB_STATE_CLEAN; 173 174 return 0; 175 } 176 177 static void 178 _spdk_blob_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob *blob) 179 { 180 struct spdk_blob_md_descriptor *desc; 181 size_t cur_desc = 0; 182 void *tmp; 183 184 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 185 while (cur_desc < sizeof(page->descriptors)) { 186 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 187 if (desc->length == 0) { 188 /* If padding and length are 0, this terminates the page */ 189 break; 190 } 191 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 192 struct spdk_blob_md_descriptor_extent *desc_extent; 193 unsigned int i, j; 194 unsigned int cluster_count = blob->active.num_clusters; 195 196 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 197 198 assert(desc_extent->length > 0); 199 assert(desc_extent->length % sizeof(desc_extent->extents[0]) == 0); 200 201 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 202 for (j = 0; j < desc_extent->extents[i].length; j++) { 203 assert(spdk_bit_array_get(blob->bs->used_clusters, desc_extent->extents[i].cluster_idx + j)); 204 cluster_count++; 205 } 206 } 207 208 assert(cluster_count > 0); 209 tmp = realloc(blob->active.clusters, cluster_count * sizeof(uint64_t)); 210 assert(tmp != NULL); 211 blob->active.clusters = tmp; 212 blob->active.cluster_array_size = cluster_count; 213 214 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 215 for (j = 0; j < desc_extent->extents[i].length; j++) { 216 blob->active.clusters[blob->active.num_clusters++] = _spdk_bs_cluster_to_lba(blob->bs, 217 desc_extent->extents[i].cluster_idx + j); 218 } 219 } 220 221 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 222 struct spdk_blob_md_descriptor_xattr *desc_xattr; 223 struct spdk_xattr *xattr; 224 225 desc_xattr = (struct spdk_blob_md_descriptor_xattr *)desc; 226 227 assert(desc_xattr->length == sizeof(desc_xattr->name_length) + 228 sizeof(desc_xattr->value_length) + 229 desc_xattr->name_length + desc_xattr->value_length); 230 231 xattr = calloc(1, sizeof(*xattr)); 232 assert(xattr != NULL); 233 234 xattr->name = malloc(desc_xattr->name_length + 1); 235 assert(xattr->name); 236 strncpy(xattr->name, desc_xattr->name, desc_xattr->name_length); 237 xattr->name[desc_xattr->name_length] = '\0'; 238 239 xattr->value = malloc(desc_xattr->value_length); 240 assert(xattr->value != NULL); 241 xattr->value_len = desc_xattr->value_length; 242 memcpy(xattr->value, 243 (void *)((uintptr_t)desc_xattr->name + desc_xattr->name_length), 244 desc_xattr->value_length); 245 246 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 247 } else { 248 /* Error */ 249 break; 250 } 251 252 /* Advance to the next descriptor */ 253 cur_desc += sizeof(*desc) + desc->length; 254 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 255 break; 256 } 257 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 258 } 259 } 260 261 static int 262 _spdk_blob_parse(const struct spdk_blob_md_page *pages, uint32_t page_count, 263 struct spdk_blob *blob) 264 { 265 const struct spdk_blob_md_page *page; 266 uint32_t i; 267 268 assert(page_count > 0); 269 assert(pages[0].sequence_num == 0); 270 assert(blob != NULL); 271 assert(blob->state == SPDK_BLOB_STATE_LOADING); 272 assert(blob->active.clusters == NULL); 273 assert(blob->state == SPDK_BLOB_STATE_LOADING); 274 275 /* The blobid provided doesn't match what's in the MD, this can 276 * happen for example if a bogus blobid is passed in through open. 277 */ 278 if (blob->id != pages[0].id) { 279 SPDK_ERRLOG("Blobid (%lu) doesn't match what's in metadata (%lu)\n", 280 blob->id, pages[0].id); 281 return -ENOENT; 282 } 283 284 for (i = 0; i < page_count; i++) { 285 page = &pages[i]; 286 287 assert(page->id == blob->id); 288 assert(page->sequence_num == i); 289 290 _spdk_blob_parse_page(page, blob); 291 } 292 293 return 0; 294 } 295 296 static int 297 _spdk_blob_serialize_add_page(const struct spdk_blob *blob, 298 struct spdk_blob_md_page **pages, 299 uint32_t *page_count, 300 struct spdk_blob_md_page **last_page) 301 { 302 struct spdk_blob_md_page *page; 303 304 assert(pages != NULL); 305 assert(page_count != NULL); 306 307 if (*page_count == 0) { 308 assert(*pages == NULL); 309 *page_count = 1; 310 *pages = spdk_dma_malloc(SPDK_BS_PAGE_SIZE, 311 SPDK_BS_PAGE_SIZE, 312 NULL); 313 } else { 314 assert(*pages != NULL); 315 (*page_count)++; 316 *pages = spdk_dma_realloc(*pages, 317 SPDK_BS_PAGE_SIZE * (*page_count), 318 SPDK_BS_PAGE_SIZE, 319 NULL); 320 } 321 322 if (*pages == NULL) { 323 *page_count = 0; 324 *last_page = NULL; 325 return -ENOMEM; 326 } 327 328 page = &(*pages)[*page_count - 1]; 329 memset(page, 0, sizeof(*page)); 330 page->id = blob->id; 331 page->sequence_num = *page_count - 1; 332 page->next = SPDK_INVALID_MD_PAGE; 333 *last_page = page; 334 335 return 0; 336 } 337 338 /* Transform the in-memory representation 'xattr' into an on-disk xattr descriptor. 339 * Update required_sz on both success and failure. 340 * 341 */ 342 static int 343 _spdk_blob_serialize_xattr(const struct spdk_xattr *xattr, 344 uint8_t *buf, size_t buf_sz, 345 size_t *required_sz) 346 { 347 struct spdk_blob_md_descriptor_xattr *desc; 348 349 *required_sz = sizeof(struct spdk_blob_md_descriptor_xattr) + 350 strlen(xattr->name) + 351 xattr->value_len; 352 353 if (buf_sz < *required_sz) { 354 return -1; 355 } 356 357 desc = (struct spdk_blob_md_descriptor_xattr *)buf; 358 359 desc->type = SPDK_MD_DESCRIPTOR_TYPE_XATTR; 360 desc->length = sizeof(desc->name_length) + 361 sizeof(desc->value_length) + 362 strlen(xattr->name) + 363 xattr->value_len; 364 desc->name_length = strlen(xattr->name); 365 desc->value_length = xattr->value_len; 366 367 memcpy(desc->name, xattr->name, desc->name_length); 368 memcpy((void *)((uintptr_t)desc->name + desc->name_length), 369 xattr->value, 370 desc->value_length); 371 372 return 0; 373 } 374 375 static void 376 _spdk_blob_serialize_extent(const struct spdk_blob *blob, 377 uint64_t start_cluster, uint64_t *next_cluster, 378 uint8_t *buf, size_t buf_sz) 379 { 380 struct spdk_blob_md_descriptor_extent *desc; 381 size_t cur_sz; 382 uint64_t i, extent_idx; 383 uint32_t lba, lba_per_cluster, lba_count; 384 385 /* The buffer must have room for at least one extent */ 386 cur_sz = sizeof(struct spdk_blob_md_descriptor) + sizeof(desc->extents[0]); 387 if (buf_sz < cur_sz) { 388 *next_cluster = start_cluster; 389 return; 390 } 391 392 desc = (struct spdk_blob_md_descriptor_extent *)buf; 393 desc->type = SPDK_MD_DESCRIPTOR_TYPE_EXTENT; 394 395 lba_per_cluster = _spdk_bs_cluster_to_lba(blob->bs, 1); 396 397 lba = blob->active.clusters[start_cluster]; 398 lba_count = lba_per_cluster; 399 extent_idx = 0; 400 for (i = start_cluster + 1; i < blob->active.num_clusters; i++) { 401 if ((lba + lba_count) == blob->active.clusters[i]) { 402 lba_count += lba_per_cluster; 403 continue; 404 } 405 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 406 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 407 extent_idx++; 408 409 cur_sz += sizeof(desc->extents[extent_idx]); 410 411 if (buf_sz < cur_sz) { 412 /* If we ran out of buffer space, return */ 413 desc->length = sizeof(desc->extents[0]) * extent_idx; 414 *next_cluster = i; 415 return; 416 } 417 418 lba = blob->active.clusters[i]; 419 lba_count = lba_per_cluster; 420 } 421 422 desc->extents[extent_idx].cluster_idx = lba / lba_per_cluster; 423 desc->extents[extent_idx].length = lba_count / lba_per_cluster; 424 extent_idx++; 425 426 desc->length = sizeof(desc->extents[0]) * extent_idx; 427 *next_cluster = blob->active.num_clusters; 428 429 return; 430 } 431 432 static int 433 _spdk_blob_serialize(const struct spdk_blob *blob, struct spdk_blob_md_page **pages, 434 uint32_t *page_count) 435 { 436 struct spdk_blob_md_page *cur_page; 437 const struct spdk_xattr *xattr; 438 int rc; 439 uint8_t *buf; 440 size_t remaining_sz; 441 uint64_t last_cluster; 442 443 assert(pages != NULL); 444 assert(page_count != NULL); 445 assert(blob != NULL); 446 assert(blob->state == SPDK_BLOB_STATE_SYNCING); 447 448 *pages = NULL; 449 *page_count = 0; 450 451 /* A blob always has at least 1 page, even if it has no descriptors */ 452 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, &cur_page); 453 if (rc < 0) { 454 return rc; 455 } 456 457 buf = (uint8_t *)cur_page->descriptors; 458 remaining_sz = sizeof(cur_page->descriptors); 459 460 /* Serialize xattrs */ 461 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 462 size_t required_sz = 0; 463 rc = _spdk_blob_serialize_xattr(xattr, 464 buf, remaining_sz, 465 &required_sz); 466 if (rc < 0) { 467 /* Need to add a new page to the chain */ 468 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 469 &cur_page); 470 if (rc < 0) { 471 spdk_dma_free(*pages); 472 *pages = NULL; 473 *page_count = 0; 474 return rc; 475 } 476 477 buf = (uint8_t *)cur_page->descriptors; 478 remaining_sz = sizeof(cur_page->descriptors); 479 480 /* Try again */ 481 required_sz = 0; 482 rc = _spdk_blob_serialize_xattr(xattr, 483 buf, remaining_sz, 484 &required_sz); 485 486 if (rc < 0) { 487 spdk_dma_free(*pages); 488 *pages = NULL; 489 *page_count = 0; 490 return -1; 491 } 492 } 493 494 remaining_sz -= required_sz; 495 buf += required_sz; 496 } 497 498 /* Serialize extents */ 499 last_cluster = 0; 500 while (last_cluster < blob->active.num_clusters) { 501 _spdk_blob_serialize_extent(blob, last_cluster, &last_cluster, 502 buf, remaining_sz); 503 504 if (last_cluster == blob->active.num_clusters) { 505 break; 506 } 507 508 rc = _spdk_blob_serialize_add_page(blob, pages, page_count, 509 &cur_page); 510 if (rc < 0) { 511 return rc; 512 } 513 514 buf = (uint8_t *)cur_page->descriptors; 515 remaining_sz = sizeof(cur_page->descriptors); 516 } 517 518 return 0; 519 } 520 521 struct spdk_blob_load_ctx { 522 struct spdk_blob *blob; 523 524 struct spdk_blob_md_page *pages; 525 uint32_t num_pages; 526 527 spdk_bs_sequence_cpl cb_fn; 528 void *cb_arg; 529 }; 530 531 static uint32_t 532 _spdk_blob_md_page_calc_crc(void *page) 533 { 534 uint32_t crc; 535 536 crc = BLOB_CRC32C_INITIAL; 537 crc = spdk_crc32c_update(page, SPDK_BS_PAGE_SIZE - 4, crc); 538 crc ^= BLOB_CRC32C_INITIAL; 539 540 return crc; 541 542 } 543 544 static void 545 _spdk_blob_load_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 546 { 547 struct spdk_blob_load_ctx *ctx = cb_arg; 548 struct spdk_blob *blob = ctx->blob; 549 struct spdk_blob_md_page *page; 550 int rc; 551 uint32_t crc; 552 553 page = &ctx->pages[ctx->num_pages - 1]; 554 crc = _spdk_blob_md_page_calc_crc(page); 555 if (crc != page->crc) { 556 SPDK_ERRLOG("Metadata page %d crc mismatch\n", ctx->num_pages); 557 _spdk_blob_free(blob); 558 ctx->cb_fn(seq, NULL, -EINVAL); 559 spdk_dma_free(ctx->pages); 560 free(ctx); 561 return; 562 } 563 564 if (page->next != SPDK_INVALID_MD_PAGE) { 565 uint32_t next_page = page->next; 566 uint64_t next_lba = _spdk_bs_page_to_lba(blob->bs, blob->bs->md_start + next_page); 567 568 569 assert(next_lba < (blob->bs->md_start + blob->bs->md_len)); 570 571 /* Read the next page */ 572 ctx->num_pages++; 573 ctx->pages = spdk_dma_realloc(ctx->pages, (sizeof(*page) * ctx->num_pages), 574 sizeof(*page), NULL); 575 if (ctx->pages == NULL) { 576 ctx->cb_fn(seq, ctx->cb_arg, -ENOMEM); 577 free(ctx); 578 return; 579 } 580 581 spdk_bs_sequence_read(seq, &ctx->pages[ctx->num_pages - 1], 582 next_lba, 583 _spdk_bs_byte_to_lba(blob->bs, sizeof(*page)), 584 _spdk_blob_load_cpl, ctx); 585 return; 586 } 587 588 /* Parse the pages */ 589 rc = _spdk_blob_parse(ctx->pages, ctx->num_pages, blob); 590 if (rc) { 591 _spdk_blob_free(blob); 592 ctx->cb_fn(seq, NULL, rc); 593 spdk_dma_free(ctx->pages); 594 free(ctx); 595 return; 596 } 597 598 _spdk_blob_mark_clean(blob); 599 600 ctx->cb_fn(seq, ctx->cb_arg, rc); 601 602 /* Free the memory */ 603 spdk_dma_free(ctx->pages); 604 free(ctx); 605 } 606 607 /* Load a blob from disk given a blobid */ 608 static void 609 _spdk_blob_load(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 610 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 611 { 612 struct spdk_blob_load_ctx *ctx; 613 struct spdk_blob_store *bs; 614 uint32_t page_num; 615 uint64_t lba; 616 617 assert(blob != NULL); 618 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 619 blob->state == SPDK_BLOB_STATE_DIRTY); 620 621 bs = blob->bs; 622 623 ctx = calloc(1, sizeof(*ctx)); 624 if (!ctx) { 625 cb_fn(seq, cb_arg, -ENOMEM); 626 return; 627 } 628 629 ctx->blob = blob; 630 ctx->pages = spdk_dma_realloc(ctx->pages, SPDK_BS_PAGE_SIZE, 631 SPDK_BS_PAGE_SIZE, NULL); 632 if (!ctx->pages) { 633 free(ctx); 634 cb_fn(seq, cb_arg, -ENOMEM); 635 return; 636 } 637 ctx->num_pages = 1; 638 ctx->cb_fn = cb_fn; 639 ctx->cb_arg = cb_arg; 640 641 page_num = _spdk_bs_blobid_to_page(blob->id); 642 lba = _spdk_bs_page_to_lba(blob->bs, bs->md_start + page_num); 643 644 blob->state = SPDK_BLOB_STATE_LOADING; 645 646 spdk_bs_sequence_read(seq, &ctx->pages[0], lba, 647 _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE), 648 _spdk_blob_load_cpl, ctx); 649 } 650 651 struct spdk_blob_persist_ctx { 652 struct spdk_blob *blob; 653 654 struct spdk_blob_md_page *pages; 655 656 uint64_t idx; 657 658 spdk_bs_sequence_cpl cb_fn; 659 void *cb_arg; 660 }; 661 662 static void 663 _spdk_blob_persist_complete(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 664 { 665 struct spdk_blob_persist_ctx *ctx = cb_arg; 666 struct spdk_blob *blob = ctx->blob; 667 668 if (bserrno == 0) { 669 _spdk_blob_mark_clean(blob); 670 } 671 672 /* Call user callback */ 673 ctx->cb_fn(seq, ctx->cb_arg, bserrno); 674 675 /* Free the memory */ 676 spdk_dma_free(ctx->pages); 677 free(ctx); 678 } 679 680 static void 681 _spdk_blob_persist_unmap_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 682 { 683 struct spdk_blob_persist_ctx *ctx = cb_arg; 684 struct spdk_blob *blob = ctx->blob; 685 struct spdk_blob_store *bs = blob->bs; 686 void *tmp; 687 size_t i; 688 689 /* Release all clusters that were truncated */ 690 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 691 uint32_t cluster_num = _spdk_bs_lba_to_cluster(bs, blob->active.clusters[i]); 692 693 _spdk_bs_release_cluster(bs, cluster_num); 694 } 695 696 if (blob->active.num_clusters == 0) { 697 free(blob->active.clusters); 698 blob->active.clusters = NULL; 699 blob->active.cluster_array_size = 0; 700 } else { 701 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * blob->active.num_clusters); 702 assert(tmp != NULL); 703 blob->active.clusters = tmp; 704 blob->active.cluster_array_size = blob->active.num_clusters; 705 } 706 707 _spdk_blob_persist_complete(seq, ctx, bserrno); 708 } 709 710 static void 711 _spdk_blob_persist_unmap_clusters(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 712 { 713 struct spdk_blob_persist_ctx *ctx = cb_arg; 714 struct spdk_blob *blob = ctx->blob; 715 struct spdk_blob_store *bs = blob->bs; 716 spdk_bs_batch_t *batch; 717 size_t i; 718 uint64_t lba; 719 uint32_t lba_count; 720 721 /* Clusters don't move around in blobs. The list shrinks or grows 722 * at the end, but no changes ever occur in the middle of the list. 723 */ 724 725 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_unmap_clusters_cpl, ctx); 726 727 /* Unmap all clusters that were truncated */ 728 lba = 0; 729 lba_count = 0; 730 for (i = blob->active.num_clusters; i < blob->active.cluster_array_size; i++) { 731 uint64_t next_lba = blob->active.clusters[i]; 732 uint32_t next_lba_count = _spdk_bs_cluster_to_lba(bs, 1); 733 734 if ((lba + lba_count) == next_lba) { 735 /* This cluster is contiguous with the previous one. */ 736 lba_count += next_lba_count; 737 continue; 738 } 739 740 /* This cluster is not contiguous with the previous one. */ 741 742 /* If a run of LBAs previously existing, send them 743 * as an unmap. 744 */ 745 if (lba_count > 0) { 746 spdk_bs_batch_unmap(batch, lba, lba_count); 747 } 748 749 /* Start building the next batch */ 750 lba = next_lba; 751 lba_count = next_lba_count; 752 } 753 754 /* If we ended with a contiguous set of LBAs, send the unmap now */ 755 if (lba_count > 0) { 756 spdk_bs_batch_unmap(batch, lba, lba_count); 757 } 758 759 spdk_bs_batch_close(batch); 760 } 761 762 static void 763 _spdk_blob_persist_zero_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 764 { 765 struct spdk_blob_persist_ctx *ctx = cb_arg; 766 struct spdk_blob *blob = ctx->blob; 767 struct spdk_blob_store *bs = blob->bs; 768 size_t i; 769 770 /* This loop starts at 1 because the first page is special and handled 771 * below. The pages (except the first) are never written in place, 772 * so any pages in the clean list must be zeroed. 773 */ 774 for (i = 1; i < blob->clean.num_pages; i++) { 775 spdk_bit_array_clear(bs->used_md_pages, blob->clean.pages[i]); 776 } 777 778 if (blob->active.num_pages == 0) { 779 uint32_t page_num; 780 781 page_num = _spdk_bs_blobid_to_page(blob->id); 782 spdk_bit_array_clear(bs->used_md_pages, page_num); 783 } 784 785 /* Move on to unmapping clusters */ 786 _spdk_blob_persist_unmap_clusters(seq, ctx, 0); 787 } 788 789 static void 790 _spdk_blob_persist_zero_pages(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 791 { 792 struct spdk_blob_persist_ctx *ctx = cb_arg; 793 struct spdk_blob *blob = ctx->blob; 794 struct spdk_blob_store *bs = blob->bs; 795 uint64_t lba; 796 uint32_t lba_count; 797 spdk_bs_batch_t *batch; 798 size_t i; 799 800 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_zero_pages_cpl, ctx); 801 802 lba_count = _spdk_bs_byte_to_lba(bs, SPDK_BS_PAGE_SIZE); 803 804 /* This loop starts at 1 because the first page is special and handled 805 * below. The pages (except the first) are never written in place, 806 * so any pages in the clean list must be zeroed. 807 */ 808 for (i = 1; i < blob->clean.num_pages; i++) { 809 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->clean.pages[i]); 810 811 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 812 } 813 814 /* The first page will only be zeroed if this is a delete. */ 815 if (blob->active.num_pages == 0) { 816 uint32_t page_num; 817 818 /* The first page in the metadata goes where the blobid indicates */ 819 page_num = _spdk_bs_blobid_to_page(blob->id); 820 lba = _spdk_bs_page_to_lba(bs, bs->md_start + page_num); 821 822 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 823 } 824 825 spdk_bs_batch_close(batch); 826 } 827 828 static void 829 _spdk_blob_persist_write_page_root(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 830 { 831 struct spdk_blob_persist_ctx *ctx = cb_arg; 832 struct spdk_blob *blob = ctx->blob; 833 struct spdk_blob_store *bs = blob->bs; 834 uint64_t lba; 835 uint32_t lba_count; 836 struct spdk_blob_md_page *page; 837 838 if (blob->active.num_pages == 0) { 839 /* Move on to the next step */ 840 _spdk_blob_persist_zero_pages(seq, ctx, 0); 841 return; 842 } 843 844 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 845 846 page = &ctx->pages[0]; 847 /* The first page in the metadata goes where the blobid indicates */ 848 lba = _spdk_bs_page_to_lba(bs, bs->md_start + _spdk_bs_blobid_to_page(blob->id)); 849 850 spdk_bs_sequence_write(seq, page, lba, lba_count, 851 _spdk_blob_persist_zero_pages, ctx); 852 } 853 854 static void 855 _spdk_blob_persist_write_page_chain(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 856 { 857 struct spdk_blob_persist_ctx *ctx = cb_arg; 858 struct spdk_blob *blob = ctx->blob; 859 struct spdk_blob_store *bs = blob->bs; 860 uint64_t lba; 861 uint32_t lba_count; 862 struct spdk_blob_md_page *page; 863 spdk_bs_batch_t *batch; 864 size_t i; 865 866 /* Clusters don't move around in blobs. The list shrinks or grows 867 * at the end, but no changes ever occur in the middle of the list. 868 */ 869 870 lba_count = _spdk_bs_byte_to_lba(bs, sizeof(*page)); 871 872 batch = spdk_bs_sequence_to_batch(seq, _spdk_blob_persist_write_page_root, ctx); 873 874 /* This starts at 1. The root page is not written until 875 * all of the others are finished 876 */ 877 for (i = 1; i < blob->active.num_pages; i++) { 878 page = &ctx->pages[i]; 879 assert(page->sequence_num == i); 880 881 lba = _spdk_bs_page_to_lba(bs, bs->md_start + blob->active.pages[i]); 882 883 spdk_bs_batch_write(batch, page, lba, lba_count); 884 } 885 886 spdk_bs_batch_close(batch); 887 } 888 889 static int 890 _spdk_resize_blob(struct spdk_blob *blob, uint64_t sz) 891 { 892 uint64_t i; 893 uint64_t *tmp; 894 uint64_t lfc; /* lowest free cluster */ 895 struct spdk_blob_store *bs; 896 897 bs = blob->bs; 898 899 assert(blob->state != SPDK_BLOB_STATE_LOADING && 900 blob->state != SPDK_BLOB_STATE_SYNCING); 901 902 if (blob->active.num_clusters == sz) { 903 return 0; 904 } 905 906 if (blob->active.num_clusters < blob->active.cluster_array_size) { 907 /* If this blob was resized to be larger, then smaller, then 908 * larger without syncing, then the cluster array already 909 * contains spare assigned clusters we can use. 910 */ 911 blob->active.num_clusters = spdk_min(blob->active.cluster_array_size, 912 sz); 913 } 914 915 blob->state = SPDK_BLOB_STATE_DIRTY; 916 917 /* Do two passes - one to verify that we can obtain enough clusters 918 * and another to actually claim them. 919 */ 920 921 lfc = 0; 922 for (i = blob->active.num_clusters; i < sz; i++) { 923 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 924 if (lfc >= bs->total_clusters) { 925 /* No more free clusters. Cannot satisfy the request */ 926 assert(false); 927 return -1; 928 } 929 lfc++; 930 } 931 932 if (sz > blob->active.num_clusters) { 933 /* Expand the cluster array if necessary. 934 * We only shrink the array when persisting. 935 */ 936 tmp = realloc(blob->active.clusters, sizeof(uint64_t) * sz); 937 if (sz > 0 && tmp == NULL) { 938 assert(false); 939 return -1; 940 } 941 blob->active.clusters = tmp; 942 blob->active.cluster_array_size = sz; 943 } 944 945 lfc = 0; 946 for (i = blob->active.num_clusters; i < sz; i++) { 947 lfc = spdk_bit_array_find_first_clear(bs->used_clusters, lfc); 948 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming cluster %lu for blob %lu\n", lfc, blob->id); 949 _spdk_bs_claim_cluster(bs, lfc); 950 blob->active.clusters[i] = _spdk_bs_cluster_to_lba(bs, lfc); 951 lfc++; 952 } 953 954 blob->active.num_clusters = sz; 955 956 return 0; 957 } 958 959 /* Write a blob to disk */ 960 static void 961 _spdk_blob_persist(spdk_bs_sequence_t *seq, struct spdk_blob *blob, 962 spdk_bs_sequence_cpl cb_fn, void *cb_arg) 963 { 964 struct spdk_blob_persist_ctx *ctx; 965 int rc; 966 uint64_t i; 967 uint32_t page_num; 968 struct spdk_blob_store *bs; 969 970 assert(blob != NULL); 971 assert(blob->state == SPDK_BLOB_STATE_CLEAN || 972 blob->state == SPDK_BLOB_STATE_DIRTY); 973 974 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 975 cb_fn(seq, cb_arg, 0); 976 return; 977 } 978 979 bs = blob->bs; 980 981 ctx = calloc(1, sizeof(*ctx)); 982 if (!ctx) { 983 cb_fn(seq, cb_arg, -ENOMEM); 984 return; 985 } 986 ctx->blob = blob; 987 ctx->cb_fn = cb_fn; 988 ctx->cb_arg = cb_arg; 989 990 blob->state = SPDK_BLOB_STATE_SYNCING; 991 992 if (blob->active.num_pages == 0) { 993 /* This is the signal that the blob should be deleted. 994 * Immediately jump to the clean up routine. */ 995 assert(blob->clean.num_pages > 0); 996 ctx->idx = blob->clean.num_pages - 1; 997 _spdk_blob_persist_zero_pages(seq, ctx, 0); 998 return; 999 1000 } 1001 1002 /* Generate the new metadata */ 1003 rc = _spdk_blob_serialize(blob, &ctx->pages, &blob->active.num_pages); 1004 if (rc < 0) { 1005 free(ctx); 1006 cb_fn(seq, cb_arg, rc); 1007 return; 1008 } 1009 1010 assert(blob->active.num_pages >= 1); 1011 1012 /* Resize the cache of page indices */ 1013 blob->active.pages = realloc(blob->active.pages, 1014 blob->active.num_pages * sizeof(*blob->active.pages)); 1015 if (!blob->active.pages) { 1016 free(ctx); 1017 cb_fn(seq, cb_arg, -ENOMEM); 1018 return; 1019 } 1020 1021 /* Assign this metadata to pages. This requires two passes - 1022 * one to verify that there are enough pages and a second 1023 * to actually claim them. */ 1024 page_num = 0; 1025 /* Note that this loop starts at one. The first page location is fixed by the blobid. */ 1026 for (i = 1; i < blob->active.num_pages; i++) { 1027 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1028 if (page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 1029 spdk_dma_free(ctx->pages); 1030 free(ctx); 1031 blob->state = SPDK_BLOB_STATE_DIRTY; 1032 cb_fn(seq, cb_arg, -ENOMEM); 1033 return; 1034 } 1035 page_num++; 1036 } 1037 1038 page_num = 0; 1039 blob->active.pages[0] = _spdk_bs_blobid_to_page(blob->id); 1040 for (i = 1; i < blob->active.num_pages; i++) { 1041 page_num = spdk_bit_array_find_first_clear(bs->used_md_pages, page_num); 1042 ctx->pages[i - 1].next = page_num; 1043 /* Now that previous metadata page is complete, calculate the crc for it. */ 1044 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1045 blob->active.pages[i] = page_num; 1046 spdk_bit_array_set(bs->used_md_pages, page_num); 1047 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Claiming page %u for blob %lu\n", page_num, blob->id); 1048 page_num++; 1049 } 1050 ctx->pages[i - 1].crc = _spdk_blob_md_page_calc_crc(&ctx->pages[i - 1]); 1051 /* Start writing the metadata from last page to first */ 1052 ctx->idx = blob->active.num_pages - 1; 1053 _spdk_blob_persist_write_page_chain(seq, ctx, 0); 1054 } 1055 1056 static void 1057 _spdk_blob_request_submit_op(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1058 void *payload, uint64_t offset, uint64_t length, 1059 spdk_blob_op_complete cb_fn, void *cb_arg, enum spdk_blob_op_type op_type) 1060 { 1061 spdk_bs_batch_t *batch; 1062 struct spdk_bs_cpl cpl; 1063 uint64_t lba; 1064 uint32_t lba_count; 1065 uint8_t *buf; 1066 uint64_t page; 1067 1068 assert(blob != NULL); 1069 1070 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1071 cb_fn(cb_arg, -EINVAL); 1072 return; 1073 } 1074 1075 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1076 cpl.u.blob_basic.cb_fn = cb_fn; 1077 cpl.u.blob_basic.cb_arg = cb_arg; 1078 1079 batch = spdk_bs_batch_open(_channel, &cpl); 1080 if (!batch) { 1081 cb_fn(cb_arg, -ENOMEM); 1082 return; 1083 } 1084 1085 length = _spdk_bs_page_to_lba(blob->bs, length); 1086 page = offset; 1087 buf = payload; 1088 while (length > 0) { 1089 lba = _spdk_bs_blob_page_to_lba(blob, page); 1090 lba_count = spdk_min(length, 1091 _spdk_bs_page_to_lba(blob->bs, 1092 _spdk_bs_num_pages_to_cluster_boundary(blob, page))); 1093 1094 switch (op_type) { 1095 case SPDK_BLOB_READ: 1096 spdk_bs_batch_read(batch, buf, lba, lba_count); 1097 break; 1098 case SPDK_BLOB_WRITE: 1099 spdk_bs_batch_write(batch, buf, lba, lba_count); 1100 break; 1101 case SPDK_BLOB_UNMAP: 1102 spdk_bs_batch_unmap(batch, lba, lba_count); 1103 break; 1104 case SPDK_BLOB_WRITE_ZEROES: 1105 spdk_bs_batch_write_zeroes(batch, lba, lba_count); 1106 break; 1107 } 1108 1109 length -= lba_count; 1110 page += _spdk_bs_lba_to_page(blob->bs, lba_count); 1111 if (op_type == SPDK_BLOB_WRITE || op_type == SPDK_BLOB_READ) { 1112 buf += _spdk_bs_lba_to_byte(blob->bs, lba_count); 1113 } 1114 } 1115 1116 spdk_bs_batch_close(batch); 1117 } 1118 1119 struct rw_iov_ctx { 1120 struct spdk_blob *blob; 1121 bool read; 1122 int iovcnt; 1123 struct iovec *orig_iov; 1124 uint64_t page_offset; 1125 uint64_t pages_remaining; 1126 uint64_t pages_done; 1127 struct iovec iov[0]; 1128 }; 1129 1130 static void 1131 _spdk_rw_iov_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1132 { 1133 assert(cb_arg == NULL); 1134 spdk_bs_sequence_finish(seq, bserrno); 1135 } 1136 1137 static void 1138 _spdk_rw_iov_split_next(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1139 { 1140 struct rw_iov_ctx *ctx = cb_arg; 1141 struct iovec *iov, *orig_iov; 1142 int iovcnt; 1143 size_t orig_iovoff; 1144 uint64_t lba; 1145 uint64_t page_count, pages_to_boundary; 1146 uint32_t lba_count; 1147 uint64_t byte_count; 1148 1149 if (bserrno != 0 || ctx->pages_remaining == 0) { 1150 free(ctx); 1151 spdk_bs_sequence_finish(seq, bserrno); 1152 return; 1153 } 1154 1155 pages_to_boundary = _spdk_bs_num_pages_to_cluster_boundary(ctx->blob, ctx->page_offset); 1156 page_count = spdk_min(ctx->pages_remaining, pages_to_boundary); 1157 lba = _spdk_bs_blob_page_to_lba(ctx->blob, ctx->page_offset); 1158 lba_count = _spdk_bs_page_to_lba(ctx->blob->bs, page_count); 1159 1160 /* 1161 * Get index and offset into the original iov array for our current position in the I/O sequence. 1162 * byte_count will keep track of how many bytes remaining until orig_iov and orig_iovoff will 1163 * point to the current position in the I/O sequence. 1164 */ 1165 byte_count = ctx->pages_done * sizeof(struct spdk_blob_md_page); 1166 orig_iov = &ctx->orig_iov[0]; 1167 orig_iovoff = 0; 1168 while (byte_count > 0) { 1169 if (byte_count >= orig_iov->iov_len) { 1170 byte_count -= orig_iov->iov_len; 1171 orig_iov++; 1172 } else { 1173 orig_iovoff = byte_count; 1174 byte_count = 0; 1175 } 1176 } 1177 1178 /* 1179 * Build an iov array for the next I/O in the sequence. byte_count will keep track of how many 1180 * bytes of this next I/O remain to be accounted for in the new iov array. 1181 */ 1182 byte_count = page_count * sizeof(struct spdk_blob_md_page); 1183 iov = &ctx->iov[0]; 1184 iovcnt = 0; 1185 while (byte_count > 0) { 1186 iov->iov_len = spdk_min(byte_count, orig_iov->iov_len - orig_iovoff); 1187 iov->iov_base = orig_iov->iov_base + orig_iovoff; 1188 byte_count -= iov->iov_len; 1189 orig_iovoff = 0; 1190 orig_iov++; 1191 iov++; 1192 iovcnt++; 1193 } 1194 1195 ctx->page_offset += page_count; 1196 ctx->pages_done += page_count; 1197 ctx->pages_remaining -= page_count; 1198 iov = &ctx->iov[0]; 1199 1200 if (ctx->read) { 1201 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1202 } else { 1203 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_split_next, ctx); 1204 } 1205 } 1206 1207 static void 1208 _spdk_blob_request_submit_rw_iov(struct spdk_blob *blob, struct spdk_io_channel *_channel, 1209 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1210 spdk_blob_op_complete cb_fn, void *cb_arg, bool read) 1211 { 1212 spdk_bs_sequence_t *seq; 1213 struct spdk_bs_cpl cpl; 1214 1215 assert(blob != NULL); 1216 1217 if (length == 0) { 1218 cb_fn(cb_arg, 0); 1219 return; 1220 } 1221 1222 if (offset + length > blob->active.num_clusters * blob->bs->pages_per_cluster) { 1223 cb_fn(cb_arg, -EINVAL); 1224 return; 1225 } 1226 1227 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 1228 cpl.u.blob_basic.cb_fn = cb_fn; 1229 cpl.u.blob_basic.cb_arg = cb_arg; 1230 1231 /* 1232 * For now, we implement readv/writev using a sequence (instead of a batch) to account for having 1233 * to split a request that spans a cluster boundary. For I/O that do not span a cluster boundary, 1234 * there will be no noticeable difference compared to using a batch. For I/O that do span a cluster 1235 * boundary, the target LBAs (after blob offset to LBA translation) may not be contiguous, so we need 1236 * to allocate a separate iov array and split the I/O such that none of the resulting 1237 * smaller I/O cross a cluster boundary. These smaller I/O will be issued in sequence (not in parallel) 1238 * but since this case happens very infrequently, any performance impact will be negligible. 1239 * 1240 * This could be optimized in the future to allocate a big enough iov array to account for all of the iovs 1241 * for all of the smaller I/Os, pre-build all of the iov arrays for the smaller I/Os, then issue them 1242 * in a batch. That would also require creating an intermediate spdk_bs_cpl that would get called 1243 * when the batch was completed, to allow for freeing the memory for the iov arrays. 1244 */ 1245 seq = spdk_bs_sequence_start(_channel, &cpl); 1246 if (!seq) { 1247 cb_fn(cb_arg, -ENOMEM); 1248 return; 1249 } 1250 1251 if (spdk_likely(length <= _spdk_bs_num_pages_to_cluster_boundary(blob, offset))) { 1252 uint64_t lba = _spdk_bs_blob_page_to_lba(blob, offset); 1253 uint32_t lba_count = _spdk_bs_page_to_lba(blob->bs, length); 1254 1255 if (read) { 1256 spdk_bs_sequence_readv(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1257 } else { 1258 spdk_bs_sequence_writev(seq, iov, iovcnt, lba, lba_count, _spdk_rw_iov_done, NULL); 1259 } 1260 } else { 1261 struct rw_iov_ctx *ctx; 1262 1263 ctx = calloc(1, sizeof(struct rw_iov_ctx) + iovcnt * sizeof(struct iovec)); 1264 if (ctx == NULL) { 1265 spdk_bs_sequence_finish(seq, -ENOMEM); 1266 return; 1267 } 1268 1269 ctx->blob = blob; 1270 ctx->read = read; 1271 ctx->orig_iov = iov; 1272 ctx->iovcnt = iovcnt; 1273 ctx->page_offset = offset; 1274 ctx->pages_remaining = length; 1275 ctx->pages_done = 0; 1276 1277 _spdk_rw_iov_split_next(seq, ctx, 0); 1278 } 1279 } 1280 1281 static struct spdk_blob * 1282 _spdk_blob_lookup(struct spdk_blob_store *bs, spdk_blob_id blobid) 1283 { 1284 struct spdk_blob *blob; 1285 1286 TAILQ_FOREACH(blob, &bs->blobs, link) { 1287 if (blob->id == blobid) { 1288 return blob; 1289 } 1290 } 1291 1292 return NULL; 1293 } 1294 1295 static int 1296 _spdk_bs_channel_create(struct spdk_blob_store *bs, struct spdk_bs_channel *channel, 1297 uint32_t max_ops) 1298 { 1299 struct spdk_bs_dev *dev; 1300 uint32_t i; 1301 1302 dev = bs->dev; 1303 1304 channel->req_mem = calloc(max_ops, sizeof(struct spdk_bs_request_set)); 1305 if (!channel->req_mem) { 1306 return -1; 1307 } 1308 1309 TAILQ_INIT(&channel->reqs); 1310 1311 for (i = 0; i < max_ops; i++) { 1312 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 1313 } 1314 1315 channel->bs = bs; 1316 channel->dev = dev; 1317 channel->dev_channel = dev->create_channel(dev); 1318 1319 if (!channel->dev_channel) { 1320 SPDK_ERRLOG("Failed to create device channel.\n"); 1321 free(channel->req_mem); 1322 return -1; 1323 } 1324 1325 return 0; 1326 } 1327 1328 static int 1329 _spdk_bs_md_channel_create(void *io_device, void *ctx_buf) 1330 { 1331 struct spdk_blob_store *bs; 1332 struct spdk_bs_channel *channel = ctx_buf; 1333 1334 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1335 1336 return _spdk_bs_channel_create(bs, channel, bs->md_target.max_md_ops); 1337 } 1338 1339 static int 1340 _spdk_bs_io_channel_create(void *io_device, void *ctx_buf) 1341 { 1342 struct spdk_blob_store *bs; 1343 struct spdk_bs_channel *channel = ctx_buf; 1344 1345 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, io_target); 1346 1347 return _spdk_bs_channel_create(bs, channel, bs->io_target.max_channel_ops); 1348 } 1349 1350 1351 static void 1352 _spdk_bs_channel_destroy(void *io_device, void *ctx_buf) 1353 { 1354 struct spdk_bs_channel *channel = ctx_buf; 1355 1356 free(channel->req_mem); 1357 channel->dev->destroy_channel(channel->dev, channel->dev_channel); 1358 } 1359 1360 static void 1361 _spdk_bs_dev_destroy(void *io_device) 1362 { 1363 struct spdk_blob_store *bs; 1364 struct spdk_blob *blob, *blob_tmp; 1365 1366 bs = SPDK_CONTAINEROF(io_device, struct spdk_blob_store, md_target); 1367 bs->dev->destroy(bs->dev); 1368 1369 TAILQ_FOREACH_SAFE(blob, &bs->blobs, link, blob_tmp) { 1370 TAILQ_REMOVE(&bs->blobs, blob, link); 1371 _spdk_blob_free(blob); 1372 } 1373 1374 spdk_bit_array_free(&bs->used_md_pages); 1375 spdk_bit_array_free(&bs->used_clusters); 1376 /* 1377 * If this function is called for any reason except a successful unload, 1378 * the unload_cpl type will be NONE and this will be a nop. 1379 */ 1380 spdk_bs_call_cpl(&bs->unload_cpl, bs->unload_err); 1381 1382 free(bs); 1383 } 1384 1385 static void 1386 _spdk_bs_free(struct spdk_blob_store *bs) 1387 { 1388 spdk_bs_unregister_md_thread(bs); 1389 spdk_io_device_unregister(&bs->io_target, NULL); 1390 spdk_io_device_unregister(&bs->md_target, _spdk_bs_dev_destroy); 1391 } 1392 1393 void 1394 spdk_bs_opts_init(struct spdk_bs_opts *opts) 1395 { 1396 opts->cluster_sz = SPDK_BLOB_OPTS_CLUSTER_SZ; 1397 opts->num_md_pages = SPDK_BLOB_OPTS_NUM_MD_PAGES; 1398 opts->max_md_ops = SPDK_BLOB_OPTS_MAX_MD_OPS; 1399 opts->max_channel_ops = SPDK_BLOB_OPTS_MAX_CHANNEL_OPS; 1400 memset(&opts->bstype, 0, sizeof(opts->bstype)); 1401 } 1402 1403 static int 1404 _spdk_bs_opts_verify(struct spdk_bs_opts *opts) 1405 { 1406 if (opts->cluster_sz == 0 || opts->num_md_pages == 0 || opts->max_md_ops == 0 || 1407 opts->max_channel_ops == 0) { 1408 SPDK_ERRLOG("Blobstore options cannot be set to 0\n"); 1409 return -1; 1410 } 1411 1412 return 0; 1413 } 1414 1415 static struct spdk_blob_store * 1416 _spdk_bs_alloc(struct spdk_bs_dev *dev, struct spdk_bs_opts *opts) 1417 { 1418 struct spdk_blob_store *bs; 1419 uint64_t dev_size; 1420 int rc; 1421 1422 dev_size = dev->blocklen * dev->blockcnt; 1423 if (dev_size < opts->cluster_sz) { 1424 /* Device size cannot be smaller than cluster size of blobstore */ 1425 SPDK_ERRLOG("Device size %" PRIu64 " is smaller than cluster size %d\n", dev_size, 1426 opts->cluster_sz); 1427 return NULL; 1428 } 1429 if (opts->cluster_sz < SPDK_BS_PAGE_SIZE) { 1430 /* Cluster size cannot be smaller than page size */ 1431 SPDK_ERRLOG("Cluster size %d is smaller than page size %d\n", 1432 opts->cluster_sz, SPDK_BS_PAGE_SIZE); 1433 return NULL; 1434 } 1435 bs = calloc(1, sizeof(struct spdk_blob_store)); 1436 if (!bs) { 1437 return NULL; 1438 } 1439 1440 TAILQ_INIT(&bs->blobs); 1441 bs->dev = dev; 1442 1443 /* 1444 * Do not use _spdk_bs_lba_to_cluster() here since blockcnt may not be an 1445 * even multiple of the cluster size. 1446 */ 1447 bs->cluster_sz = opts->cluster_sz; 1448 bs->total_clusters = dev->blockcnt / (bs->cluster_sz / dev->blocklen); 1449 bs->pages_per_cluster = bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1450 bs->num_free_clusters = bs->total_clusters; 1451 bs->used_clusters = spdk_bit_array_create(bs->total_clusters); 1452 if (bs->used_clusters == NULL) { 1453 free(bs); 1454 return NULL; 1455 } 1456 1457 bs->md_target.max_md_ops = opts->max_md_ops; 1458 bs->io_target.max_channel_ops = opts->max_channel_ops; 1459 bs->super_blob = SPDK_BLOBID_INVALID; 1460 memcpy(&bs->bstype, &opts->bstype, sizeof(opts->bstype)); 1461 1462 /* The metadata is assumed to be at least 1 page */ 1463 bs->used_md_pages = spdk_bit_array_create(1); 1464 1465 spdk_io_device_register(&bs->md_target, _spdk_bs_md_channel_create, _spdk_bs_channel_destroy, 1466 sizeof(struct spdk_bs_channel)); 1467 rc = spdk_bs_register_md_thread(bs); 1468 if (rc == -1) { 1469 spdk_io_device_unregister(&bs->md_target, NULL); 1470 spdk_bit_array_free(&bs->used_md_pages); 1471 spdk_bit_array_free(&bs->used_clusters); 1472 free(bs); 1473 return NULL; 1474 } 1475 1476 spdk_io_device_register(&bs->io_target, _spdk_bs_io_channel_create, _spdk_bs_channel_destroy, 1477 sizeof(struct spdk_bs_channel)); 1478 1479 return bs; 1480 } 1481 1482 /* START spdk_bs_load, spdk_bs_load_ctx will used for both load and unload. */ 1483 1484 struct spdk_bs_load_ctx { 1485 struct spdk_blob_store *bs; 1486 struct spdk_bs_super_block *super; 1487 1488 struct spdk_bs_md_mask *mask; 1489 bool in_page_chain; 1490 uint32_t page_index; 1491 uint32_t cur_page; 1492 struct spdk_blob_md_page *page; 1493 }; 1494 1495 static void 1496 _spdk_bs_set_mask(struct spdk_bit_array *array, struct spdk_bs_md_mask *mask) 1497 { 1498 uint32_t i = 0; 1499 1500 while (true) { 1501 i = spdk_bit_array_find_first_set(array, i); 1502 if (i >= mask->length) { 1503 break; 1504 } 1505 mask->mask[i / 8] |= 1U << (i % 8); 1506 i++; 1507 } 1508 } 1509 1510 static void 1511 _spdk_bs_write_super(spdk_bs_sequence_t *seq, struct spdk_blob_store *bs, 1512 struct spdk_bs_super_block *super, spdk_bs_sequence_cpl cb_fn, void *cb_arg) 1513 { 1514 /* Update the values in the super block */ 1515 super->super_blob = bs->super_blob; 1516 memcpy(&super->bstype, &bs->bstype, sizeof(bs->bstype)); 1517 super->crc = _spdk_blob_md_page_calc_crc(super); 1518 spdk_bs_sequence_write(seq, super, _spdk_bs_page_to_lba(bs, 0), 1519 _spdk_bs_byte_to_lba(bs, sizeof(*super)), 1520 cb_fn, cb_arg); 1521 } 1522 1523 static void 1524 _spdk_bs_write_used_clusters(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1525 { 1526 struct spdk_bs_load_ctx *ctx = arg; 1527 uint64_t mask_size, lba, lba_count; 1528 1529 /* Write out the used clusters mask */ 1530 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1531 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1532 if (!ctx->mask) { 1533 spdk_dma_free(ctx->super); 1534 free(ctx); 1535 spdk_bs_sequence_finish(seq, -ENOMEM); 1536 return; 1537 } 1538 1539 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_CLUSTERS; 1540 ctx->mask->length = ctx->bs->total_clusters; 1541 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_clusters)); 1542 1543 _spdk_bs_set_mask(ctx->bs->used_clusters, ctx->mask); 1544 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1545 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1546 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1547 } 1548 1549 static void 1550 _spdk_bs_write_used_md(spdk_bs_sequence_t *seq, void *arg, spdk_bs_sequence_cpl cb_fn) 1551 { 1552 struct spdk_bs_load_ctx *ctx = arg; 1553 uint64_t mask_size, lba, lba_count; 1554 1555 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1556 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1557 if (!ctx->mask) { 1558 spdk_dma_free(ctx->super); 1559 free(ctx); 1560 spdk_bs_sequence_finish(seq, -ENOMEM); 1561 return; 1562 } 1563 1564 ctx->mask->type = SPDK_MD_MASK_TYPE_USED_PAGES; 1565 ctx->mask->length = ctx->super->md_len; 1566 assert(ctx->mask->length == spdk_bit_array_capacity(ctx->bs->used_md_pages)); 1567 1568 _spdk_bs_set_mask(ctx->bs->used_md_pages, ctx->mask); 1569 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1570 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1571 spdk_bs_sequence_write(seq, ctx->mask, lba, lba_count, cb_fn, arg); 1572 } 1573 1574 static void 1575 _spdk_bs_load_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1576 { 1577 struct spdk_bs_load_ctx *ctx = cb_arg; 1578 uint32_t i, j; 1579 int rc; 1580 1581 /* The type must be correct */ 1582 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_CLUSTERS); 1583 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1584 assert(ctx->mask->length <= (ctx->super->used_cluster_mask_len * sizeof( 1585 struct spdk_blob_md_page) * 8)); 1586 /* The length of the mask must be exactly equal to the total number of clusters */ 1587 assert(ctx->mask->length == ctx->bs->total_clusters); 1588 1589 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1590 if (rc < 0) { 1591 spdk_dma_free(ctx->super); 1592 spdk_dma_free(ctx->mask); 1593 _spdk_bs_free(ctx->bs); 1594 free(ctx); 1595 spdk_bs_sequence_finish(seq, -ENOMEM); 1596 return; 1597 } 1598 1599 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1600 for (i = 0; i < ctx->mask->length / 8; i++) { 1601 uint8_t segment = ctx->mask->mask[i]; 1602 for (j = 0; segment && (j < 8); j++) { 1603 if (segment & 1U) { 1604 spdk_bit_array_set(ctx->bs->used_clusters, (i * 8) + j); 1605 assert(ctx->bs->num_free_clusters > 0); 1606 ctx->bs->num_free_clusters--; 1607 } 1608 segment >>= 1U; 1609 } 1610 } 1611 1612 spdk_dma_free(ctx->super); 1613 spdk_dma_free(ctx->mask); 1614 free(ctx); 1615 1616 spdk_bs_sequence_finish(seq, bserrno); 1617 } 1618 1619 static void 1620 _spdk_bs_load_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1621 { 1622 struct spdk_bs_load_ctx *ctx = cb_arg; 1623 uint64_t lba, lba_count, mask_size; 1624 uint32_t i, j; 1625 int rc; 1626 1627 /* The type must be correct */ 1628 assert(ctx->mask->type == SPDK_MD_MASK_TYPE_USED_PAGES); 1629 /* The length of the mask (in bits) must not be greater than the length of the buffer (converted to bits) */ 1630 assert(ctx->mask->length <= (ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE * 1631 8)); 1632 /* The length of the mask must be exactly equal to the size (in pages) of the metadata region */ 1633 assert(ctx->mask->length == ctx->super->md_len); 1634 1635 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->mask->length); 1636 if (rc < 0) { 1637 spdk_dma_free(ctx->super); 1638 spdk_dma_free(ctx->mask); 1639 _spdk_bs_free(ctx->bs); 1640 free(ctx); 1641 spdk_bs_sequence_finish(seq, -ENOMEM); 1642 return; 1643 } 1644 1645 for (i = 0; i < ctx->mask->length / 8; i++) { 1646 uint8_t segment = ctx->mask->mask[i]; 1647 for (j = 0; segment && (j < 8); j++) { 1648 if (segment & 1U) { 1649 spdk_bit_array_set(ctx->bs->used_md_pages, (i * 8) + j); 1650 } 1651 segment >>= 1U; 1652 } 1653 } 1654 spdk_dma_free(ctx->mask); 1655 1656 /* Read the used clusters mask */ 1657 mask_size = ctx->super->used_cluster_mask_len * SPDK_BS_PAGE_SIZE; 1658 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1659 if (!ctx->mask) { 1660 spdk_dma_free(ctx->super); 1661 _spdk_bs_free(ctx->bs); 1662 free(ctx); 1663 spdk_bs_sequence_finish(seq, -ENOMEM); 1664 return; 1665 } 1666 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_start); 1667 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_cluster_mask_len); 1668 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1669 _spdk_bs_load_used_clusters_cpl, ctx); 1670 } 1671 1672 static void 1673 _spdk_bs_load_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1674 { 1675 struct spdk_bs_load_ctx *ctx = cb_arg; 1676 uint64_t lba, lba_count, mask_size; 1677 1678 /* Read the used pages mask */ 1679 mask_size = ctx->super->used_page_mask_len * SPDK_BS_PAGE_SIZE; 1680 ctx->mask = spdk_dma_zmalloc(mask_size, 0x1000, NULL); 1681 if (!ctx->mask) { 1682 spdk_dma_free(ctx->super); 1683 _spdk_bs_free(ctx->bs); 1684 free(ctx); 1685 spdk_bs_sequence_finish(seq, -ENOMEM); 1686 return; 1687 } 1688 1689 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_start); 1690 lba_count = _spdk_bs_page_to_lba(ctx->bs, ctx->super->used_page_mask_len); 1691 spdk_bs_sequence_read(seq, ctx->mask, lba, lba_count, 1692 _spdk_bs_load_used_pages_cpl, ctx); 1693 } 1694 1695 static int 1696 _spdk_bs_load_replay_md_parse_page(const struct spdk_blob_md_page *page, struct spdk_blob_store *bs) 1697 { 1698 struct spdk_blob_md_descriptor *desc; 1699 size_t cur_desc = 0; 1700 1701 desc = (struct spdk_blob_md_descriptor *)page->descriptors; 1702 while (cur_desc < sizeof(page->descriptors)) { 1703 if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_PADDING) { 1704 if (desc->length == 0) { 1705 /* If padding and length are 0, this terminates the page */ 1706 break; 1707 } 1708 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_EXTENT) { 1709 struct spdk_blob_md_descriptor_extent *desc_extent; 1710 unsigned int i, j; 1711 unsigned int cluster_count = 0; 1712 1713 desc_extent = (struct spdk_blob_md_descriptor_extent *)desc; 1714 1715 for (i = 0; i < desc_extent->length / sizeof(desc_extent->extents[0]); i++) { 1716 for (j = 0; j < desc_extent->extents[i].length; j++) { 1717 spdk_bit_array_set(bs->used_clusters, desc_extent->extents[i].cluster_idx + j); 1718 if (bs->num_free_clusters == 0) { 1719 return -1; 1720 } 1721 bs->num_free_clusters--; 1722 cluster_count++; 1723 } 1724 } 1725 if (cluster_count == 0) { 1726 return -1; 1727 } 1728 } else if (desc->type == SPDK_MD_DESCRIPTOR_TYPE_XATTR) { 1729 /* Skip this item */ 1730 } else { 1731 /* Error */ 1732 return -1; 1733 } 1734 /* Advance to the next descriptor */ 1735 cur_desc += sizeof(*desc) + desc->length; 1736 if (cur_desc + sizeof(*desc) > sizeof(page->descriptors)) { 1737 break; 1738 } 1739 desc = (struct spdk_blob_md_descriptor *)((uintptr_t)page->descriptors + cur_desc); 1740 } 1741 return 0; 1742 } 1743 1744 static bool _spdk_bs_load_cur_md_page_valid(struct spdk_bs_load_ctx *ctx) 1745 { 1746 uint32_t crc; 1747 1748 crc = _spdk_blob_md_page_calc_crc(ctx->page); 1749 if (crc != ctx->page->crc) { 1750 return false; 1751 } 1752 1753 if (_spdk_bs_page_to_blobid(ctx->cur_page) != ctx->page->id) { 1754 return false; 1755 } 1756 return true; 1757 } 1758 1759 static void 1760 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg); 1761 1762 static void 1763 _spdk_bs_load_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1764 { 1765 struct spdk_bs_load_ctx *ctx = cb_arg; 1766 1767 spdk_dma_free(ctx->mask); 1768 spdk_dma_free(ctx->super); 1769 spdk_bs_sequence_finish(seq, bserrno); 1770 free(ctx); 1771 } 1772 1773 static void 1774 _spdk_bs_load_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1775 { 1776 struct spdk_bs_load_ctx *ctx = cb_arg; 1777 1778 spdk_dma_free(ctx->mask); 1779 1780 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_load_write_used_clusters_cpl); 1781 } 1782 1783 static void 1784 _spdk_bs_load_write_used_md(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1785 { 1786 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_load_write_used_pages_cpl); 1787 } 1788 1789 static void 1790 _spdk_bs_load_replay_md_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1791 { 1792 struct spdk_bs_load_ctx *ctx = cb_arg; 1793 uint32_t page_num; 1794 1795 if (bserrno != 0) { 1796 spdk_dma_free(ctx->super); 1797 _spdk_bs_free(ctx->bs); 1798 free(ctx); 1799 spdk_bs_sequence_finish(seq, bserrno); 1800 return; 1801 } 1802 1803 page_num = ctx->cur_page; 1804 if (_spdk_bs_load_cur_md_page_valid(ctx) == true) { 1805 if (ctx->page->sequence_num == 0 || ctx->in_page_chain == true) { 1806 spdk_bit_array_set(ctx->bs->used_md_pages, page_num); 1807 if (_spdk_bs_load_replay_md_parse_page(ctx->page, ctx->bs)) { 1808 spdk_dma_free(ctx->super); 1809 _spdk_bs_free(ctx->bs); 1810 free(ctx); 1811 spdk_bs_sequence_finish(seq, -EILSEQ); 1812 return; 1813 } 1814 if (ctx->page->next != SPDK_INVALID_MD_PAGE) { 1815 ctx->in_page_chain = true; 1816 ctx->cur_page = ctx->page->next; 1817 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1818 return; 1819 } 1820 } 1821 } 1822 1823 ctx->in_page_chain = false; 1824 1825 do { 1826 ctx->page_index++; 1827 } while (spdk_bit_array_get(ctx->bs->used_md_pages, ctx->page_index) == true); 1828 1829 if (ctx->page_index < ctx->super->md_len) { 1830 ctx->cur_page = ctx->page_index; 1831 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1832 } else { 1833 spdk_dma_free(ctx->page); 1834 _spdk_bs_load_write_used_md(seq, ctx, bserrno); 1835 } 1836 } 1837 1838 static void 1839 _spdk_bs_load_replay_cur_md_page(spdk_bs_sequence_t *seq, void *cb_arg) 1840 { 1841 struct spdk_bs_load_ctx *ctx = cb_arg; 1842 uint64_t lba; 1843 1844 assert(ctx->cur_page < ctx->super->md_len); 1845 lba = _spdk_bs_page_to_lba(ctx->bs, ctx->super->md_start + ctx->cur_page); 1846 spdk_bs_sequence_read(seq, ctx->page, lba, 1847 _spdk_bs_byte_to_lba(ctx->bs, SPDK_BS_PAGE_SIZE), 1848 _spdk_bs_load_replay_md_cpl, ctx); 1849 } 1850 1851 static void 1852 _spdk_bs_load_replay_md(spdk_bs_sequence_t *seq, void *cb_arg) 1853 { 1854 struct spdk_bs_load_ctx *ctx = cb_arg; 1855 1856 ctx->page_index = 0; 1857 ctx->cur_page = 0; 1858 ctx->page = spdk_dma_zmalloc(SPDK_BS_PAGE_SIZE, 1859 SPDK_BS_PAGE_SIZE, 1860 NULL); 1861 if (!ctx->page) { 1862 spdk_dma_free(ctx->super); 1863 _spdk_bs_free(ctx->bs); 1864 free(ctx); 1865 spdk_bs_sequence_finish(seq, -ENOMEM); 1866 return; 1867 } 1868 _spdk_bs_load_replay_cur_md_page(seq, cb_arg); 1869 } 1870 1871 static void 1872 _spdk_bs_recover(spdk_bs_sequence_t *seq, void *cb_arg) 1873 { 1874 struct spdk_bs_load_ctx *ctx = cb_arg; 1875 int rc; 1876 1877 rc = spdk_bit_array_resize(&ctx->bs->used_md_pages, ctx->super->md_len); 1878 if (rc < 0) { 1879 spdk_dma_free(ctx->super); 1880 _spdk_bs_free(ctx->bs); 1881 free(ctx); 1882 spdk_bs_sequence_finish(seq, -ENOMEM); 1883 return; 1884 } 1885 1886 rc = spdk_bit_array_resize(&ctx->bs->used_clusters, ctx->bs->total_clusters); 1887 if (rc < 0) { 1888 spdk_dma_free(ctx->super); 1889 _spdk_bs_free(ctx->bs); 1890 free(ctx); 1891 spdk_bs_sequence_finish(seq, -ENOMEM); 1892 return; 1893 } 1894 1895 ctx->bs->num_free_clusters = ctx->bs->total_clusters; 1896 _spdk_bs_load_replay_md(seq, cb_arg); 1897 } 1898 1899 static void 1900 _spdk_bs_load_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 1901 { 1902 struct spdk_bs_load_ctx *ctx = cb_arg; 1903 uint32_t crc; 1904 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 1905 1906 if (ctx->super->version > SPDK_BS_VERSION || 1907 ctx->super->version < SPDK_BS_INITIAL_VERSION) { 1908 spdk_dma_free(ctx->super); 1909 _spdk_bs_free(ctx->bs); 1910 free(ctx); 1911 spdk_bs_sequence_finish(seq, -EILSEQ); 1912 return; 1913 } 1914 1915 if (memcmp(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 1916 sizeof(ctx->super->signature)) != 0) { 1917 spdk_dma_free(ctx->super); 1918 _spdk_bs_free(ctx->bs); 1919 free(ctx); 1920 spdk_bs_sequence_finish(seq, -EILSEQ); 1921 return; 1922 } 1923 1924 crc = _spdk_blob_md_page_calc_crc(ctx->super); 1925 if (crc != ctx->super->crc) { 1926 spdk_dma_free(ctx->super); 1927 _spdk_bs_free(ctx->bs); 1928 free(ctx); 1929 spdk_bs_sequence_finish(seq, -EILSEQ); 1930 return; 1931 } 1932 1933 if (memcmp(&ctx->bs->bstype, &ctx->super->bstype, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 1934 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Bstype matched - loading blobstore\n"); 1935 } else if (memcmp(&ctx->bs->bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH) == 0) { 1936 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Bstype wildcard used - loading blobstore regardless bstype\n"); 1937 } else { 1938 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Unexpected bstype\n"); 1939 SPDK_TRACEDUMP(SPDK_TRACE_BLOB, "Expected:", ctx->bs->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 1940 SPDK_TRACEDUMP(SPDK_TRACE_BLOB, "Found:", ctx->super->bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH); 1941 spdk_dma_free(ctx->super); 1942 _spdk_bs_free(ctx->bs); 1943 free(ctx); 1944 spdk_bs_sequence_finish(seq, -ENXIO); 1945 return; 1946 } 1947 1948 /* Parse the super block */ 1949 ctx->bs->cluster_sz = ctx->super->cluster_size; 1950 ctx->bs->total_clusters = ctx->bs->dev->blockcnt / (ctx->bs->cluster_sz / ctx->bs->dev->blocklen); 1951 ctx->bs->pages_per_cluster = ctx->bs->cluster_sz / SPDK_BS_PAGE_SIZE; 1952 ctx->bs->md_start = ctx->super->md_start; 1953 ctx->bs->md_len = ctx->super->md_len; 1954 ctx->bs->total_data_clusters = ctx->bs->total_clusters - divide_round_up( 1955 ctx->bs->md_start + ctx->bs->md_len, ctx->bs->pages_per_cluster); 1956 ctx->bs->super_blob = ctx->super->super_blob; 1957 memcpy(&ctx->bs->bstype, &ctx->super->bstype, sizeof(ctx->super->bstype)); 1958 1959 if (ctx->super->clean == 1) { 1960 ctx->super->clean = 0; 1961 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_load_write_super_cpl, ctx); 1962 } else { 1963 _spdk_bs_recover(seq, ctx); 1964 } 1965 } 1966 1967 void 1968 spdk_bs_load(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 1969 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 1970 { 1971 struct spdk_blob_store *bs; 1972 struct spdk_bs_cpl cpl; 1973 spdk_bs_sequence_t *seq; 1974 struct spdk_bs_load_ctx *ctx; 1975 struct spdk_bs_opts opts = {}; 1976 1977 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Loading blobstore from dev %p\n", dev); 1978 1979 if (o) { 1980 opts = *o; 1981 } else { 1982 spdk_bs_opts_init(&opts); 1983 } 1984 1985 if (opts.max_md_ops == 0 || opts.max_channel_ops == 0) { 1986 cb_fn(cb_arg, NULL, -EINVAL); 1987 return; 1988 } 1989 1990 bs = _spdk_bs_alloc(dev, &opts); 1991 if (!bs) { 1992 cb_fn(cb_arg, NULL, -ENOMEM); 1993 return; 1994 } 1995 1996 ctx = calloc(1, sizeof(*ctx)); 1997 if (!ctx) { 1998 _spdk_bs_free(bs); 1999 cb_fn(cb_arg, NULL, -ENOMEM); 2000 return; 2001 } 2002 2003 ctx->bs = bs; 2004 2005 /* Allocate memory for the super block */ 2006 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2007 if (!ctx->super) { 2008 free(ctx); 2009 _spdk_bs_free(bs); 2010 return; 2011 } 2012 2013 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2014 cpl.u.bs_handle.cb_fn = cb_fn; 2015 cpl.u.bs_handle.cb_arg = cb_arg; 2016 cpl.u.bs_handle.bs = bs; 2017 2018 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2019 if (!seq) { 2020 spdk_dma_free(ctx->super); 2021 free(ctx); 2022 _spdk_bs_free(bs); 2023 cb_fn(cb_arg, NULL, -ENOMEM); 2024 return; 2025 } 2026 2027 /* Read the super block */ 2028 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2029 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2030 _spdk_bs_load_super_cpl, ctx); 2031 } 2032 2033 /* END spdk_bs_load */ 2034 2035 /* START spdk_bs_init */ 2036 2037 struct spdk_bs_init_ctx { 2038 struct spdk_blob_store *bs; 2039 struct spdk_bs_super_block *super; 2040 }; 2041 2042 static void 2043 _spdk_bs_init_persist_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2044 { 2045 struct spdk_bs_init_ctx *ctx = cb_arg; 2046 2047 spdk_dma_free(ctx->super); 2048 free(ctx); 2049 2050 spdk_bs_sequence_finish(seq, bserrno); 2051 } 2052 2053 static void 2054 _spdk_bs_init_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2055 { 2056 struct spdk_bs_init_ctx *ctx = cb_arg; 2057 2058 /* Write super block */ 2059 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(ctx->bs, 0), 2060 _spdk_bs_byte_to_lba(ctx->bs, sizeof(*ctx->super)), 2061 _spdk_bs_init_persist_super_cpl, ctx); 2062 } 2063 2064 void 2065 spdk_bs_init(struct spdk_bs_dev *dev, struct spdk_bs_opts *o, 2066 spdk_bs_op_with_handle_complete cb_fn, void *cb_arg) 2067 { 2068 struct spdk_bs_init_ctx *ctx; 2069 struct spdk_blob_store *bs; 2070 struct spdk_bs_cpl cpl; 2071 spdk_bs_sequence_t *seq; 2072 spdk_bs_batch_t *batch; 2073 uint64_t num_md_lba; 2074 uint64_t num_md_pages; 2075 uint64_t num_md_clusters; 2076 uint32_t i; 2077 struct spdk_bs_opts opts = {}; 2078 int rc; 2079 2080 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Initializing blobstore on dev %p\n", dev); 2081 2082 if ((SPDK_BS_PAGE_SIZE % dev->blocklen) != 0) { 2083 SPDK_ERRLOG("unsupported dev block length of %d\n", 2084 dev->blocklen); 2085 dev->destroy(dev); 2086 cb_fn(cb_arg, NULL, -EINVAL); 2087 return; 2088 } 2089 2090 if (o) { 2091 opts = *o; 2092 } else { 2093 spdk_bs_opts_init(&opts); 2094 } 2095 2096 if (_spdk_bs_opts_verify(&opts) != 0) { 2097 dev->destroy(dev); 2098 cb_fn(cb_arg, NULL, -EINVAL); 2099 return; 2100 } 2101 2102 bs = _spdk_bs_alloc(dev, &opts); 2103 if (!bs) { 2104 dev->destroy(dev); 2105 cb_fn(cb_arg, NULL, -ENOMEM); 2106 return; 2107 } 2108 2109 if (opts.num_md_pages == SPDK_BLOB_OPTS_NUM_MD_PAGES) { 2110 /* By default, allocate 1 page per cluster. 2111 * Technically, this over-allocates metadata 2112 * because more metadata will reduce the number 2113 * of usable clusters. This can be addressed with 2114 * more complex math in the future. 2115 */ 2116 bs->md_len = bs->total_clusters; 2117 } else { 2118 bs->md_len = opts.num_md_pages; 2119 } 2120 2121 rc = spdk_bit_array_resize(&bs->used_md_pages, bs->md_len); 2122 if (rc < 0) { 2123 _spdk_bs_free(bs); 2124 cb_fn(cb_arg, NULL, -ENOMEM); 2125 return; 2126 } 2127 2128 ctx = calloc(1, sizeof(*ctx)); 2129 if (!ctx) { 2130 _spdk_bs_free(bs); 2131 cb_fn(cb_arg, NULL, -ENOMEM); 2132 return; 2133 } 2134 2135 ctx->bs = bs; 2136 2137 /* Allocate memory for the super block */ 2138 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2139 if (!ctx->super) { 2140 free(ctx); 2141 _spdk_bs_free(bs); 2142 return; 2143 } 2144 memcpy(ctx->super->signature, SPDK_BS_SUPER_BLOCK_SIG, 2145 sizeof(ctx->super->signature)); 2146 ctx->super->version = SPDK_BS_VERSION; 2147 ctx->super->length = sizeof(*ctx->super); 2148 ctx->super->super_blob = bs->super_blob; 2149 ctx->super->clean = 0; 2150 ctx->super->cluster_size = bs->cluster_sz; 2151 memcpy(&ctx->super->bstype, &bs->bstype, sizeof(bs->bstype)); 2152 2153 /* Calculate how many pages the metadata consumes at the front 2154 * of the disk. 2155 */ 2156 2157 /* The super block uses 1 page */ 2158 num_md_pages = 1; 2159 2160 /* The used_md_pages mask requires 1 bit per metadata page, rounded 2161 * up to the nearest page, plus a header. 2162 */ 2163 ctx->super->used_page_mask_start = num_md_pages; 2164 ctx->super->used_page_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2165 divide_round_up(bs->md_len, 8), 2166 SPDK_BS_PAGE_SIZE); 2167 num_md_pages += ctx->super->used_page_mask_len; 2168 2169 /* The used_clusters mask requires 1 bit per cluster, rounded 2170 * up to the nearest page, plus a header. 2171 */ 2172 ctx->super->used_cluster_mask_start = num_md_pages; 2173 ctx->super->used_cluster_mask_len = divide_round_up(sizeof(struct spdk_bs_md_mask) + 2174 divide_round_up(bs->total_clusters, 8), 2175 SPDK_BS_PAGE_SIZE); 2176 num_md_pages += ctx->super->used_cluster_mask_len; 2177 2178 /* The metadata region size was chosen above */ 2179 ctx->super->md_start = bs->md_start = num_md_pages; 2180 ctx->super->md_len = bs->md_len; 2181 num_md_pages += bs->md_len; 2182 num_md_lba = _spdk_bs_page_to_lba(bs, num_md_pages); 2183 2184 ctx->super->crc = _spdk_blob_md_page_calc_crc(ctx->super); 2185 2186 num_md_clusters = divide_round_up(num_md_pages, bs->pages_per_cluster); 2187 if (num_md_clusters > bs->total_clusters) { 2188 SPDK_ERRLOG("Blobstore metadata cannot use more clusters than is available, " 2189 "please decrease number of pages reserved for metadata " 2190 "or increase cluster size.\n"); 2191 spdk_dma_free(ctx->super); 2192 free(ctx); 2193 _spdk_bs_free(bs); 2194 cb_fn(cb_arg, NULL, -ENOMEM); 2195 return; 2196 } 2197 /* Claim all of the clusters used by the metadata */ 2198 for (i = 0; i < num_md_clusters; i++) { 2199 _spdk_bs_claim_cluster(bs, i); 2200 } 2201 2202 bs->total_data_clusters = bs->num_free_clusters; 2203 2204 cpl.type = SPDK_BS_CPL_TYPE_BS_HANDLE; 2205 cpl.u.bs_handle.cb_fn = cb_fn; 2206 cpl.u.bs_handle.cb_arg = cb_arg; 2207 cpl.u.bs_handle.bs = bs; 2208 2209 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2210 if (!seq) { 2211 spdk_dma_free(ctx->super); 2212 free(ctx); 2213 _spdk_bs_free(bs); 2214 cb_fn(cb_arg, NULL, -ENOMEM); 2215 return; 2216 } 2217 2218 batch = spdk_bs_sequence_to_batch(seq, _spdk_bs_init_trim_cpl, ctx); 2219 2220 /* Clear metadata space */ 2221 spdk_bs_batch_write_zeroes(batch, 0, num_md_lba); 2222 /* Trim data clusters */ 2223 spdk_bs_batch_unmap(batch, num_md_lba, ctx->bs->dev->blockcnt - num_md_lba); 2224 2225 spdk_bs_batch_close(batch); 2226 } 2227 2228 /* END spdk_bs_init */ 2229 2230 /* START spdk_bs_destroy */ 2231 2232 static void 2233 _spdk_bs_destroy_trim_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2234 { 2235 struct spdk_bs_init_ctx *ctx = cb_arg; 2236 struct spdk_blob_store *bs = ctx->bs; 2237 2238 /* 2239 * We need to defer calling spdk_bs_call_cpl() until after 2240 * dev destruction, so tuck these away for later use. 2241 */ 2242 bs->unload_err = bserrno; 2243 memcpy(&bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2244 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2245 2246 spdk_bs_sequence_finish(seq, bserrno); 2247 2248 _spdk_bs_free(bs); 2249 spdk_dma_free(ctx->super); 2250 free(ctx); 2251 } 2252 2253 void 2254 spdk_bs_destroy(struct spdk_blob_store *bs, bool unmap_device, spdk_bs_op_complete cb_fn, 2255 void *cb_arg) 2256 { 2257 struct spdk_bs_cpl cpl; 2258 spdk_bs_sequence_t *seq; 2259 struct spdk_bs_init_ctx *ctx; 2260 2261 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Destroying blobstore\n"); 2262 2263 if (!TAILQ_EMPTY(&bs->blobs)) { 2264 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2265 cb_fn(cb_arg, -EBUSY); 2266 return; 2267 } 2268 2269 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2270 cpl.u.bs_basic.cb_fn = cb_fn; 2271 cpl.u.bs_basic.cb_arg = cb_arg; 2272 2273 ctx = calloc(1, sizeof(*ctx)); 2274 if (!ctx) { 2275 cb_fn(cb_arg, -ENOMEM); 2276 return; 2277 } 2278 2279 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2280 if (!ctx->super) { 2281 free(ctx); 2282 cb_fn(cb_arg, -ENOMEM); 2283 return; 2284 } 2285 2286 ctx->bs = bs; 2287 2288 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2289 if (!seq) { 2290 spdk_dma_free(ctx->super); 2291 free(ctx); 2292 cb_fn(cb_arg, -ENOMEM); 2293 return; 2294 } 2295 2296 if (unmap_device) { 2297 /* TRIM the entire device */ 2298 spdk_bs_sequence_unmap(seq, 0, bs->dev->blockcnt, _spdk_bs_destroy_trim_cpl, ctx); 2299 } else { 2300 /* Write zeroes to the super block */ 2301 spdk_bs_sequence_write(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), _spdk_bs_byte_to_lba(bs, 2302 sizeof(*ctx->super)), _spdk_bs_destroy_trim_cpl, ctx); 2303 } 2304 } 2305 2306 /* END spdk_bs_destroy */ 2307 2308 /* START spdk_bs_unload */ 2309 2310 static void 2311 _spdk_bs_unload_write_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2312 { 2313 struct spdk_bs_load_ctx *ctx = cb_arg; 2314 2315 spdk_dma_free(ctx->super); 2316 2317 /* 2318 * We need to defer calling spdk_bs_call_cpl() until after 2319 * dev destuction, so tuck these away for later use. 2320 */ 2321 ctx->bs->unload_err = bserrno; 2322 memcpy(&ctx->bs->unload_cpl, &seq->cpl, sizeof(struct spdk_bs_cpl)); 2323 seq->cpl.type = SPDK_BS_CPL_TYPE_NONE; 2324 2325 spdk_bs_sequence_finish(seq, bserrno); 2326 2327 _spdk_bs_free(ctx->bs); 2328 free(ctx); 2329 } 2330 2331 static void 2332 _spdk_bs_unload_write_used_clusters_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2333 { 2334 struct spdk_bs_load_ctx *ctx = cb_arg; 2335 2336 spdk_dma_free(ctx->mask); 2337 ctx->super->clean = 1; 2338 2339 _spdk_bs_write_super(seq, ctx->bs, ctx->super, _spdk_bs_unload_write_super_cpl, ctx); 2340 } 2341 2342 static void 2343 _spdk_bs_unload_write_used_pages_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2344 { 2345 struct spdk_bs_load_ctx *ctx = cb_arg; 2346 2347 spdk_dma_free(ctx->mask); 2348 2349 _spdk_bs_write_used_clusters(seq, cb_arg, _spdk_bs_unload_write_used_clusters_cpl); 2350 } 2351 2352 static void 2353 _spdk_bs_unload_read_super_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2354 { 2355 _spdk_bs_write_used_md(seq, cb_arg, _spdk_bs_unload_write_used_pages_cpl); 2356 } 2357 2358 void 2359 spdk_bs_unload(struct spdk_blob_store *bs, spdk_bs_op_complete cb_fn, void *cb_arg) 2360 { 2361 struct spdk_bs_cpl cpl; 2362 spdk_bs_sequence_t *seq; 2363 struct spdk_bs_load_ctx *ctx; 2364 2365 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blobstore\n"); 2366 2367 if (!TAILQ_EMPTY(&bs->blobs)) { 2368 SPDK_ERRLOG("Blobstore still has open blobs\n"); 2369 cb_fn(cb_arg, -EBUSY); 2370 return; 2371 } 2372 2373 ctx = calloc(1, sizeof(*ctx)); 2374 if (!ctx) { 2375 cb_fn(cb_arg, -ENOMEM); 2376 return; 2377 } 2378 2379 ctx->bs = bs; 2380 2381 ctx->super = spdk_dma_zmalloc(sizeof(*ctx->super), 0x1000, NULL); 2382 if (!ctx->super) { 2383 free(ctx); 2384 cb_fn(cb_arg, -ENOMEM); 2385 return; 2386 } 2387 2388 cpl.type = SPDK_BS_CPL_TYPE_BS_BASIC; 2389 cpl.u.bs_basic.cb_fn = cb_fn; 2390 cpl.u.bs_basic.cb_arg = cb_arg; 2391 2392 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2393 if (!seq) { 2394 spdk_dma_free(ctx->super); 2395 free(ctx); 2396 cb_fn(cb_arg, -ENOMEM); 2397 return; 2398 } 2399 2400 /* Read super block */ 2401 spdk_bs_sequence_read(seq, ctx->super, _spdk_bs_page_to_lba(bs, 0), 2402 _spdk_bs_byte_to_lba(bs, sizeof(*ctx->super)), 2403 _spdk_bs_unload_read_super_cpl, ctx); 2404 } 2405 2406 /* END spdk_bs_unload */ 2407 2408 void 2409 spdk_bs_set_super(struct spdk_blob_store *bs, spdk_blob_id blobid, 2410 spdk_bs_op_complete cb_fn, void *cb_arg) 2411 { 2412 bs->super_blob = blobid; 2413 cb_fn(cb_arg, 0); 2414 } 2415 2416 void 2417 spdk_bs_get_super(struct spdk_blob_store *bs, 2418 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2419 { 2420 if (bs->super_blob == SPDK_BLOBID_INVALID) { 2421 cb_fn(cb_arg, SPDK_BLOBID_INVALID, -ENOENT); 2422 } else { 2423 cb_fn(cb_arg, bs->super_blob, 0); 2424 } 2425 } 2426 2427 uint64_t 2428 spdk_bs_get_cluster_size(struct spdk_blob_store *bs) 2429 { 2430 return bs->cluster_sz; 2431 } 2432 2433 uint64_t 2434 spdk_bs_get_page_size(struct spdk_blob_store *bs) 2435 { 2436 return SPDK_BS_PAGE_SIZE; 2437 } 2438 2439 uint64_t 2440 spdk_bs_free_cluster_count(struct spdk_blob_store *bs) 2441 { 2442 return bs->num_free_clusters; 2443 } 2444 2445 uint64_t 2446 spdk_bs_total_data_cluster_count(struct spdk_blob_store *bs) 2447 { 2448 return bs->total_data_clusters; 2449 } 2450 2451 int spdk_bs_register_md_thread(struct spdk_blob_store *bs) 2452 { 2453 bs->md_target.md_channel = spdk_get_io_channel(&bs->md_target); 2454 if (!bs->md_target.md_channel) { 2455 SPDK_ERRLOG("Failed to get IO channel.\n"); 2456 return -1; 2457 } 2458 2459 return 0; 2460 } 2461 2462 int spdk_bs_unregister_md_thread(struct spdk_blob_store *bs) 2463 { 2464 spdk_put_io_channel(bs->md_target.md_channel); 2465 2466 return 0; 2467 } 2468 2469 spdk_blob_id spdk_blob_get_id(struct spdk_blob *blob) 2470 { 2471 assert(blob != NULL); 2472 2473 return blob->id; 2474 } 2475 2476 uint64_t spdk_blob_get_num_pages(struct spdk_blob *blob) 2477 { 2478 assert(blob != NULL); 2479 2480 return _spdk_bs_cluster_to_page(blob->bs, blob->active.num_clusters); 2481 } 2482 2483 uint64_t spdk_blob_get_num_clusters(struct spdk_blob *blob) 2484 { 2485 assert(blob != NULL); 2486 2487 return blob->active.num_clusters; 2488 } 2489 2490 /* START spdk_bs_md_create_blob */ 2491 2492 static void 2493 _spdk_bs_md_create_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2494 { 2495 struct spdk_blob *blob = cb_arg; 2496 2497 _spdk_blob_free(blob); 2498 2499 spdk_bs_sequence_finish(seq, bserrno); 2500 } 2501 2502 void spdk_bs_md_create_blob(struct spdk_blob_store *bs, 2503 spdk_blob_op_with_id_complete cb_fn, void *cb_arg) 2504 { 2505 struct spdk_blob *blob; 2506 uint32_t page_idx; 2507 struct spdk_bs_cpl cpl; 2508 spdk_bs_sequence_t *seq; 2509 spdk_blob_id id; 2510 2511 page_idx = spdk_bit_array_find_first_clear(bs->used_md_pages, 0); 2512 if (page_idx >= spdk_bit_array_capacity(bs->used_md_pages)) { 2513 cb_fn(cb_arg, 0, -ENOMEM); 2514 return; 2515 } 2516 spdk_bit_array_set(bs->used_md_pages, page_idx); 2517 2518 id = _spdk_bs_page_to_blobid(page_idx); 2519 2520 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Creating blob with id %lu at page %u\n", id, page_idx); 2521 2522 blob = _spdk_blob_alloc(bs, id); 2523 if (!blob) { 2524 cb_fn(cb_arg, 0, -ENOMEM); 2525 return; 2526 } 2527 2528 cpl.type = SPDK_BS_CPL_TYPE_BLOBID; 2529 cpl.u.blobid.cb_fn = cb_fn; 2530 cpl.u.blobid.cb_arg = cb_arg; 2531 cpl.u.blobid.blobid = blob->id; 2532 2533 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2534 if (!seq) { 2535 _spdk_blob_free(blob); 2536 cb_fn(cb_arg, 0, -ENOMEM); 2537 return; 2538 } 2539 2540 _spdk_blob_persist(seq, blob, _spdk_bs_md_create_blob_cpl, blob); 2541 } 2542 2543 /* END spdk_bs_md_create_blob */ 2544 2545 /* START spdk_bs_md_resize_blob */ 2546 int 2547 spdk_bs_md_resize_blob(struct spdk_blob *blob, uint64_t sz) 2548 { 2549 int rc; 2550 2551 assert(blob != NULL); 2552 2553 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Resizing blob %lu to %lu clusters\n", blob->id, sz); 2554 2555 if (sz == blob->active.num_clusters) { 2556 return 0; 2557 } 2558 2559 rc = _spdk_resize_blob(blob, sz); 2560 if (rc < 0) { 2561 return rc; 2562 } 2563 2564 return 0; 2565 } 2566 2567 /* END spdk_bs_md_resize_blob */ 2568 2569 2570 /* START spdk_bs_md_delete_blob */ 2571 2572 static void 2573 _spdk_bs_md_delete_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2574 { 2575 struct spdk_blob *blob = cb_arg; 2576 2577 _spdk_blob_free(blob); 2578 2579 spdk_bs_sequence_finish(seq, bserrno); 2580 } 2581 2582 static void 2583 _spdk_bs_md_delete_open_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2584 { 2585 struct spdk_blob *blob = cb_arg; 2586 2587 /* If the blob have crc error, we just return NULL. */ 2588 if (blob == NULL) { 2589 spdk_bs_sequence_finish(seq, bserrno); 2590 return; 2591 } 2592 blob->state = SPDK_BLOB_STATE_DIRTY; 2593 blob->active.num_pages = 0; 2594 _spdk_resize_blob(blob, 0); 2595 2596 _spdk_blob_persist(seq, blob, _spdk_bs_md_delete_blob_cpl, blob); 2597 } 2598 2599 void 2600 spdk_bs_md_delete_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2601 spdk_blob_op_complete cb_fn, void *cb_arg) 2602 { 2603 struct spdk_blob *blob; 2604 struct spdk_bs_cpl cpl; 2605 spdk_bs_sequence_t *seq; 2606 2607 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Deleting blob %lu\n", blobid); 2608 2609 blob = _spdk_blob_lookup(bs, blobid); 2610 if (blob) { 2611 assert(blob->open_ref > 0); 2612 cb_fn(cb_arg, -EINVAL); 2613 return; 2614 } 2615 2616 blob = _spdk_blob_alloc(bs, blobid); 2617 if (!blob) { 2618 cb_fn(cb_arg, -ENOMEM); 2619 return; 2620 } 2621 2622 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2623 cpl.u.blob_basic.cb_fn = cb_fn; 2624 cpl.u.blob_basic.cb_arg = cb_arg; 2625 2626 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2627 if (!seq) { 2628 _spdk_blob_free(blob); 2629 cb_fn(cb_arg, -ENOMEM); 2630 return; 2631 } 2632 2633 _spdk_blob_load(seq, blob, _spdk_bs_md_delete_open_cpl, blob); 2634 } 2635 2636 /* END spdk_bs_md_delete_blob */ 2637 2638 /* START spdk_bs_md_open_blob */ 2639 2640 static void 2641 _spdk_bs_md_open_blob_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2642 { 2643 struct spdk_blob *blob = cb_arg; 2644 2645 /* If the blob have crc error, we just return NULL. */ 2646 if (blob == NULL) { 2647 seq->cpl.u.blob_handle.blob = NULL; 2648 spdk_bs_sequence_finish(seq, bserrno); 2649 return; 2650 } 2651 2652 blob->open_ref++; 2653 2654 TAILQ_INSERT_HEAD(&blob->bs->blobs, blob, link); 2655 2656 spdk_bs_sequence_finish(seq, bserrno); 2657 } 2658 2659 void spdk_bs_md_open_blob(struct spdk_blob_store *bs, spdk_blob_id blobid, 2660 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2661 { 2662 struct spdk_blob *blob; 2663 struct spdk_bs_cpl cpl; 2664 spdk_bs_sequence_t *seq; 2665 uint32_t page_num; 2666 2667 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Opening blob %lu\n", blobid); 2668 2669 blob = _spdk_blob_lookup(bs, blobid); 2670 if (blob) { 2671 blob->open_ref++; 2672 cb_fn(cb_arg, blob, 0); 2673 return; 2674 } 2675 2676 page_num = _spdk_bs_blobid_to_page(blobid); 2677 if (spdk_bit_array_get(bs->used_md_pages, page_num) == false) { 2678 /* Invalid blobid */ 2679 cb_fn(cb_arg, NULL, -ENOENT); 2680 return; 2681 } 2682 2683 blob = _spdk_blob_alloc(bs, blobid); 2684 if (!blob) { 2685 cb_fn(cb_arg, NULL, -ENOMEM); 2686 return; 2687 } 2688 2689 cpl.type = SPDK_BS_CPL_TYPE_BLOB_HANDLE; 2690 cpl.u.blob_handle.cb_fn = cb_fn; 2691 cpl.u.blob_handle.cb_arg = cb_arg; 2692 cpl.u.blob_handle.blob = blob; 2693 2694 seq = spdk_bs_sequence_start(bs->md_target.md_channel, &cpl); 2695 if (!seq) { 2696 _spdk_blob_free(blob); 2697 cb_fn(cb_arg, NULL, -ENOMEM); 2698 return; 2699 } 2700 2701 _spdk_blob_load(seq, blob, _spdk_bs_md_open_blob_cpl, blob); 2702 } 2703 2704 /* START spdk_bs_md_sync_blob */ 2705 static void 2706 _spdk_blob_sync_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2707 { 2708 spdk_bs_sequence_finish(seq, bserrno); 2709 } 2710 2711 void spdk_bs_md_sync_blob(struct spdk_blob *blob, 2712 spdk_blob_op_complete cb_fn, void *cb_arg) 2713 { 2714 struct spdk_bs_cpl cpl; 2715 spdk_bs_sequence_t *seq; 2716 2717 assert(blob != NULL); 2718 2719 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Syncing blob %lu\n", blob->id); 2720 2721 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2722 blob->state != SPDK_BLOB_STATE_SYNCING); 2723 2724 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2725 cb_fn(cb_arg, 0); 2726 return; 2727 } 2728 2729 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2730 cpl.u.blob_basic.cb_fn = cb_fn; 2731 cpl.u.blob_basic.cb_arg = cb_arg; 2732 2733 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2734 if (!seq) { 2735 cb_fn(cb_arg, -ENOMEM); 2736 return; 2737 } 2738 2739 _spdk_blob_persist(seq, blob, _spdk_blob_sync_cpl, blob); 2740 } 2741 2742 /* END spdk_bs_md_sync_blob */ 2743 2744 /* START spdk_bs_md_close_blob */ 2745 2746 static void 2747 _spdk_blob_close_cpl(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno) 2748 { 2749 struct spdk_blob **blob = cb_arg; 2750 2751 if ((*blob)->open_ref == 0) { 2752 TAILQ_REMOVE(&(*blob)->bs->blobs, (*blob), link); 2753 _spdk_blob_free((*blob)); 2754 } 2755 2756 *blob = NULL; 2757 2758 spdk_bs_sequence_finish(seq, bserrno); 2759 } 2760 2761 void spdk_bs_md_close_blob(struct spdk_blob **b, 2762 spdk_blob_op_complete cb_fn, void *cb_arg) 2763 { 2764 struct spdk_bs_cpl cpl; 2765 struct spdk_blob *blob; 2766 spdk_bs_sequence_t *seq; 2767 2768 assert(b != NULL); 2769 blob = *b; 2770 assert(blob != NULL); 2771 2772 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "Closing blob %lu\n", blob->id); 2773 2774 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2775 blob->state != SPDK_BLOB_STATE_SYNCING); 2776 2777 if (blob->open_ref == 0) { 2778 cb_fn(cb_arg, -EBADF); 2779 return; 2780 } 2781 2782 blob->open_ref--; 2783 2784 cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC; 2785 cpl.u.blob_basic.cb_fn = cb_fn; 2786 cpl.u.blob_basic.cb_arg = cb_arg; 2787 2788 seq = spdk_bs_sequence_start(blob->bs->md_target.md_channel, &cpl); 2789 if (!seq) { 2790 cb_fn(cb_arg, -ENOMEM); 2791 return; 2792 } 2793 2794 if (blob->state == SPDK_BLOB_STATE_CLEAN) { 2795 _spdk_blob_close_cpl(seq, b, 0); 2796 return; 2797 } 2798 2799 /* Sync metadata */ 2800 _spdk_blob_persist(seq, blob, _spdk_blob_close_cpl, b); 2801 } 2802 2803 /* END spdk_bs_md_close_blob */ 2804 2805 struct spdk_io_channel *spdk_bs_alloc_io_channel(struct spdk_blob_store *bs) 2806 { 2807 return spdk_get_io_channel(&bs->io_target); 2808 } 2809 2810 void spdk_bs_free_io_channel(struct spdk_io_channel *channel) 2811 { 2812 spdk_put_io_channel(channel); 2813 } 2814 2815 void spdk_bs_io_flush_channel(struct spdk_io_channel *channel, 2816 spdk_blob_op_complete cb_fn, void *cb_arg) 2817 { 2818 /* Flush is synchronous right now */ 2819 cb_fn(cb_arg, 0); 2820 } 2821 2822 void spdk_bs_io_unmap_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2823 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2824 { 2825 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2826 SPDK_BLOB_UNMAP); 2827 } 2828 2829 void spdk_bs_io_write_zeroes_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2830 uint64_t offset, uint64_t length, spdk_blob_op_complete cb_fn, void *cb_arg) 2831 { 2832 _spdk_blob_request_submit_op(blob, channel, NULL, offset, length, cb_fn, cb_arg, 2833 SPDK_BLOB_WRITE_ZEROES); 2834 } 2835 2836 void spdk_bs_io_write_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2837 void *payload, uint64_t offset, uint64_t length, 2838 spdk_blob_op_complete cb_fn, void *cb_arg) 2839 { 2840 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2841 SPDK_BLOB_WRITE); 2842 } 2843 2844 void spdk_bs_io_read_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2845 void *payload, uint64_t offset, uint64_t length, 2846 spdk_blob_op_complete cb_fn, void *cb_arg) 2847 { 2848 _spdk_blob_request_submit_op(blob, channel, payload, offset, length, cb_fn, cb_arg, 2849 SPDK_BLOB_READ); 2850 } 2851 2852 void spdk_bs_io_writev_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2853 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2854 spdk_blob_op_complete cb_fn, void *cb_arg) 2855 { 2856 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, false); 2857 } 2858 2859 void spdk_bs_io_readv_blob(struct spdk_blob *blob, struct spdk_io_channel *channel, 2860 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 2861 spdk_blob_op_complete cb_fn, void *cb_arg) 2862 { 2863 _spdk_blob_request_submit_rw_iov(blob, channel, iov, iovcnt, offset, length, cb_fn, cb_arg, true); 2864 } 2865 2866 struct spdk_bs_iter_ctx { 2867 int64_t page_num; 2868 struct spdk_blob_store *bs; 2869 2870 spdk_blob_op_with_handle_complete cb_fn; 2871 void *cb_arg; 2872 }; 2873 2874 static void 2875 _spdk_bs_iter_cpl(void *cb_arg, struct spdk_blob *blob, int bserrno) 2876 { 2877 struct spdk_bs_iter_ctx *ctx = cb_arg; 2878 struct spdk_blob_store *bs = ctx->bs; 2879 spdk_blob_id id; 2880 2881 if (bserrno == 0) { 2882 ctx->cb_fn(ctx->cb_arg, blob, bserrno); 2883 free(ctx); 2884 return; 2885 } 2886 2887 ctx->page_num++; 2888 ctx->page_num = spdk_bit_array_find_first_set(bs->used_md_pages, ctx->page_num); 2889 if (ctx->page_num >= spdk_bit_array_capacity(bs->used_md_pages)) { 2890 ctx->cb_fn(ctx->cb_arg, NULL, -ENOENT); 2891 free(ctx); 2892 return; 2893 } 2894 2895 id = _spdk_bs_page_to_blobid(ctx->page_num); 2896 2897 blob = _spdk_blob_lookup(bs, id); 2898 if (blob) { 2899 blob->open_ref++; 2900 ctx->cb_fn(ctx->cb_arg, blob, 0); 2901 free(ctx); 2902 return; 2903 } 2904 2905 spdk_bs_md_open_blob(bs, id, _spdk_bs_iter_cpl, ctx); 2906 } 2907 2908 void 2909 spdk_bs_md_iter_first(struct spdk_blob_store *bs, 2910 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2911 { 2912 struct spdk_bs_iter_ctx *ctx; 2913 2914 ctx = calloc(1, sizeof(*ctx)); 2915 if (!ctx) { 2916 cb_fn(cb_arg, NULL, -ENOMEM); 2917 return; 2918 } 2919 2920 ctx->page_num = -1; 2921 ctx->bs = bs; 2922 ctx->cb_fn = cb_fn; 2923 ctx->cb_arg = cb_arg; 2924 2925 _spdk_bs_iter_cpl(ctx, NULL, -1); 2926 } 2927 2928 static void 2929 _spdk_bs_iter_close_cpl(void *cb_arg, int bserrno) 2930 { 2931 struct spdk_bs_iter_ctx *ctx = cb_arg; 2932 2933 _spdk_bs_iter_cpl(ctx, NULL, -1); 2934 } 2935 2936 void 2937 spdk_bs_md_iter_next(struct spdk_blob_store *bs, struct spdk_blob **b, 2938 spdk_blob_op_with_handle_complete cb_fn, void *cb_arg) 2939 { 2940 struct spdk_bs_iter_ctx *ctx; 2941 struct spdk_blob *blob; 2942 2943 assert(b != NULL); 2944 blob = *b; 2945 assert(blob != NULL); 2946 2947 ctx = calloc(1, sizeof(*ctx)); 2948 if (!ctx) { 2949 cb_fn(cb_arg, NULL, -ENOMEM); 2950 return; 2951 } 2952 2953 ctx->page_num = _spdk_bs_blobid_to_page(blob->id); 2954 ctx->bs = bs; 2955 ctx->cb_fn = cb_fn; 2956 ctx->cb_arg = cb_arg; 2957 2958 /* Close the existing blob */ 2959 spdk_bs_md_close_blob(b, _spdk_bs_iter_close_cpl, ctx); 2960 } 2961 2962 int 2963 spdk_blob_md_set_xattr(struct spdk_blob *blob, const char *name, const void *value, 2964 uint16_t value_len) 2965 { 2966 struct spdk_xattr *xattr; 2967 2968 assert(blob != NULL); 2969 2970 assert(blob->state != SPDK_BLOB_STATE_LOADING && 2971 blob->state != SPDK_BLOB_STATE_SYNCING); 2972 2973 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 2974 if (!strcmp(name, xattr->name)) { 2975 free(xattr->value); 2976 xattr->value_len = value_len; 2977 xattr->value = malloc(value_len); 2978 memcpy(xattr->value, value, value_len); 2979 2980 blob->state = SPDK_BLOB_STATE_DIRTY; 2981 2982 return 0; 2983 } 2984 } 2985 2986 xattr = calloc(1, sizeof(*xattr)); 2987 if (!xattr) { 2988 return -1; 2989 } 2990 xattr->name = strdup(name); 2991 xattr->value_len = value_len; 2992 xattr->value = malloc(value_len); 2993 memcpy(xattr->value, value, value_len); 2994 TAILQ_INSERT_TAIL(&blob->xattrs, xattr, link); 2995 2996 blob->state = SPDK_BLOB_STATE_DIRTY; 2997 2998 return 0; 2999 } 3000 3001 int 3002 spdk_blob_md_remove_xattr(struct spdk_blob *blob, const char *name) 3003 { 3004 struct spdk_xattr *xattr; 3005 3006 assert(blob != NULL); 3007 3008 assert(blob->state != SPDK_BLOB_STATE_LOADING && 3009 blob->state != SPDK_BLOB_STATE_SYNCING); 3010 3011 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3012 if (!strcmp(name, xattr->name)) { 3013 TAILQ_REMOVE(&blob->xattrs, xattr, link); 3014 free(xattr->value); 3015 free(xattr->name); 3016 free(xattr); 3017 3018 blob->state = SPDK_BLOB_STATE_DIRTY; 3019 3020 return 0; 3021 } 3022 } 3023 3024 return -ENOENT; 3025 } 3026 3027 int 3028 spdk_bs_md_get_xattr_value(struct spdk_blob *blob, const char *name, 3029 const void **value, size_t *value_len) 3030 { 3031 struct spdk_xattr *xattr; 3032 3033 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3034 if (!strcmp(name, xattr->name)) { 3035 *value = xattr->value; 3036 *value_len = xattr->value_len; 3037 return 0; 3038 } 3039 } 3040 3041 return -ENOENT; 3042 } 3043 3044 struct spdk_xattr_names { 3045 uint32_t count; 3046 const char *names[0]; 3047 }; 3048 3049 int 3050 spdk_bs_md_get_xattr_names(struct spdk_blob *blob, 3051 struct spdk_xattr_names **names) 3052 { 3053 struct spdk_xattr *xattr; 3054 int count = 0; 3055 3056 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3057 count++; 3058 } 3059 3060 *names = calloc(1, sizeof(struct spdk_xattr_names) + count * sizeof(char *)); 3061 if (*names == NULL) { 3062 return -ENOMEM; 3063 } 3064 3065 TAILQ_FOREACH(xattr, &blob->xattrs, link) { 3066 (*names)->names[(*names)->count++] = xattr->name; 3067 } 3068 3069 return 0; 3070 } 3071 3072 uint32_t 3073 spdk_xattr_names_get_count(struct spdk_xattr_names *names) 3074 { 3075 assert(names != NULL); 3076 3077 return names->count; 3078 } 3079 3080 const char * 3081 spdk_xattr_names_get_name(struct spdk_xattr_names *names, uint32_t index) 3082 { 3083 if (index >= names->count) { 3084 return NULL; 3085 } 3086 3087 return names->names[index]; 3088 } 3089 3090 void 3091 spdk_xattr_names_free(struct spdk_xattr_names *names) 3092 { 3093 free(names); 3094 } 3095 3096 struct spdk_bs_type 3097 spdk_bs_get_bstype(struct spdk_blob_store *bs) 3098 { 3099 return bs->bstype; 3100 } 3101 3102 void 3103 spdk_bs_set_bstype(struct spdk_blob_store *bs, struct spdk_bs_type bstype) 3104 { 3105 memcpy(&bs->bstype, &bstype, sizeof(bstype)); 3106 } 3107 3108 SPDK_LOG_REGISTER_TRACE_FLAG("blob", SPDK_TRACE_BLOB); 3109