/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY		-1ULL

#define REDUCE_NUM_VOL_REQUESTS		256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t signature[8];
	struct spdk_reduce_vol_params params;
	uint8_t reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* sizeof() of a string literal includes the null terminator, so subtract 1 to
 * compare against the 8-byte (non-null-terminated) signature field.
 */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 * compressed volume.
 */
struct spdk_reduce_pm_file {
	char path[REDUCE_PATH_MAX];
	void *pm_buf;
	int pm_is_pmem;
	uint64_t size;
};

struct spdk_reduce_vol_request {
	/**
	 * Scratch buffer used for read/modify/write operations on
	 * I/Os less than a full chunk size, and as the intermediate
	 * buffer for compress/decompress operations.
	 */
	uint8_t *buf;
	struct iovec *buf_iov;
	struct iovec *iov;
	struct spdk_reduce_vol *vol;
	int reduce_errno;
	int iovcnt;
	int num_backing_ops;
	uint64_t offset;
	uint64_t length;
	uint64_t chunk_map_index;
	uint64_t *chunk;
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request) tailq;
	struct spdk_reduce_vol_cb_args backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params params;
	uint32_t backing_io_units_per_chunk;
	uint32_t backing_lba_per_io_unit;
	uint32_t logical_blocks_per_chunk;
	struct spdk_reduce_pm_file pm_file;
	struct spdk_reduce_backing_dev *backing_dev;
	struct spdk_reduce_vol_superblock *backing_super;
	struct spdk_reduce_vol_superblock *pm_super;
	uint64_t *pm_logical_map;
	uint64_t *pm_chunk_maps;

	struct spdk_bit_array *allocated_chunk_maps;
	struct spdk_bit_array *allocated_backing_io_units;

	struct spdk_reduce_vol_request *request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request) free_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t *reqbufspace;
	struct iovec *buf_iov_mem;
};

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 * outstanding IO in the worst case scenario where the logical map is completely
 * allocated and no data can be compressed. We need extra chunks in this case to
 * handle in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	return vol->pm_chunk_maps + (chunk_map_index * vol->backing_io_units_per_chunk);
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 * values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
}
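/*
 * Worked example of the pm file layout computed by the sizing helpers above
 * and mapped by _initialize_vol_pm_pointers() (hypothetical parameters,
 * chosen only for illustration): with vol_size = 1 GiB, chunk_size = 16 KiB
 * and backing_io_unit_size = 4 KiB:
 *
 *   superblock:   4096 bytes at offset 0
 *   logical map:  (1 GiB / 16 KiB) = 65536 entries * 8 bytes = 512 KiB
 *   chunk maps:   (65536 + REDUCE_NUM_EXTRA_CHUNKS) chunks *
 *                 (16 KiB / 4 KiB) entries * 8 bytes = 2101248 bytes
 *
 * Each region is rounded up to REDUCE_PM_SIZE_ALIGNMENT, and
 * _get_pm_file_size() returns the sum of the three regions.
 */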
/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete cb_fn;
	void *cb_arg;
	struct iovec iov[LOAD_IOV_COUNT];
	void *path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	vol->reqbufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
	if (vol->reqbufspace == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_dma_free(vol->reqbufspace);
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->reqbufspace);
		vol->request_mem = NULL;
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
		req->buf = vol->reqbufspace + i * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_dma_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		spdk_dma_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->reqbufspace);
		free(vol);
	}
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written. We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}
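/*
 * For reference, the metadata layout on the backing device written by
 * spdk_reduce_vol_init() and read back below by spdk_reduce_vol_load():
 *
 *   byte 0:                              4 KiB superblock (signature + params)
 *   byte REDUCE_BACKING_DEV_PATH_OFFSET: REDUCE_PATH_MAX bytes holding the
 *                                        pm file path
 *   remaining blocks:                    backing io units for chunk data
 *
 * _allocate_bit_arrays() pre-marks the io units covering this metadata region
 * as allocated so the write path never hands them out for chunk data.
 */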
static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, chunk_map_index;
	uint64_t *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		chunk_map_index = vol->pm_logical_map[i];
		if (chunk_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		/* The logical map stores the chunk map index for this logical chunk -
		 * mark that chunk map and its backing io units as allocated.
		 */
		spdk_bit_array_set(vol->allocated_chunk_maps, chunk_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_superblock *super;
	struct iovec iov;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	int reduce_errno;
	char pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_dma_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_dma_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_INSERT_HEAD(&req->vol->free_requests, req, tailq);
}

static void
_write_complete_req(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t logical_map_index, old_chunk_map_index;
	uint64_t *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	logical_map_index = req->offset / vol->logical_blocks_per_chunk;

	old_chunk_map_index = vol->pm_logical_map[logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here. The old chunk map
	 * becomes invalid after we update the logical map, since the logical map will no
	 * longer reference it.
	 */

	/* Persist the new chunk map. This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);

	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	uint32_t i;

	req->num_backing_ops = vol->backing_io_units_per_chunk;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->buf_iov[i].iov_base = req->buf + i * vol->params.backing_io_unit_size;
		req->buf_iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &req->buf_iov[i], 1,
						 req->chunk[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &req->buf_iov[i], 1,
						req->chunk[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);

	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->chunk[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_write_chunk(req, _write_complete_req);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk;

	chunk = req->offset / vol->logical_blocks_per_chunk;
	req->chunk_map_index = vol->pm_logical_map[chunk];
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated. So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk, chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] != REDUCE_EMPTY_MAP_ENTRY) {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
		return;
	}

	buf = req->buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->buf not specified by this write operation. */
	chunk_offset = offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < iovcnt; i++) {
		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
		buf += iov[i].iov_len;
	}
	chunk_offset += length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_write_chunk(req, _write_complete_req);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
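/*
 * Illustrative usage sketch (not compiled into the library): shows the
 * intended calling sequence against the API implemented above, assuming the
 * caller has already constructed a struct spdk_reduce_backing_dev with its
 * readv/writev/unmap callbacks wired to real storage. The names
 * my_backing_dev, my_init_done and my_example are hypothetical and not
 * defined anywhere in SPDK.
 */
#if 0
static void
my_init_done(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	if (reduce_errno != 0) {
		SPDK_ERRLOG("vol init failed: %d\n", reduce_errno);
		return;
	}
	/* vol can now be used with spdk_reduce_vol_readv()/spdk_reduce_vol_writev()
	 * and eventually released with spdk_reduce_vol_unload().
	 */
}

static void
my_example(struct spdk_reduce_backing_dev *my_backing_dev)
{
	struct spdk_reduce_vol_params params;

	memset(&params, 0, sizeof(params));
	/* vol_size stays 0 - it is derived from the backing device size, and a
	 * zeroed uuid is generated automatically by spdk_reduce_vol_init().
	 */
	params.logical_block_size = 512;
	params.backing_io_unit_size = 4096;
	params.chunk_size = 16 * 1024;

	spdk_reduce_vol_init(&params, my_backing_dev, "/mnt/pmem", my_init_done, NULL);
}
#endif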