/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY		-1ULL

#define REDUCE_NUM_VOL_REQUESTS		256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* null terminator counts one */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char		path[REDUCE_PATH_MAX];
	void		*pm_buf;
	int		pm_is_pmem;
	uint64_t	size;
};

#define REDUCE_IO_READV		1
#define REDUCE_IO_WRITEV	2

struct spdk_reduce_chunk_map {
	uint32_t	compressed_size;
	uint32_t	reserved;
	uint64_t	io_unit_index[0];
};

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for uncompressed chunk.
	 *  This is used for:
	 *   1) source buffer for compression operations
	 *   2) destination buffer for decompression operations
	 *   3) data buffer when writing uncompressed chunk to disk
	 *   4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t					*decomp_buf;
	struct iovec				*decomp_buf_iov;
	/**
	 *  Scratch buffer used for compressed chunk.  This is used for:
	 *   1) destination buffer for compression operations
	 *   2) source buffer for decompression operations
	 *   3) data buffer when writing compressed chunk to disk
	 *   4) data buffer when reading compressed chunk from disk
	 */
	uint8_t					*comp_buf;
	struct iovec				*comp_buf_iov;
	struct iovec				*iov;
	struct spdk_reduce_vol			*vol;
	int					type;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint32_t				num_io_units;
	bool					chunk_is_compressed;
	uint64_t				offset;
	uint64_t				logical_map_index;
	uint64_t				length;
	uint64_t				chunk_map_index;
	struct spdk_reduce_chunk_map		*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*buf_mem;
	struct iovec				*buf_iov_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in worst case scenario where logical map is completely allocated
 *  and no data can be compressed.  We need extra chunks in this case to handle
 *  in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}
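
/*
 * Worked example (hypothetical numbers, for illustration only): a 1 GiB volume
 * with 16 KiB chunks has 65536 logical map entries.  At 8 bytes per entry the
 * logical map occupies 512 KiB, which is already a multiple of the 64-byte
 * REDUCE_PM_SIZE_ALIGNMENT, so no extra padding is added in that case.
 */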

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(struct spdk_reduce_vol *vol)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * vol->backing_io_units_per_chunk;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index * _reduce_vol_get_chunk_struct_size(vol);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 *  values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
}
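
/*
 * Layout of the persistent memory file, as established by the pointer setup above
 * (illustrative summary only - the authoritative math is in _get_pm_file_size()
 * and the helpers it calls):
 *
 *   [ superblock (4 KiB) ]
 *   [ logical map: one uint64_t chunk map index per logical chunk ]
 *   [ chunk maps: one entry per allocatable chunk, including the
 *     REDUCE_NUM_EXTRA_CHUNKS spare chunks ]
 *
 * For example, a hypothetical 1 GiB volume with 16 KiB chunks has 65536 logical
 * map entries (512 KiB) followed by 65664 chunk maps.
 */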

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	/* Allocate 2x since we need buffers for both read/write and compress/decompress
	 *  intermediate buffers.
	 */
	vol->buf_mem = spdk_dma_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_dma_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 *  buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_dma_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		spdk_dma_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->buf_mem);
		free(vol);
	}
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}
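
/*
 * Initialization performs its backing-device metadata writes in a fixed order
 * (summarized here for reference; see spdk_reduce_vol_init() below): the pm
 * file path is written at REDUCE_BACKING_DEV_PATH_OFFSET first, and only after
 * that write completes does _init_write_path_cpl() issue the superblock write
 * at offset 0.  This ordering ensures a crash cannot leave a superblock on disk
 * that refers to a pm file path which was never persisted.
 */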

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
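
	/*
	 * On-disk metadata layout of the backing device (sketch for reference,
	 * derived from the offsets used in this file): the 4 KiB superblock lives
	 * at offset 0, the pm file path occupies REDUCE_PATH_MAX bytes starting at
	 * REDUCE_BACKING_DEV_PATH_OFFSET (4 KiB), and _allocate_bit_arrays() marks
	 * the backing io units covering this region as allocated so user data is
	 * never placed on top of it.
	 */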
	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}

		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}
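
/*
 * Usage sketch (hypothetical caller, for illustration only - not part of the
 * library): loading is asynchronous, so the volume handle is only valid inside
 * the completion callback.
 *
 *	static void
 *	my_load_done(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
 *	{
 *		if (reduce_errno != 0) {
 *			// handle error - vol is NULL in this case
 *			return;
 *		}
 *		// vol may now be used with spdk_reduce_vol_readv()/writev()
 *	}
 *
 *	spdk_reduce_vol_load(my_backing_dev, my_load_done, my_ctx);
 */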

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_dma_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_dma_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}
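
/*
 * Worked example (hypothetical numbers): with 16 KiB chunks and a 512-byte
 * logical block size there are 32 logical blocks per chunk.  A request at
 * offset 30 with length 4 touches blocks 30-33, so start_chunk is 0 and
 * end_chunk is 1 - the request spans a chunk boundary and is rejected by
 * spdk_reduce_vol_readv()/writev() with -EINVAL.
 */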

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;
	struct spdk_reduce_chunk_map *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid after we update the logical map, since the logical map will no
	 * longer have a reference to it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, _reduce_vol_get_chunk_struct_size(vol));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}
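
/*
 * Write path summary (for orientation; the authoritative flow is the callback
 * chain itself): _start_writev_request() builds the full uncompressed chunk in
 * decomp_buf (reading the old chunk first for partial overwrites), the chunk is
 * compressed, _reduce_vol_write_chunk() writes the result to newly allocated
 * backing io units, and _write_write_done() above then persists the new chunk
 * map, repoints the logical map entry, and frees the old chunk's io units.
 * Because data is never written in place, a crash mid-write leaves the old
 * chunk intact.
 */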

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}
	while (i < vol->backing_io_units_per_chunk) {
		req->chunk->io_unit_index[i++] = REDUCE_EMPTY_MAP_ENTRY;
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}
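
/*
 * Worked example for the chunk_is_compressed decision in _reduce_vol_write_chunk()
 * (hypothetical numbers): with a 16 KiB chunk and 4 KiB backing io units, a chunk
 * that compresses to 9000 bytes needs 3 io units and is stored compressed, while
 * one that only compresses to 13500 bytes still needs all 4 io units, so it is
 * stored uncompressed and its compressed_size is recorded as the full chunk size.
 */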

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   req->decomp_buf_iov, 1, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	/* Negative reduce_errno indicates failure for decompression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _write_decompress_done);
	} else {
		_write_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	/* Negative reduce_errno indicates failure for decompression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {
		_read_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != UINT32_MAX);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request *req;

	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
		if (logical_map_index == req->logical_map_index) {
			return true;
		}
	}

	return false;
}

static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}
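
/*
 * Concurrency note (summary of the logic in spdk_reduce_vol_readv()/writev()
 * below and _reduce_vol_complete_req() above): at most one request per logical
 * chunk executes at a time.  _check_overlap() detects a request already in
 * flight for the same chunk, later arrivals are parked on queued_requests, and
 * the completing request restarts the next queued request for that chunk.
 */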

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
		return;
	}

	buf = req->decomp_buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->decomp_buf not specified by this write operation. */
	chunk_offset = req->offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	chunk_offset += req->length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_compress_chunk(req, _write_compress_done);
}
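
/*
 * Worked example for the zero-fill path above (hypothetical numbers): with 32
 * logical blocks per chunk and a 512-byte logical block size, a 4-block write
 * covering blocks 10-13 of a previously unallocated chunk fills decomp_buf with
 * zeroes for blocks 0-9, copies the caller's 2 KiB of data for blocks 10-13,
 * and zeroes blocks 14-31 before the chunk is compressed and written out.
 */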

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)