/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY		-1ULL

#define REDUCE_NUM_VOL_REQUESTS		256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* null terminator counts as one */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
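
/*
 * Layout notes (derived from the definitions above):
 *  - The 4 KiB superblock is written at offset 0 of both the pm file and the
 *    backing device: an 8-byte signature, the vol params, and 4048 reserved
 *    bytes, which the static assert pins to exactly 4096 bytes total.
 *  - The pm file's path is written to the backing device at
 *    REDUCE_BACKING_DEV_PATH_OFFSET (4096), immediately after the superblock,
 *    so a volume can later be reloaded given only the backing device.
 */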
#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char			path[REDUCE_PATH_MAX];
	void			*pm_buf;
	int			pm_is_pmem;
	uint64_t		size;
};

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for read/modify/write operations on
	 *  I/Os less than a full chunk size, and as the intermediate
	 *  buffer for compress/decompress operations.
	 */
	uint8_t					*buf;
	struct iovec				*buf_iov;
	struct iovec				*iov;
	struct spdk_reduce_vol			*vol;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint64_t				offset;
	uint64_t				length;
	uint64_t				chunk_map_index;
	uint64_t				*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*reqbufspace;
	struct iovec				*buf_iov_mem;
};

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in worst case scenario where logical map is completely allocated
 *  and no data can be compressed.  We need extra chunks in this case to handle
 *  in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}
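
/*
 * Worked example of the sizing helpers above (illustrative values only): with
 * vol_size = 1 GiB, chunk_size = 16 KiB and backing_io_unit_size = 4 KiB, the
 * logical map holds 65536 8-byte chunk entries (512 KiB), the volume carries
 * 65536 + REDUCE_NUM_EXTRA_CHUNKS = 65664 chunk maps, and each chunk map holds
 * 4 io unit entries, so the chunk map region is 65664 * 4 * 8 = 2101248 bytes
 * (already a multiple of the 64-byte alignment).
 */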
static uint64_t *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	return vol->pm_chunk_maps + (chunk_map_index * vol->backing_io_units_per_chunk);
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 *  values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
}
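
/*
 * Resulting pm file layout (offsets relative to pm_buf):
 *
 *   [ superblock, 4 KiB ][ logical map, one uint64_t per chunk ][ chunk maps,
 *     backing_io_units_per_chunk uint64_t entries per chunk map ]
 *
 * Note that _get_pm_file_size() additionally rounds the logical map and chunk
 * map regions up to REDUCE_PM_SIZE_ALIGNMENT when sizing the file.
 */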
/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	vol->reqbufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
	if (vol->reqbufspace == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_dma_free(vol->reqbufspace);
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->reqbufspace);
		vol->request_mem = NULL;
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
		req->buf = vol->reqbufspace + i * vol->params.chunk_size;
	}

	return 0;
}
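
/*
 * After _allocate_vol_requests(), request i owns a chunk_size-sized slice of
 * reqbufspace (req->buf = reqbufspace + i * chunk_size) and
 * backing_io_units_per_chunk iovec entries out of buf_iov_mem, so no further
 * allocations are needed on the I/O path.
 */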
static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_dma_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		/* Only unmap if the pm file was actually mapped - cleanup may run on
		 * early error paths before pmem_map_file() has been called.
		 */
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}
		spdk_dma_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->reqbufspace);
		free(vol);
	}
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}
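
/*
 * For example (illustrative block sizes only): the superblock plus path region
 * is 4096 + 4096 = 8192 bytes, so _allocate_bit_arrays() marks the first
 * 8192 / blocklen entries of allocated_backing_io_units as in use - 2 entries
 * for a 4 KiB backing blocklen, 16 for a 512-byte blocklen - keeping data
 * chunks from ever being allocated over the on-device metadata.
 */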
void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}
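
/*
 * Minimal caller-side sketch of volume creation (illustrative values only;
 * assumes the caller has already filled out a struct spdk_reduce_backing_dev
 * named "backing_dev" with readv/writev/unmap callbacks and blocklen/blockcnt,
 * and that init_done_cb/cb_ctx are the caller's own completion and context):
 *
 *	struct spdk_reduce_vol_params params = {0};
 *
 *	params.logical_block_size = 512;
 *	params.backing_io_unit_size = 4096;
 *	params.chunk_size = 16 * 1024;
 *	params.vol_size = 0;	// libreduce derives it from the backing device
 *
 *	spdk_reduce_vol_init(&params, &backing_dev, "/mnt/pmem", init_done_cb, cb_ctx);
 *
 * init_done_cb() receives the new spdk_reduce_vol pointer, or NULL plus a
 * negative errno on failure.
 */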
static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	uint64_t *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}
void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}
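
/*
 * Note: unload only tears down the in-memory state (and unmaps the pm file);
 * the superblock on the backing device and the pm file itself are left intact
 * so the volume can be reloaded later.  Destroying the on-media metadata is
 * the job of spdk_reduce_vol_destroy() below.
 */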
struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_dma_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_dma_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}
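
/*
 * Destroy sequence, as implemented above: load the volume from the backing
 * device, overwrite its superblock with the zeroed 4 KiB buffer (so the
 * signature no longer matches SPDK_REDUCE_SIGNATURE and future loads will
 * fail), unload the in-memory state, then unlink the pm file.
 */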
static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_INSERT_HEAD(&req->vol->free_requests, req, tailq);
}

static void
_write_complete_req(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t logical_map_index, old_logical_map_index;
	uint64_t *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	logical_map_index = req->offset / vol->logical_blocks_per_chunk;

	old_logical_map_index = vol->pm_logical_map[logical_map_index];
	if (old_logical_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_logical_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_logical_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid once we update the logical map, since the logical map will no
	 * longer reference it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);

	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	uint32_t i;

	req->num_backing_ops = vol->backing_io_units_per_chunk;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->buf_iov[i].iov_base = req->buf + i * vol->params.backing_io_unit_size;
		req->buf_iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &req->buf_iov[i], 1,
						 req->chunk[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &req->buf_iov[i], 1,
						req->chunk[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);

	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->chunk[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}
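
/*
 * Write path, stitched together from the helpers above: data for the whole
 * chunk is staged in req->buf, a brand-new chunk map and fresh backing io
 * units are allocated (reduce never overwrites a chunk in place), the io units
 * are written out via _issue_backing_ops(), and only in _write_complete_req()
 * is the new chunk map persisted and the logical map flipped to point at it.
 * The old chunk's io units are released at that point, so a crash before the
 * logical map update leaves the old chunk fully intact.
 */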
static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_write_chunk(req, _write_complete_req);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t logical_map_index;

	logical_map_index = req->offset / vol->logical_blocks_per_chunk;
	req->chunk_map_index = vol->pm_logical_map[logical_map_index];
	/* Callers only read chunks that the logical map says are allocated. */
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}
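
/*
 * The I/O entry points below enforce two invariants checked by the helpers
 * above.  Example with logical_blocks_per_chunk = 32: a request at offset 30
 * with length 4 touches chunks 0 and 1 and is rejected with -EINVAL, and a
 * 4-block request must supply iovecs totaling exactly
 * 4 * logical_block_size bytes.
 */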
void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index, chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
		return;
	}

	buf = req->buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->buf not specified by this write operation. */
	chunk_offset = offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < iovcnt; i++) {
		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
		buf += iov[i].iov_len;
	}
	chunk_offset += length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_write_chunk(req, _write_complete_req);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
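
/*
 * Caller-side sketch of the I/O entry points above (illustrative only; "vol"
 * is a handle obtained from spdk_reduce_vol_init() or spdk_reduce_vol_load(),
 * write_done_cb/read_done_cb/cb_ctx are the caller's own completions, and
 * offset/length are in logical blocks within a single chunk):
 *
 *	struct iovec iov = { .iov_base = data, .iov_len = 8 * 512 };
 *
 *	spdk_reduce_vol_writev(vol, &iov, 1, 0, 8, write_done_cb, cb_ctx);
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, read_done_cb, cb_ctx);
 *
 * assuming a 512-byte logical_block_size, so the iovec length matches
 * length * logical_block_size as required by _iov_array_is_valid().
 */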