1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/reduce.h" 37 #include "spdk/env.h" 38 #include "spdk/string.h" 39 #include "spdk/bit_array.h" 40 #include "spdk_internal/log.h" 41 42 #include "libpmem.h" 43 44 /* Always round up the size of the PM region to the nearest cacheline. */ 45 #define REDUCE_PM_SIZE_ALIGNMENT 64 46 47 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 48 49 /* Offset into the backing device where the persistent memory file's path is stored. */ 50 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 51 52 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 53 54 #define REDUCE_NUM_VOL_REQUESTS 256 55 56 /* Structure written to offset 0 of both the pm file and the backing device. */ 57 struct spdk_reduce_vol_superblock { 58 uint8_t signature[8]; 59 struct spdk_reduce_vol_params params; 60 uint8_t reserved[4048]; 61 }; 62 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 63 64 #define REDUCE_PATH_MAX 4096 65 66 /** 67 * Describes a persistent memory file used to hold metadata associated with a 68 * compressed volume. 69 */ 70 struct spdk_reduce_pm_file { 71 char path[REDUCE_PATH_MAX]; 72 void *pm_buf; 73 int pm_is_pmem; 74 uint64_t size; 75 }; 76 77 struct spdk_reduce_vol_request { 78 uint8_t *buf; 79 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 80 }; 81 82 struct spdk_reduce_vol { 83 struct spdk_reduce_vol_params params; 84 uint32_t backing_io_units_per_chunk; 85 uint32_t logical_blocks_per_chunk; 86 struct spdk_reduce_pm_file pm_file; 87 struct spdk_reduce_backing_dev *backing_dev; 88 struct spdk_reduce_vol_superblock *backing_super; 89 struct spdk_reduce_vol_superblock *pm_super; 90 uint64_t *pm_logical_map; 91 uint64_t *pm_chunk_maps; 92 93 struct spdk_bit_array *allocated_chunk_maps; 94 struct spdk_bit_array *allocated_backing_io_units; 95 96 struct spdk_reduce_vol_request *request_mem; 97 TAILQ_HEAD(, spdk_reduce_vol_request) requests; 98 uint8_t *bufspace; 99 }; 100 101 /* 102 * Allocate extra metadata chunks and corresponding backing io units to account for 103 * outstanding IO in worst case scenario where logical map is completely allocated 104 * and no data can be compressed. We need extra chunks in this case to handle 105 * in-flight writes since reduce never writes data in place. 106 */ 107 #define REDUCE_NUM_EXTRA_CHUNKS 128 108 109 static void 110 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 111 { 112 if (vol->pm_file.pm_is_pmem) { 113 pmem_persist(addr, len); 114 } else { 115 pmem_msync(addr, len); 116 } 117 } 118 119 static inline uint64_t 120 divide_round_up(uint64_t num, uint64_t divisor) 121 { 122 return (num + divisor - 1) / divisor; 123 } 124 125 static uint64_t 126 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 127 { 128 uint64_t chunks_in_logical_map, logical_map_size; 129 130 chunks_in_logical_map = vol_size / chunk_size; 131 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 132 133 /* Round up to next cacheline. */ 134 return divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * REDUCE_PM_SIZE_ALIGNMENT; 135 } 136 137 static uint64_t 138 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size) 139 { 140 uint64_t num_chunks; 141 142 num_chunks = vol_size / chunk_size; 143 num_chunks += REDUCE_NUM_EXTRA_CHUNKS; 144 145 return num_chunks; 146 } 147 148 static uint64_t 149 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size) 150 { 151 uint64_t io_units_per_chunk, num_chunks, total_chunks_size; 152 153 num_chunks = _get_total_chunks(vol_size, chunk_size); 154 io_units_per_chunk = chunk_size / backing_io_unit_size; 155 total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t); 156 157 return divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * REDUCE_PM_SIZE_ALIGNMENT; 158 } 159 160 static int 161 _validate_vol_params(struct spdk_reduce_vol_params *params) 162 { 163 if (params->vol_size == 0 || params->chunk_size == 0 || 164 params->backing_io_unit_size == 0 || params->logical_block_size == 0) { 165 return -EINVAL; 166 } 167 168 /* Chunk size must be an even multiple of the backing io unit size. */ 169 if ((params->chunk_size % params->backing_io_unit_size) != 0) { 170 return -EINVAL; 171 } 172 173 /* Chunk size must be an even multiple of the logical block size. */ 174 if ((params->chunk_size % params->logical_block_size) != 0) { 175 return -1; 176 } 177 178 /* Volume size must be an even multiple of the chunk size. */ 179 if ((params->vol_size % params->chunk_size) != 0) { 180 return -EINVAL; 181 } 182 183 return 0; 184 } 185 186 int64_t 187 spdk_reduce_get_pm_file_size(struct spdk_reduce_vol_params *params) 188 { 189 uint64_t total_pm_size; 190 int rc; 191 192 rc = _validate_vol_params(params); 193 if (rc != 0) { 194 return rc; 195 } 196 197 total_pm_size = sizeof(struct spdk_reduce_vol_superblock); 198 total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size); 199 total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size, 200 params->backing_io_unit_size); 201 return total_pm_size; 202 } 203 204 int64_t 205 spdk_reduce_get_backing_device_size(struct spdk_reduce_vol_params *params) 206 { 207 uint64_t total_backing_size, num_chunks; 208 int rc; 209 210 rc = _validate_vol_params(params); 211 if (rc != 0) { 212 return rc; 213 } 214 215 num_chunks = _get_total_chunks(params->vol_size, params->chunk_size); 216 total_backing_size = num_chunks * params->chunk_size; 217 total_backing_size += sizeof(struct spdk_reduce_vol_superblock); 218 219 return total_backing_size; 220 } 221 222 const struct spdk_uuid * 223 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol) 224 { 225 return &vol->params.uuid; 226 } 227 228 static void 229 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol) 230 { 231 /* Superblock is at the beginning of the pm file. */ 232 vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; 233 234 /* Logical map immediately follows the super block. */ 235 vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); 236 237 /* Chunks maps follow the logical map. */ 238 vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size); 239 } 240 241 /* We need 2 iovs during load - one for the superblock, another for the path */ 242 #define LOAD_IOV_COUNT 2 243 244 struct reduce_init_load_ctx { 245 struct spdk_reduce_vol *vol; 246 struct spdk_reduce_vol_cb_args backing_cb_args; 247 spdk_reduce_vol_op_with_handle_complete cb_fn; 248 void *cb_arg; 249 struct iovec iov[LOAD_IOV_COUNT]; 250 void *path; 251 }; 252 253 static int 254 _allocate_vol_requests(struct spdk_reduce_vol *vol) 255 { 256 struct spdk_reduce_vol_request *req; 257 int i; 258 259 vol->bufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL); 260 if (vol->bufspace == NULL) { 261 return -ENOMEM; 262 } 263 264 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 265 if (vol->request_mem == NULL) { 266 free(vol->bufspace); 267 return -ENOMEM; 268 } 269 270 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 271 req = &vol->request_mem[i]; 272 TAILQ_INSERT_HEAD(&vol->requests, req, tailq); 273 req->buf = vol->bufspace + i * vol->params.chunk_size; 274 } 275 276 return 0; 277 } 278 279 static void 280 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 281 { 282 if (ctx != NULL) { 283 spdk_dma_free(ctx->path); 284 free(ctx); 285 } 286 287 if (vol != NULL) { 288 spdk_dma_free(vol->backing_super); 289 spdk_bit_array_free(&vol->allocated_chunk_maps); 290 spdk_bit_array_free(&vol->allocated_backing_io_units); 291 free(vol->request_mem); 292 spdk_dma_free(vol->bufspace); 293 free(vol); 294 } 295 } 296 297 static void 298 _init_write_super_cpl(void *cb_arg, int ziperrno) 299 { 300 struct reduce_init_load_ctx *init_ctx = cb_arg; 301 int rc; 302 303 rc = _allocate_vol_requests(init_ctx->vol); 304 if (rc != 0) { 305 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 306 _init_load_cleanup(init_ctx->vol, init_ctx); 307 return; 308 } 309 310 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, ziperrno); 311 /* Only clean up the ctx - the vol has been passed to the application 312 * for use now that initialization was successful. 313 */ 314 _init_load_cleanup(NULL, init_ctx); 315 } 316 317 static void 318 _init_write_path_cpl(void *cb_arg, int ziperrno) 319 { 320 struct reduce_init_load_ctx *init_ctx = cb_arg; 321 struct spdk_reduce_vol *vol = init_ctx->vol; 322 323 init_ctx->iov[0].iov_base = vol->backing_super; 324 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 325 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 326 init_ctx->backing_cb_args.cb_arg = init_ctx; 327 vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, 328 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen, 329 &init_ctx->backing_cb_args); 330 } 331 332 static int 333 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 334 { 335 uint64_t total_chunks, total_backing_io_units; 336 337 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 338 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 339 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 340 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 341 342 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 343 return -ENOMEM; 344 } 345 346 /* Set backing io unit bits associated with metadata. */ 347 spdk_bit_array_set(vol->allocated_backing_io_units, 0); 348 spdk_bit_array_set(vol->allocated_backing_io_units, 1); 349 350 return 0; 351 } 352 353 void 354 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, 355 struct spdk_reduce_backing_dev *backing_dev, 356 const char *pm_file_dir, 357 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 358 { 359 struct spdk_reduce_vol *vol; 360 struct reduce_init_load_ctx *init_ctx; 361 int64_t size, size_needed; 362 size_t mapped_len; 363 int dir_len, max_dir_len, rc; 364 365 /* We need to append a path separator and the UUID to the supplied 366 * path. 367 */ 368 max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1; 369 dir_len = strnlen(pm_file_dir, max_dir_len); 370 /* Strip trailing slash if the user provided one - we will add it back 371 * later when appending the filename. 372 */ 373 if (pm_file_dir[dir_len - 1] == '/') { 374 dir_len--; 375 } 376 if (dir_len == max_dir_len) { 377 SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir); 378 cb_fn(cb_arg, NULL, -EINVAL); 379 return; 380 } 381 382 rc = _validate_vol_params(params); 383 if (rc != 0) { 384 SPDK_ERRLOG("invalid vol params\n"); 385 cb_fn(cb_arg, NULL, rc); 386 return; 387 } 388 389 size_needed = spdk_reduce_get_backing_device_size(params); 390 size = backing_dev->blockcnt * backing_dev->blocklen; 391 if (size_needed > size) { 392 SPDK_ERRLOG("backing device size %" PRIi64 " but %" PRIi64 " needed\n", 393 size, size_needed); 394 cb_fn(cb_arg, NULL, -EINVAL); 395 return; 396 } 397 398 if (size_needed > size) { 399 SPDK_ERRLOG("pm file size %" PRIi64 " but %" PRIi64 " needed\n", 400 size, size_needed); 401 cb_fn(cb_arg, NULL, -EINVAL); 402 return; 403 } 404 405 if (backing_dev->close == NULL || backing_dev->readv == NULL || 406 backing_dev->writev == NULL || backing_dev->unmap == NULL) { 407 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 408 cb_fn(cb_arg, NULL, -EINVAL); 409 return; 410 } 411 412 vol = calloc(1, sizeof(*vol)); 413 if (vol == NULL) { 414 cb_fn(cb_arg, NULL, -ENOMEM); 415 return; 416 } 417 418 vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL); 419 if (vol->backing_super == NULL) { 420 cb_fn(cb_arg, NULL, -ENOMEM); 421 _init_load_cleanup(vol, NULL); 422 return; 423 } 424 425 init_ctx = calloc(1, sizeof(*init_ctx)); 426 if (init_ctx == NULL) { 427 cb_fn(cb_arg, NULL, -ENOMEM); 428 _init_load_cleanup(vol, NULL); 429 return; 430 } 431 432 init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL); 433 if (init_ctx->path == NULL) { 434 cb_fn(cb_arg, NULL, -ENOMEM); 435 _init_load_cleanup(vol, init_ctx); 436 return; 437 } 438 439 if (spdk_mem_all_zero(¶ms->uuid, sizeof(params->uuid))) { 440 spdk_uuid_generate(¶ms->uuid); 441 } 442 443 memcpy(vol->pm_file.path, pm_file_dir, dir_len); 444 vol->pm_file.path[dir_len] = '/'; 445 spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN, 446 ¶ms->uuid); 447 vol->pm_file.size = spdk_reduce_get_pm_file_size(params); 448 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size, 449 PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600, 450 &mapped_len, &vol->pm_file.pm_is_pmem); 451 if (vol->pm_file.pm_buf == NULL) { 452 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", 453 vol->pm_file.path, strerror(errno)); 454 cb_fn(cb_arg, NULL, -errno); 455 _init_load_cleanup(vol, init_ctx); 456 return; 457 } 458 459 if (vol->pm_file.size != mapped_len) { 460 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 461 vol->pm_file.size, mapped_len); 462 cb_fn(cb_arg, NULL, -ENOMEM); 463 _init_load_cleanup(vol, init_ctx); 464 return; 465 } 466 467 vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size; 468 vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size; 469 memcpy(&vol->params, params, sizeof(*params)); 470 471 rc = _allocate_bit_arrays(vol); 472 if (rc != 0) { 473 cb_fn(cb_arg, NULL, rc); 474 _init_load_cleanup(vol, init_ctx); 475 return; 476 } 477 478 vol->backing_dev = backing_dev; 479 480 memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE, 481 sizeof(vol->backing_super->signature)); 482 memcpy(&vol->backing_super->params, params, sizeof(*params)); 483 484 _initialize_vol_pm_pointers(vol); 485 486 memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super)); 487 /* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY. 488 * Note that this writes 0xFF to not just the logical map but the chunk maps as well. 489 */ 490 memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super)); 491 _reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size); 492 493 init_ctx->vol = vol; 494 init_ctx->cb_fn = cb_fn; 495 init_ctx->cb_arg = cb_arg; 496 497 memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX); 498 init_ctx->iov[0].iov_base = init_ctx->path; 499 init_ctx->iov[0].iov_len = REDUCE_PATH_MAX; 500 init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl; 501 init_ctx->backing_cb_args.cb_arg = init_ctx; 502 /* Write path to offset 4K on backing device - just after where the super 503 * block will be written. We wait until this is committed before writing the 504 * super block to guarantee we don't get the super block written without the 505 * the path if the system crashed in the middle of a write operation. 506 */ 507 vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, 508 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen, 509 REDUCE_PATH_MAX / vol->backing_dev->blocklen, 510 &init_ctx->backing_cb_args); 511 } 512 513 static void 514 _load_read_super_and_path_cpl(void *cb_arg, int ziperrno) 515 { 516 struct reduce_init_load_ctx *load_ctx = cb_arg; 517 struct spdk_reduce_vol *vol = load_ctx->vol; 518 int64_t size, size_needed; 519 size_t mapped_len; 520 int rc; 521 522 if (memcmp(vol->backing_super->signature, 523 SPDK_REDUCE_SIGNATURE, 524 sizeof(vol->backing_super->signature)) != 0) { 525 /* This backing device isn't a libreduce backing device. */ 526 rc = -EILSEQ; 527 goto error; 528 } 529 530 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 531 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 532 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 533 534 rc = _allocate_bit_arrays(vol); 535 if (rc != 0) { 536 goto error; 537 } 538 539 size_needed = spdk_reduce_get_backing_device_size(&vol->params); 540 size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 541 if (size_needed > size) { 542 SPDK_ERRLOG("backing device size %" PRIi64 " but %" PRIi64 " expected\n", 543 size, size_needed); 544 rc = -EILSEQ; 545 goto error; 546 } 547 548 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 549 vol->pm_file.size = spdk_reduce_get_pm_file_size(&vol->params); 550 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 551 &vol->pm_file.pm_is_pmem); 552 if (vol->pm_file.pm_buf == NULL) { 553 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 554 rc = -errno; 555 goto error; 556 } 557 558 if (vol->pm_file.size != mapped_len) { 559 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 560 vol->pm_file.size, mapped_len); 561 rc = -ENOMEM; 562 goto error; 563 } 564 565 rc = _allocate_vol_requests(vol); 566 if (rc != 0) { 567 goto error; 568 } 569 570 _initialize_vol_pm_pointers(vol); 571 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 572 /* Only clean up the ctx - the vol has been passed to the application 573 * for use now that volume load was successful. 574 */ 575 _init_load_cleanup(NULL, load_ctx); 576 return; 577 578 error: 579 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 580 _init_load_cleanup(vol, load_ctx); 581 } 582 583 void 584 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 585 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 586 { 587 struct spdk_reduce_vol *vol; 588 struct reduce_init_load_ctx *load_ctx; 589 590 if (backing_dev->close == NULL || backing_dev->readv == NULL || 591 backing_dev->writev == NULL || backing_dev->unmap == NULL) { 592 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 593 cb_fn(cb_arg, NULL, -EINVAL); 594 return; 595 } 596 597 vol = calloc(1, sizeof(*vol)); 598 if (vol == NULL) { 599 cb_fn(cb_arg, NULL, -ENOMEM); 600 return; 601 } 602 603 vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL); 604 if (vol->backing_super == NULL) { 605 _init_load_cleanup(vol, NULL); 606 cb_fn(cb_arg, NULL, -ENOMEM); 607 return; 608 } 609 610 vol->backing_dev = backing_dev; 611 612 load_ctx = calloc(1, sizeof(*load_ctx)); 613 if (load_ctx == NULL) { 614 _init_load_cleanup(vol, NULL); 615 cb_fn(cb_arg, NULL, -ENOMEM); 616 return; 617 } 618 619 load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL); 620 if (load_ctx->path == NULL) { 621 _init_load_cleanup(vol, load_ctx); 622 cb_fn(cb_arg, NULL, -ENOMEM); 623 return; 624 } 625 626 load_ctx->vol = vol; 627 load_ctx->cb_fn = cb_fn; 628 load_ctx->cb_arg = cb_arg; 629 630 load_ctx->iov[0].iov_base = vol->backing_super; 631 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 632 load_ctx->iov[1].iov_base = load_ctx->path; 633 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 634 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 635 load_ctx->backing_cb_args.cb_arg = load_ctx; 636 vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0, 637 (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 638 vol->backing_dev->blocklen, 639 &load_ctx->backing_cb_args); 640 } 641 642 void 643 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 644 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 645 { 646 if (vol == NULL) { 647 /* This indicates a programming error. */ 648 assert(false); 649 cb_fn(cb_arg, -EINVAL); 650 return; 651 } 652 653 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 654 655 vol->backing_dev->close(vol->backing_dev); 656 657 _init_load_cleanup(vol, NULL); 658 cb_fn(cb_arg, 0); 659 } 660 661 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE) 662