/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY		-1ULL

#define REDUCE_NUM_VOL_REQUESTS		256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t signature[8];
	struct spdk_reduce_vol_params params;
	uint8_t reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* sizeof() includes the null terminator, hence the - 1 */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char path[REDUCE_PATH_MAX];
	void *pm_buf;
	int pm_is_pmem;
	uint64_t size;
};

#define REDUCE_IO_READV		1
#define REDUCE_IO_WRITEV	2

struct spdk_reduce_chunk_map {
	uint32_t compressed_size;
	uint32_t reserved;
	uint64_t io_unit_index[0];
};
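
/*
 * Per-operation context for a single readv or writev.  REDUCE_NUM_VOL_REQUESTS
 * of these are pre-allocated per volume and move between the volume's
 * free_requests, executing_requests and queued_requests lists.
 */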
struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for uncompressed chunk.  This is used for:
	 *   1) source buffer for compression operations
	 *   2) destination buffer for decompression operations
	 *   3) data buffer when writing uncompressed chunk to disk
	 *   4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t *decomp_buf;
	struct iovec *decomp_buf_iov;
	/**
	 *  Scratch buffer used for compressed chunk.  This is used for:
	 *   1) destination buffer for compression operations
	 *   2) source buffer for decompression operations
	 *   3) data buffer when writing compressed chunk to disk
	 *   4) data buffer when reading compressed chunk from disk
	 */
	uint8_t *comp_buf;
	struct iovec *comp_buf_iov;
	struct iovec *iov;
	struct spdk_reduce_vol *vol;
	int type;
	int reduce_errno;
	int iovcnt;
	int num_backing_ops;
	uint32_t num_io_units;
	bool chunk_is_compressed;
	uint64_t offset;
	uint64_t logical_map_index;
	uint64_t length;
	uint64_t chunk_map_index;
	struct spdk_reduce_chunk_map *chunk;
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request) tailq;
	struct spdk_reduce_vol_cb_args backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params params;
	uint32_t backing_io_units_per_chunk;
	uint32_t backing_lba_per_io_unit;
	uint32_t logical_blocks_per_chunk;
	struct spdk_reduce_pm_file pm_file;
	struct spdk_reduce_backing_dev *backing_dev;
	struct spdk_reduce_vol_superblock *backing_super;
	struct spdk_reduce_vol_superblock *pm_super;
	uint64_t *pm_logical_map;
	uint64_t *pm_chunk_maps;

	struct spdk_bit_array *allocated_chunk_maps;
	struct spdk_bit_array *allocated_backing_io_units;

	struct spdk_reduce_vol_request *request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request) free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t *buf_mem;
	struct iovec *buf_iov_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in worst case scenario where logical map is completely allocated
 *  and no data can be compressed.  We need extra chunks in this case to handle
 *  in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}
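
/*
 * Helpers for sizing the regions of the persistent memory file.  Each region
 * size is rounded up to REDUCE_PM_SIZE_ALIGNMENT (one cacheline).
 */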
static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(struct spdk_reduce_vol *vol)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * vol->backing_io_units_per_chunk;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index * _reduce_vol_get_chunk_struct_size(vol);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 *  values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}
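
/*
 * The persistent memory file layout is:
 *   [superblock][logical map][chunk maps]
 * The logical map holds one 64-bit chunk map index per logical chunk of the
 * volume; the chunk maps region holds the per-chunk backing io unit indices.
 */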
static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete cb_fn;
	void *cb_arg;
	struct iovec iov[LOAD_IOV_COUNT];
	void *path;
};
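
/*
 * Carve the volume's single buf_mem allocation and buf_iov_mem array into
 * per-request decompressed and compressed scratch buffers and iovecs.
 */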
static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	/* Allocate 2x since we need buffers for both read/write and compress/decompress
	 *  intermediate buffers.
	 */
	vol->buf_mem = spdk_dma_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_dma_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 *  buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_dma_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		spdk_dma_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->buf_mem);
		free(vol);
	}
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}
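
/*
 * The allocated_chunk_maps and allocated_backing_io_units bit arrays track
 * which chunk map entries and backing io units are in use.  The io units
 * holding the superblock and the pm file path at the front of the backing
 * device are marked as allocated so they are never handed out for data.
 */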
static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent of filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}
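
/*
 * Load path: after reading the superblock and pm file path from the backing
 * device, re-map the pm file and rebuild the in-memory allocation bit arrays
 * by walking the persisted logical map and chunk maps.
 */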
static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}

		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}
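
/*
 * Destroy works by loading the volume, overwriting its superblock on the
 * backing device with zeroes, unloading it, and finally unlinking the pm file.
 */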
struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_superblock *super;
	struct iovec iov;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	int reduce_errno;
	char pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_dma_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_dma_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
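
/*
 * Complete a request and, if another request targeting the same chunk
 * (logical_map_index) was queued behind it, start that request now.  Only one
 * request per chunk executes at a time.
 */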
static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;
	struct spdk_reduce_chunk_map *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid once we update the logical map, since the logical map will no longer
	 * reference it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, _reduce_vol_get_chunk_struct_size(vol));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}
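
/*
 * Issue one backing readv/writev per io unit in the chunk.  next_fn is invoked
 * once all of them have completed (req->num_backing_ops reaches zero).
 */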
static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}
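
/*
 * Allocate a new chunk map and backing io units for the (possibly compressed)
 * chunk and write it to the backing device.  The chunk is stored compressed
 * only if compression saved at least one full backing io unit; otherwise the
 * uncompressed data is written.
 */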
static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}
	while (i < vol->backing_io_units_per_chunk) {
		req->chunk->io_unit_index[i++] = REDUCE_EMPTY_MAP_ENTRY;
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations.
	 * Just write the uncompressed data instead.  Force this to happen
	 * by just passing the full chunk size to _reduce_vol_write_chunk.
	 * When it sees the data couldn't be compressed, it will just write
	 * the uncompressed buffer to disk.
	 */
	if (reduce_errno < 0) {
		reduce_errno = req->vol->params.chunk_size;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   req->decomp_buf_iov, 1, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}
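
/*
 * Write path for an already-allocated chunk: the old chunk has been read and
 * decompressed; merge the new data from the user's iovs into the decompressed
 * buffer, then recompress and write the result as a new chunk.
 */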
static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	/* Negative reduce_errno indicates failure for decompression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _write_decompress_done);
	} else {
		_write_decompress_done(req, req->chunk->compressed_size);
	}
}
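
/*
 * Read path: the chunk has been read from the backing device and (if needed)
 * decompressed; copy the requested logical blocks out of the decompressed
 * buffer into the user's iovs.
 */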
static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	/* Negative reduce_errno indicates failure for decompression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {
		_read_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request *req;

	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
		if (logical_map_index == req->logical_map_index) {
			return true;
		}
	}

	return false;
}
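
/*
 * Public readv/writev entry points below: offset and length are in logical
 * blocks, the iov array must describe exactly length * logical_block_size
 * bytes, and a single request may not span a chunk boundary.
 */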
static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}
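
/*
 * Start a write.  If the chunk already has data, read and decompress the old
 * chunk first (read-modify-write); otherwise build the full chunk directly in
 * the decompressed scratch buffer, zero-filling the blocks this write does not
 * cover, and compress it.
 */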
static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
		return;
	}

	buf = req->decomp_buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->decomp_buf not specified by this write operation. */
	chunk_offset = req->offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	chunk_offset += req->length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)