/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY	-1ULL

#define REDUCE_NUM_VOL_REQUESTS	256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* The null terminator counts as one byte of sizeof(), so subtract it - the
 * on-disk signature is not null-terminated.
 */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
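/*
 * A quick layout check (derived from the asserts above): the 8-byte signature
 * and the 4048-byte reserved region leave 4096 - 8 - 4048 = 40 bytes for
 * spdk_reduce_vol_params.  Any growth of the params structure must be matched
 * by shrinking reserved[] to keep the on-disk superblock at exactly 4096 bytes.
 */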
#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char			path[REDUCE_PATH_MAX];
	void			*pm_buf;
	int			pm_is_pmem;
	uint64_t		size;
};

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for read/modify/write operations on
	 *  I/Os less than a full chunk size, and as the intermediate
	 *  buffer for compress/decompress operations.
	 */
	uint8_t					*buf;
	struct iovec				*buf_iov;
	struct iovec				*iov;
	struct spdk_reduce_vol			*vol;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint64_t				offset;
	uint64_t				length;
	uint64_t				chunk_map_index;
	uint64_t				*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*reqbufspace;
	struct iovec				*buf_iov_mem;
};

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in worst case scenario where logical map is completely allocated
 *  and no data can be compressed.  We need extra chunks in this case to handle
 *  in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}
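/*
 * Worked example of the sizing math (hypothetical geometry): a 1 GiB volume
 * with 16 KiB chunks has 65536 chunks, so the logical map needs
 * 65536 * sizeof(uint64_t) = 512 KiB, already a multiple of the 64-byte
 * cacheline alignment and returned unchanged; only sizes that are not a
 * multiple of REDUCE_PM_SIZE_ALIGNMENT get rounded up.
 */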
static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	return vol->pm_chunk_maps + (chunk_map_index * vol->backing_io_units_per_chunk);
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 *  values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
}
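/*
 * Resulting pm file layout (region sizes per the helpers above):
 *
 *	+--------------------+  offset 0
 *	| superblock (4 KiB) |
 *	+--------------------+  offset 4096
 *	| logical map        |  one uint64_t per chunk of the volume
 *	+--------------------+
 *	| chunk maps         |  backing_io_units_per_chunk uint64_t entries
 *	|                    |  per chunk, including REDUCE_NUM_EXTRA_CHUNKS
 *	+--------------------+
 */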
/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	vol->reqbufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
	if (vol->reqbufspace == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_dma_free(vol->reqbufspace);
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->reqbufspace);
		vol->request_mem = NULL;
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
		req->buf = vol->reqbufspace + i * vol->params.chunk_size;
	}

	return 0;
}
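/*
 * Worked example of the request pool footprint (hypothetical chunk size):
 * with REDUCE_NUM_VOL_REQUESTS = 256 and 16 KiB chunks, reqbufspace is a
 * single 256 * 16 KiB = 4 MiB DMA-able allocation carved into one chunk-sized
 * scratch buffer per request, and buf_iov_mem provides
 * backing_io_units_per_chunk iovecs per request so each in-flight chunk can
 * be addressed one backing io unit at a time.
 */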
static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_dma_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		/* Cleanup may run before the pm file was mapped - only unmap
		 * if the mapping actually exists.
		 */
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}
		spdk_dma_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->reqbufspace);
		free(vol);
	}
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	spdk_bit_array_set(vol->allocated_backing_io_units, 0);
	spdk_bit_array_set(vol->allocated_backing_io_units, 1);

	return 0;
}
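/*
 * Sketch of a typical init call (caller-side, illustrative only - the backing
 * device, callback, and context names below are hypothetical, not part of
 * this library).  The caller supplies a backing_dev with blocklen/blockcnt
 * set and the readv/writev/unmap/close callbacks filled in, and must leave
 * vol_size 0 so it is computed from the backing device:
 *
 *	struct spdk_reduce_vol_params params = {};
 *
 *	params.logical_block_size = 512;
 *	params.backing_io_unit_size = 4096;
 *	params.chunk_size = 16 * 1024;
 *	spdk_reduce_vol_init(&params, &my_backing_dev, "/mnt/pmem",
 *			     my_init_done_cb, my_cb_ctx);
 *
 * The volume handle is delivered to my_init_done_cb; on failure the handle
 * is NULL and the status is a negative errno.
 */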
void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (dir_len > 0 && pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->close == NULL || backing_dev->readv == NULL ||
	    backing_dev->writev == NULL || backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%zu)\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_dev = backing_dev;

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}
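/*
 * On-disk layout of the backing device after a successful init:
 *
 *	+----------------------+  offset 0
 *	| superblock (4 KiB)   |  signature + volume params
 *	+----------------------+  offset REDUCE_BACKING_DEV_PATH_OFFSET (4096)
 *	| pm file path (4 KiB) |  REDUCE_PATH_MAX bytes
 *	+----------------------+  offset 8192
 *	| chunk data           |  allocated in backing_io_unit_size units
 *	+----------------------+
 *
 * This is why _allocate_bit_arrays marks backing io units 0 and 1 as
 * allocated - with a 4 KiB backing io unit size they hold the superblock and
 * the path.  (The code implicitly assumes this 8 KiB of metadata is covered
 * by the first two io units.)
 */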
static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%zu)\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);
	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->close == NULL || backing_dev->readv == NULL ||
	    backing_dev->writev == NULL || backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}
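/*
 * Illustrative load call (my_backing_dev, my_load_done_cb and my_cb_ctx are
 * hypothetical caller-side names):
 *
 *	spdk_reduce_vol_load(&my_backing_dev, my_load_done_cb, my_cb_ctx);
 *
 * Load reads the superblock and pm file path from the first 8 KiB of the
 * backing device, validates the signature, then re-maps the pm file recorded
 * at init time.  A volume obtained from either init or load is released with
 * spdk_reduce_vol_unload() below, which closes the backing device and unmaps
 * the pm file.
 */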
void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	vol->backing_dev->close(vol->backing_dev);

	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}
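/*
 * Example: with logical_blocks_per_chunk = 32, offset = 30 and length = 4
 * touches logical blocks 30..33, i.e. chunks 0 and 1, so it spans a boundary
 * and is rejected by the I/O entry points; offset = 32 with length = 4 stays
 * entirely within chunk 1.  All I/O below is therefore single-chunk.
 */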
typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_INSERT_HEAD(&req->vol->free_requests, req, tailq);
}

static void
_write_complete_req(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t logical_map_index, old_chunk_map_index;
	uint64_t *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	logical_map_index = req->offset / vol->logical_blocks_per_chunk;

	old_chunk_map_index = vol->pm_logical_map[logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old
	 *  chunk map becomes invalid once we update the logical map, since the logical
	 *  map will no longer have a reference to it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);

	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}
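/*
 * Rationale for the persist ordering above: if the system crashes after the
 * new chunk map is persisted but before the logical map update is persisted,
 * the logical map still references the old chunk map, so a subsequent load
 * simply presents the pre-write data and the half-installed chunk map is
 * unreachable.  Updating the logical map first could instead expose a chunk
 * map whose contents were never persisted.
 */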
static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	uint32_t i;

	req->num_backing_ops = vol->backing_io_units_per_chunk;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->buf_iov[i].iov_base = req->buf + i * vol->params.backing_io_unit_size;
		req->buf_iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &req->buf_iov[i], 1,
						 req->chunk[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &req->buf_iov[i], 1,
						req->chunk[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}
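/*
 * Example fan-out (hypothetical geometry): with a 16 KiB chunk and 4 KiB
 * backing io units, each chunk operation above becomes 4 single-iov
 * readv/writev calls, one per io unit, and next_fn runs only after all
 * num_backing_ops completions arrive (see the --num_backing_ops accounting
 * in the completion handlers).
 */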
static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to the number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);

	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->chunk[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_write_chunk(req, _write_complete_req);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk;

	chunk = req->offset / vol->logical_blocks_per_chunk;
	req->chunk_map_index = vol->pm_logical_map[chunk];
	/* Callers only dispatch reads for chunks known to be allocated. */
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}
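/*
 * Sketch of the I/O entry points defined below (buffer/callback names are
 * hypothetical).  offset and length are in logical blocks, the iovecs must
 * sum to exactly length * logical_block_size bytes, and a request may not
 * cross a chunk boundary.  Assuming a 512-byte logical block size:
 *
 *	struct iovec iov = { .iov_base = my_buf, .iov_len = 8 * 512 };
 *
 *	spdk_reduce_vol_writev(vol, &iov, 1, 0, 8, my_io_done_cb, my_cb_ctx);
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, my_io_done_cb, my_cb_ctx);
 */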
void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk, chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] != REDUCE_EMPTY_MAP_ENTRY) {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
		return;
	}

	buf = req->buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->buf not specified by this write operation. */
	chunk_offset = offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < iovcnt; i++) {
		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
		buf += iov[i].iov_len;
	}
	chunk_offset += length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_write_chunk(req, _write_complete_req);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)