/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY		-1ULL

#define REDUCE_NUM_VOL_REQUESTS		256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* null terminator counts as one */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096
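
/*
 * Rough map of the backing device metadata implied by the constants above and by
 *  the write calls later in this file (illustrative summary, not a format spec):
 *
 *    offset 0:                              superblock (4096 bytes)
 *    offset REDUCE_BACKING_DEV_PATH_OFFSET: pm file path (REDUCE_PATH_MAX bytes)
 *
 *  With a 4 KiB backing_io_unit_size, for example, these two regions occupy backing
 *  io units 0 and 1, which is why _allocate_bit_arrays() pre-sets those two bits;
 *  the remaining io units are available for chunk data.
 */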

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char		path[REDUCE_PATH_MAX];
	void		*pm_buf;
	int		pm_is_pmem;
	uint64_t	size;
};

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for read/modify/write operations on
	 *  I/Os less than a full chunk size, and as the intermediate
	 *  buffer for compress/decompress operations.
	 */
	uint8_t				*buf;
	struct iovec			*buf_iov;
	struct iovec			*iov;
	struct spdk_reduce_vol		*vol;
	int				reduce_errno;
	int				iovcnt;
	int				num_backing_ops;
	uint64_t			offset;
	uint64_t			length;
	uint64_t			chunk_map_index;
	uint64_t			*chunk;
	spdk_reduce_vol_op_complete	cb_fn;
	void				*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args	backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*reqbufspace;
	struct iovec				*buf_iov_mem;
};

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in worst case scenario where logical map is completely allocated
 *  and no data can be compressed.  We need extra chunks in this case to handle
 *  in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	return vol->pm_chunk_maps + (chunk_map_index * vol->backing_io_units_per_chunk);
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 *  values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
}
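
/*
 * Example pm file layout (illustrative only - the actual sizes come from the size
 *  helpers above and depend on the volume parameters):
 *
 *  With hypothetical parameters vol_size = 1 GiB, chunk_size = 16 KiB and
 *  backing_io_unit_size = 4 KiB:
 *    - superblock:  4096 bytes at offset 0
 *    - logical map: (1 GiB / 16 KiB) * 8 = 524288 bytes of 64-bit chunk map indices
 *    - chunk maps:  (65536 + REDUCE_NUM_EXTRA_CHUNKS) chunks * 4 io units * 8 bytes
 *                   = 2101248 bytes of 64-bit backing io unit indices
 *  for a total pm file size of 2629632 bytes (each region rounded up to a 64-byte
 *  multiple per REDUCE_PM_SIZE_ALIGNMENT).
 */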

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	vol->reqbufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
	if (vol->reqbufspace == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_dma_free(vol->reqbufspace);
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->reqbufspace);
		vol->request_mem = NULL;
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
		req->buf = vol->reqbufspace + i * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_dma_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		spdk_dma_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->reqbufspace);
		free(vol);
	}
}
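
/*
 * Initialization sequence (summary of spdk_reduce_vol_init() and the two completion
 *  callbacks below, for reference): the pm file is created and its metadata
 *  initialized first, then the pm file path is written to
 *  REDUCE_BACKING_DEV_PATH_OFFSET on the backing device, and only after that write
 *  completes is the superblock written to offset 0.  Finally the request pool is
 *  allocated and the volume handle is passed to the caller.  Writing the path before
 *  the superblock ensures a crash cannot produce a valid superblock that points at
 *  an unrecorded pm file.
 */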

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	spdk_bit_array_set(vol->allocated_backing_io_units, 0);
	spdk_bit_array_set(vol->allocated_backing_io_units, 1);

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_dev = backing_dev;

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block, to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}
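
/*
 * Load path (summary of the callbacks below, for reference): spdk_reduce_vol_load()
 *  reads the superblock and pm file path from the start of the backing device in a
 *  single readv.  _load_read_super_and_path_cpl() then validates the signature,
 *  re-derives the per-chunk geometry from the stored parameters, maps the existing
 *  pm file, and rebuilds the allocated-chunk-map and allocated-backing-io-unit bit
 *  arrays by walking the logical map and chunk maps.
 */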

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks;
	uint64_t *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		if (vol->pm_logical_map[i] == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		/* The logical map entry is the index of the chunk map in use for this
		 * logical chunk.
		 */
		spdk_bit_array_set(vol->allocated_chunk_maps, vol->pm_logical_map[i]);
		chunk = _reduce_vol_get_chunk_map(vol, vol->pm_logical_map[i]);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
};
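
/*
 * Destroy path (summary of the callbacks below, for reference): spdk_reduce_vol_destroy()
 *  first loads the volume so it can learn the pm file path, then overwrites the
 *  on-disk superblock with zeroes so the backing device is no longer recognized as a
 *  libreduce volume, unloads the in-memory volume, and finally unlinks the pm file.
 */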

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_dma_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_dma_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}
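
/*
 * Example (illustrative parameter values only): with logical_block_size = 512 and
 *  chunk_size = 16 KiB, logical_blocks_per_chunk is 32, so a request with offset = 30
 *  and length = 4 touches blocks 30-33 and spans the chunk 0 / chunk 1 boundary;
 *  spdk_reduce_vol_readv()/writev() reject such requests with -EINVAL.
 */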

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_INSERT_HEAD(&req->vol->free_requests, req, tailq);
}

static void
_write_complete_req(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t logical_map_index, old_chunk_map_index;
	uint64_t *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	logical_map_index = req->offset / vol->logical_blocks_per_chunk;

	old_chunk_map_index = vol->pm_logical_map[logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  Once the
	 *  logical map is updated below, it no longer references the old chunk map,
	 *  so that chunk map is effectively invalid regardless of its contents.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);

	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}
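
/*
 * Note on _issue_backing_ops() below: a chunk's backing io units are allocated one at
 *  a time from the allocated_backing_io_units bit array, so they are not necessarily
 *  contiguous on the backing device.  Each io unit is therefore issued as its own
 *  single-iovec readv/writev; the completion callback (next_fn) runs once per io unit
 *  and uses req->num_backing_ops to detect when the last one has finished.
 */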

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	uint32_t i;

	req->num_backing_ops = vol->backing_io_units_per_chunk;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->buf_iov[i].iov_base = req->buf + i * vol->params.backing_io_unit_size;
		req->buf_iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &req->buf_iov[i], 1,
						 req->chunk[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &req->buf_iov[i], 1,
						req->chunk[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);

	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->chunk[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_write_chunk(req, _write_complete_req);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk;

	chunk = req->offset / vol->logical_blocks_per_chunk;
	req->chunk_map_index = vol->pm_logical_map[chunk];
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}
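
/*
 * Public I/O entry points.  spdk_reduce_vol_readv() and spdk_reduce_vol_writev() take
 *  offset and length in logical blocks, require the request to fit within a single
 *  chunk, and require the iovec array's total size to equal
 *  length * logical_block_size; requests violating these constraints complete
 *  immediately with -EINVAL.
 */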

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 *  zeroes for this chunk - do the memset and immediately complete
		 *  the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk, chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] != REDUCE_EMPTY_MAP_ENTRY) {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
		return;
	}

	buf = req->buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->buf not specified by this write operation. */
	chunk_offset = offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < iovcnt; i++) {
		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
		buf += iov[i].iov_len;
	}
	chunk_offset += length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_write_chunk(req, _write_complete_req);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)