1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/reduce.h" 37 #include "spdk/env.h" 38 #include "spdk/string.h" 39 #include "spdk/bit_array.h" 40 #include "spdk/util.h" 41 #include "spdk_internal/log.h" 42 43 #include "libpmem.h" 44 45 /* Always round up the size of the PM region to the nearest cacheline. */ 46 #define REDUCE_PM_SIZE_ALIGNMENT 64 47 48 /* Offset into the backing device where the persistent memory file's path is stored. */ 49 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 50 51 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 52 53 #define REDUCE_NUM_VOL_REQUESTS 256 54 55 /* Structure written to offset 0 of both the pm file and the backing device. */ 56 struct spdk_reduce_vol_superblock { 57 uint8_t signature[8]; 58 struct spdk_reduce_vol_params params; 59 uint8_t reserved[4048]; 60 }; 61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 62 63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 64 /* null terminator counts as one */ 65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 66 sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect"); 67 68 #define REDUCE_PATH_MAX 4096 69 70 /** 71 * Describes a persistent memory file used to hold metadata associated with a 72 * compressed volume. 73 */ 74 struct spdk_reduce_pm_file { 75 char path[REDUCE_PATH_MAX]; 76 void *pm_buf; 77 int pm_is_pmem; 78 uint64_t size; 79 }; 80 81 #define REDUCE_IO_READV 1 82 #define REDUCE_IO_WRITEV 2 83 84 struct spdk_reduce_chunk_map { 85 uint32_t compressed_size; 86 uint32_t reserved; 87 uint64_t io_unit_index[0]; 88 }; 89 90 struct spdk_reduce_vol_request { 91 /** 92 * Scratch buffer used for uncompressed chunk.
This is used for: 93 * 1) source buffer for compression operations 94 * 2) destination buffer for decompression operations 95 * 3) data buffer when writing uncompressed chunk to disk 96 * 4) data buffer when reading uncompressed chunk from disk 97 */ 98 uint8_t *decomp_buf; 99 struct iovec *decomp_buf_iov; 100 /** 101 * Scratch buffer used for compressed chunk. This is used for: 102 * 1) destination buffer for compression operations 103 * 2) source buffer for decompression operations 104 * 3) data buffer when writing compressed chunk to disk 105 * 4) data buffer when reading compressed chunk from disk 106 */ 107 uint8_t *comp_buf; 108 struct iovec *comp_buf_iov; 109 struct iovec *iov; 110 struct spdk_reduce_vol *vol; 111 int type; 112 int reduce_errno; 113 int iovcnt; 114 int num_backing_ops; 115 uint32_t num_io_units; 116 bool chunk_is_compressed; 117 uint64_t offset; 118 uint64_t logical_map_index; 119 uint64_t length; 120 uint64_t chunk_map_index; 121 struct spdk_reduce_chunk_map *chunk; 122 spdk_reduce_vol_op_complete cb_fn; 123 void *cb_arg; 124 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 125 struct spdk_reduce_vol_cb_args backing_cb_args; 126 }; 127 128 struct spdk_reduce_vol { 129 struct spdk_reduce_vol_params params; 130 uint32_t backing_io_units_per_chunk; 131 uint32_t backing_lba_per_io_unit; 132 uint32_t logical_blocks_per_chunk; 133 struct spdk_reduce_pm_file pm_file; 134 struct spdk_reduce_backing_dev *backing_dev; 135 struct spdk_reduce_vol_superblock *backing_super; 136 struct spdk_reduce_vol_superblock *pm_super; 137 uint64_t *pm_logical_map; 138 uint64_t *pm_chunk_maps; 139 140 struct spdk_bit_array *allocated_chunk_maps; 141 struct spdk_bit_array *allocated_backing_io_units; 142 143 struct spdk_reduce_vol_request *request_mem; 144 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 145 TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests; 146 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 147 148 /* Single contiguous buffer used for all request buffers for this volume. */ 149 uint8_t *buf_mem; 150 struct iovec *buf_iov_mem; 151 }; 152 153 static void _start_readv_request(struct spdk_reduce_vol_request *req); 154 static void _start_writev_request(struct spdk_reduce_vol_request *req); 155 156 /* 157 * Allocate extra metadata chunks and corresponding backing io units to account for 158 * outstanding IO in worst case scenario where logical map is completely allocated 159 * and no data can be compressed. We need extra chunks in this case to handle 160 * in-flight writes since reduce never writes data in place. 161 */ 162 #define REDUCE_NUM_EXTRA_CHUNKS 128 163 164 static void 165 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 166 { 167 if (vol->pm_file.pm_is_pmem) { 168 pmem_persist(addr, len); 169 } else { 170 pmem_msync(addr, len); 171 } 172 } 173 174 static uint64_t 175 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 176 { 177 uint64_t chunks_in_logical_map, logical_map_size; 178 179 chunks_in_logical_map = vol_size / chunk_size; 180 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 181 182 /* Round up to next cacheline. 
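* For example (illustrative numbers): a 1 GiB volume with 16 KiB chunks has 65,536 logical map entries,
* so logical_map_size = 65,536 * 8 = 512 KiB, which is already a multiple of REDUCE_PM_SIZE_ALIGNMENT.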
*/ 183 return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * 184 REDUCE_PM_SIZE_ALIGNMENT; 185 } 186 187 static uint64_t 188 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size) 189 { 190 uint64_t num_chunks; 191 192 num_chunks = vol_size / chunk_size; 193 num_chunks += REDUCE_NUM_EXTRA_CHUNKS; 194 195 return num_chunks; 196 } 197 198 static uint64_t 199 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size) 200 { 201 uint64_t io_units_per_chunk, num_chunks, total_chunks_size; 202 203 num_chunks = _get_total_chunks(vol_size, chunk_size); 204 io_units_per_chunk = chunk_size / backing_io_unit_size; 205 total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t); 206 207 return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * 208 REDUCE_PM_SIZE_ALIGNMENT; 209 } 210 211 static inline uint32_t 212 _reduce_vol_get_chunk_struct_size(struct spdk_reduce_vol *vol) 213 { 214 return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * vol->backing_io_units_per_chunk; 215 } 216 217 static struct spdk_reduce_chunk_map * 218 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 219 { 220 uintptr_t chunk_map_addr; 221 222 assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size)); 223 224 chunk_map_addr = (uintptr_t)vol->pm_chunk_maps; 225 chunk_map_addr += chunk_map_index * _reduce_vol_get_chunk_struct_size(vol); 226 227 return (struct spdk_reduce_chunk_map *)chunk_map_addr; 228 } 229 230 static int 231 _validate_vol_params(struct spdk_reduce_vol_params *params) 232 { 233 if (params->vol_size > 0) { 234 /** 235 * User does not pass in the vol size - it gets calculated by libreduce from 236 * values in this structure plus the size of the backing device. 237 */ 238 return -EINVAL; 239 } 240 241 if (params->chunk_size == 0 || params->backing_io_unit_size == 0 || 242 params->logical_block_size == 0) { 243 return -EINVAL; 244 } 245 246 /* Chunk size must be an even multiple of the backing io unit size. */ 247 if ((params->chunk_size % params->backing_io_unit_size) != 0) { 248 return -EINVAL; 249 } 250 251 /* Chunk size must be an even multiple of the logical block size. */ 252 if ((params->chunk_size % params->logical_block_size) != 0) { 253 return -EINVAL; 254 } 255 256 return 0; 257 } 258 259 static uint64_t 260 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size) 261 { 262 uint64_t num_chunks; 263 264 num_chunks = backing_dev_size / chunk_size; 265 if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) { 266 return 0; 267 } 268 269 num_chunks -= REDUCE_NUM_EXTRA_CHUNKS; 270 return num_chunks * chunk_size; 271 } 272 273 static uint64_t 274 _get_pm_file_size(struct spdk_reduce_vol_params *params) 275 { 276 uint64_t total_pm_size; 277 278 total_pm_size = sizeof(struct spdk_reduce_vol_superblock); 279 total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size); 280 total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size, 281 params->backing_io_unit_size); 282 return total_pm_size; 283 } 284 285 const struct spdk_uuid * 286 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol) 287 { 288 return &vol->params.uuid; 289 } 290 291 static void 292 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol) 293 { 294 uint64_t logical_map_size; 295 296 /* Superblock is at the beginning of the pm file.
*/ 297 vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; 298 299 /* Logical map immediately follows the super block. */ 300 vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); 301 302 /* Chunks maps follow the logical map. */ 303 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size); 304 vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size); 305 } 306 307 /* We need 2 iovs during load - one for the superblock, another for the path */ 308 #define LOAD_IOV_COUNT 2 309 310 struct reduce_init_load_ctx { 311 struct spdk_reduce_vol *vol; 312 struct spdk_reduce_vol_cb_args backing_cb_args; 313 spdk_reduce_vol_op_with_handle_complete cb_fn; 314 void *cb_arg; 315 struct iovec iov[LOAD_IOV_COUNT]; 316 void *path; 317 }; 318 319 static int 320 _allocate_vol_requests(struct spdk_reduce_vol *vol) 321 { 322 struct spdk_reduce_vol_request *req; 323 int i; 324 325 /* Allocate 2x since we need buffers for both read/write and compress/decompress 326 * intermediate buffers. 327 */ 328 vol->buf_mem = spdk_dma_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL); 329 if (vol->buf_mem == NULL) { 330 return -ENOMEM; 331 } 332 333 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 334 if (vol->request_mem == NULL) { 335 spdk_dma_free(vol->buf_mem); 336 vol->buf_mem = NULL; 337 return -ENOMEM; 338 } 339 340 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 341 * buffers. 342 */ 343 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 344 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 345 if (vol->buf_iov_mem == NULL) { 346 free(vol->request_mem); 347 spdk_dma_free(vol->buf_mem); 348 vol->request_mem = NULL; 349 vol->buf_mem = NULL; 350 return -ENOMEM; 351 } 352 353 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 354 req = &vol->request_mem[i]; 355 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 356 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 357 req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size; 358 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 359 req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size; 360 } 361 362 return 0; 363 } 364 365 static void 366 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 367 { 368 if (ctx != NULL) { 369 spdk_dma_free(ctx->path); 370 free(ctx); 371 } 372 373 if (vol != NULL) { 374 if (vol->pm_file.pm_buf != NULL) { 375 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 376 } 377 378 spdk_dma_free(vol->backing_super); 379 spdk_bit_array_free(&vol->allocated_chunk_maps); 380 spdk_bit_array_free(&vol->allocated_backing_io_units); 381 free(vol->request_mem); 382 free(vol->buf_iov_mem); 383 spdk_dma_free(vol->buf_mem); 384 free(vol); 385 } 386 } 387 388 static void 389 _init_write_super_cpl(void *cb_arg, int reduce_errno) 390 { 391 struct reduce_init_load_ctx *init_ctx = cb_arg; 392 int rc; 393 394 rc = _allocate_vol_requests(init_ctx->vol); 395 if (rc != 0) { 396 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 397 _init_load_cleanup(init_ctx->vol, init_ctx); 398 return; 399 } 400 401 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno); 402 /* Only clean up the ctx - the vol has been passed to the application 403 * for use now that initialization was successful. 
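* Passing NULL as the vol argument below frees only the ctx and its path buffer,
* leaving the vol itself untouched.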
404 */ 405 _init_load_cleanup(NULL, init_ctx); 406 } 407 408 static void 409 _init_write_path_cpl(void *cb_arg, int reduce_errno) 410 { 411 struct reduce_init_load_ctx *init_ctx = cb_arg; 412 struct spdk_reduce_vol *vol = init_ctx->vol; 413 414 init_ctx->iov[0].iov_base = vol->backing_super; 415 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 416 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 417 init_ctx->backing_cb_args.cb_arg = init_ctx; 418 vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, 419 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen, 420 &init_ctx->backing_cb_args); 421 } 422 423 static int 424 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 425 { 426 uint64_t total_chunks, total_backing_io_units; 427 uint32_t i, num_metadata_io_units; 428 429 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 430 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 431 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 432 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 433 434 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 435 return -ENOMEM; 436 } 437 438 /* Set backing io unit bits associated with metadata. */ 439 num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 440 vol->backing_dev->blocklen; 441 for (i = 0; i < num_metadata_io_units; i++) { 442 spdk_bit_array_set(vol->allocated_backing_io_units, i); 443 } 444 445 return 0; 446 } 447 448 void 449 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, 450 struct spdk_reduce_backing_dev *backing_dev, 451 const char *pm_file_dir, 452 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 453 { 454 struct spdk_reduce_vol *vol; 455 struct reduce_init_load_ctx *init_ctx; 456 uint64_t backing_dev_size; 457 size_t mapped_len; 458 int dir_len, max_dir_len, rc; 459 460 /* We need to append a path separator and the UUID to the supplied 461 * path. 462 */ 463 max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1; 464 dir_len = strnlen(pm_file_dir, max_dir_len); 465 /* Strip trailing slash if the user provided one - we will add it back 466 * later when appending the filename. 
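* The resulting path has the form <pm_file_dir>/<volume uuid> - see the
* spdk_uuid_fmt_lower() call below.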
467 */ 468 if (pm_file_dir[dir_len - 1] == '/') { 469 dir_len--; 470 } 471 if (dir_len == max_dir_len) { 472 SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir); 473 cb_fn(cb_arg, NULL, -EINVAL); 474 return; 475 } 476 477 rc = _validate_vol_params(params); 478 if (rc != 0) { 479 SPDK_ERRLOG("invalid vol params\n"); 480 cb_fn(cb_arg, NULL, rc); 481 return; 482 } 483 484 backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen; 485 params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size); 486 if (params->vol_size == 0) { 487 SPDK_ERRLOG("backing device is too small\n"); 488 cb_fn(cb_arg, NULL, -EINVAL); 489 return; 490 } 491 492 if (backing_dev->readv == NULL || backing_dev->writev == NULL || 493 backing_dev->unmap == NULL) { 494 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 495 cb_fn(cb_arg, NULL, -EINVAL); 496 return; 497 } 498 499 vol = calloc(1, sizeof(*vol)); 500 if (vol == NULL) { 501 cb_fn(cb_arg, NULL, -ENOMEM); 502 return; 503 } 504 505 TAILQ_INIT(&vol->free_requests); 506 TAILQ_INIT(&vol->executing_requests); 507 TAILQ_INIT(&vol->queued_requests); 508 509 vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL); 510 if (vol->backing_super == NULL) { 511 cb_fn(cb_arg, NULL, -ENOMEM); 512 _init_load_cleanup(vol, NULL); 513 return; 514 } 515 516 init_ctx = calloc(1, sizeof(*init_ctx)); 517 if (init_ctx == NULL) { 518 cb_fn(cb_arg, NULL, -ENOMEM); 519 _init_load_cleanup(vol, NULL); 520 return; 521 } 522 523 init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL); 524 if (init_ctx->path == NULL) { 525 cb_fn(cb_arg, NULL, -ENOMEM); 526 _init_load_cleanup(vol, init_ctx); 527 return; 528 } 529 530 if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) { 531 spdk_uuid_generate(&params->uuid); 532 } 533 534 memcpy(vol->pm_file.path, pm_file_dir, dir_len); 535 vol->pm_file.path[dir_len] = '/'; 536 spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN, 537 &params->uuid); 538 vol->pm_file.size = _get_pm_file_size(params); 539 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size, 540 PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600, 541 &mapped_len, &vol->pm_file.pm_is_pmem); 542 if (vol->pm_file.pm_buf == NULL) { 543 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", 544 vol->pm_file.path, strerror(errno)); 545 cb_fn(cb_arg, NULL, -errno); 546 _init_load_cleanup(vol, init_ctx); 547 return; 548 } 549 550 if (vol->pm_file.size != mapped_len) { 551 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 552 vol->pm_file.size, mapped_len); 553 cb_fn(cb_arg, NULL, -ENOMEM); 554 _init_load_cleanup(vol, init_ctx); 555 return; 556 } 557 558 vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size; 559 vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size; 560 vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen; 561 memcpy(&vol->params, params, sizeof(*params)); 562 563 vol->backing_dev = backing_dev; 564 565 rc = _allocate_bit_arrays(vol); 566 if (rc != 0) { 567 cb_fn(cb_arg, NULL, rc); 568 _init_load_cleanup(vol, init_ctx); 569 return; 570 } 571 572 memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE, 573 sizeof(vol->backing_super->signature)); 574 memcpy(&vol->backing_super->params, params, sizeof(*params)); 575 576 _initialize_vol_pm_pointers(vol); 577 578 memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super)); 579 /* Writing 0xFF's is equivalent to filling it all with
REDUCE_EMPTY_MAP_ENTRY. 580 * Note that this writes 0xFF to not just the logical map but the chunk maps as well. 581 */ 582 memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super)); 583 _reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size); 584 585 init_ctx->vol = vol; 586 init_ctx->cb_fn = cb_fn; 587 init_ctx->cb_arg = cb_arg; 588 589 memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX); 590 init_ctx->iov[0].iov_base = init_ctx->path; 591 init_ctx->iov[0].iov_len = REDUCE_PATH_MAX; 592 init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl; 593 init_ctx->backing_cb_args.cb_arg = init_ctx; 594 /* Write path to offset 4K on backing device - just after where the super 595 * block will be written. We wait until this is committed before writing the 596 * super block to guarantee we don't get the super block written without 597 * the path if the system crashed in the middle of a write operation. 598 */ 599 vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, 600 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen, 601 REDUCE_PATH_MAX / vol->backing_dev->blocklen, 602 &init_ctx->backing_cb_args); 603 } 604 605 static void 606 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno) 607 { 608 struct reduce_init_load_ctx *load_ctx = cb_arg; 609 struct spdk_reduce_vol *vol = load_ctx->vol; 610 uint64_t backing_dev_size; 611 uint64_t i, num_chunks, logical_map_index; 612 struct spdk_reduce_chunk_map *chunk; 613 size_t mapped_len; 614 uint32_t j; 615 int rc; 616 617 if (memcmp(vol->backing_super->signature, 618 SPDK_REDUCE_SIGNATURE, 619 sizeof(vol->backing_super->signature)) != 0) { 620 /* This backing device isn't a libreduce backing device. */ 621 rc = -EILSEQ; 622 goto error; 623 } 624 625 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 626 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 627 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 628 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 629 630 rc = _allocate_bit_arrays(vol); 631 if (rc != 0) { 632 goto error; 633 } 634 635 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 636 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 637 SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n", 638 backing_dev_size); 639 rc = -EILSEQ; 640 goto error; 641 } 642 643 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 644 vol->pm_file.size = _get_pm_file_size(&vol->params); 645 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 646 &vol->pm_file.pm_is_pmem); 647 if (vol->pm_file.pm_buf == NULL) { 648 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 649 rc = -errno; 650 goto error; 651 } 652 653 if (vol->pm_file.size != mapped_len) { 654 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 655 vol->pm_file.size, mapped_len); 656 rc = -ENOMEM; 657 goto error; 658 } 659 660 rc = _allocate_vol_requests(vol); 661 if (rc != 0) { 662 goto error; 663 } 664 665 _initialize_vol_pm_pointers(vol); 666 667 num_chunks = vol->params.vol_size / vol->params.chunk_size; 668 for (i = 0; i < num_chunks; i++) { 669 logical_map_index = vol->pm_logical_map[i]; 670 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 671 continue; 672 } 673
spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 674 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 675 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 676 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 677 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 678 } 679 } 680 } 681 682 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 683 /* Only clean up the ctx - the vol has been passed to the application 684 * for use now that volume load was successful. 685 */ 686 _init_load_cleanup(NULL, load_ctx); 687 return; 688 689 error: 690 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 691 _init_load_cleanup(vol, load_ctx); 692 } 693 694 void 695 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 696 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 697 { 698 struct spdk_reduce_vol *vol; 699 struct reduce_init_load_ctx *load_ctx; 700 701 if (backing_dev->readv == NULL || backing_dev->writev == NULL || 702 backing_dev->unmap == NULL) { 703 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 704 cb_fn(cb_arg, NULL, -EINVAL); 705 return; 706 } 707 708 vol = calloc(1, sizeof(*vol)); 709 if (vol == NULL) { 710 cb_fn(cb_arg, NULL, -ENOMEM); 711 return; 712 } 713 714 TAILQ_INIT(&vol->free_requests); 715 TAILQ_INIT(&vol->executing_requests); 716 TAILQ_INIT(&vol->queued_requests); 717 718 vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL); 719 if (vol->backing_super == NULL) { 720 _init_load_cleanup(vol, NULL); 721 cb_fn(cb_arg, NULL, -ENOMEM); 722 return; 723 } 724 725 vol->backing_dev = backing_dev; 726 727 load_ctx = calloc(1, sizeof(*load_ctx)); 728 if (load_ctx == NULL) { 729 _init_load_cleanup(vol, NULL); 730 cb_fn(cb_arg, NULL, -ENOMEM); 731 return; 732 } 733 734 load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL); 735 if (load_ctx->path == NULL) { 736 _init_load_cleanup(vol, load_ctx); 737 cb_fn(cb_arg, NULL, -ENOMEM); 738 return; 739 } 740 741 load_ctx->vol = vol; 742 load_ctx->cb_fn = cb_fn; 743 load_ctx->cb_arg = cb_arg; 744 745 load_ctx->iov[0].iov_base = vol->backing_super; 746 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 747 load_ctx->iov[1].iov_base = load_ctx->path; 748 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 749 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 750 load_ctx->backing_cb_args.cb_arg = load_ctx; 751 vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0, 752 (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 753 vol->backing_dev->blocklen, 754 &load_ctx->backing_cb_args); 755 } 756 757 void 758 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 759 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 760 { 761 if (vol == NULL) { 762 /* This indicates a programming error. 
*/ 763 assert(false); 764 cb_fn(cb_arg, -EINVAL); 765 return; 766 } 767 768 _init_load_cleanup(vol, NULL); 769 cb_fn(cb_arg, 0); 770 } 771 772 struct reduce_destroy_ctx { 773 spdk_reduce_vol_op_complete cb_fn; 774 void *cb_arg; 775 struct spdk_reduce_vol *vol; 776 struct spdk_reduce_vol_superblock *super; 777 struct iovec iov; 778 struct spdk_reduce_vol_cb_args backing_cb_args; 779 int reduce_errno; 780 char pm_path[REDUCE_PATH_MAX]; 781 }; 782 783 static void 784 destroy_unload_cpl(void *cb_arg, int reduce_errno) 785 { 786 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 787 788 if (destroy_ctx->reduce_errno == 0) { 789 if (unlink(destroy_ctx->pm_path)) { 790 SPDK_ERRLOG("%s could not be unlinked: %s\n", 791 destroy_ctx->pm_path, strerror(errno)); 792 } 793 } 794 795 /* Even if the unload somehow failed, we still pass the destroy_ctx 796 * reduce_errno since that indicates whether or not the volume was 797 * actually destroyed. 798 */ 799 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 800 spdk_dma_free(destroy_ctx->super); 801 free(destroy_ctx); 802 } 803 804 static void 805 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 806 { 807 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 808 struct spdk_reduce_vol *vol = destroy_ctx->vol; 809 810 destroy_ctx->reduce_errno = reduce_errno; 811 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 812 } 813 814 static void 815 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 816 { 817 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 818 819 if (reduce_errno != 0) { 820 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 821 spdk_dma_free(destroy_ctx->super); 822 free(destroy_ctx); 823 return; 824 } 825 826 destroy_ctx->vol = vol; 827 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 828 destroy_ctx->iov.iov_base = destroy_ctx->super; 829 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 830 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 831 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 832 vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0, 833 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen, 834 &destroy_ctx->backing_cb_args); 835 } 836 837 void 838 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 839 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 840 { 841 struct reduce_destroy_ctx *destroy_ctx; 842 843 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 844 if (destroy_ctx == NULL) { 845 cb_fn(cb_arg, -ENOMEM); 846 return; 847 } 848 849 destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL); 850 if (destroy_ctx->super == NULL) { 851 free(destroy_ctx); 852 cb_fn(cb_arg, -ENOMEM); 853 return; 854 } 855 destroy_ctx->cb_fn = cb_fn; 856 destroy_ctx->cb_arg = cb_arg; 857 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 858 } 859 860 static bool 861 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 862 { 863 uint64_t start_chunk, end_chunk; 864 865 start_chunk = offset / vol->logical_blocks_per_chunk; 866 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 867 868 return (start_chunk != end_chunk); 869 } 870 871 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 872 873 static void 874 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 875 { 876 struct spdk_reduce_vol_request *next_req; 877 struct spdk_reduce_vol *vol = req->vol; 878 879 
req->cb_fn(req->cb_arg, reduce_errno); 880 TAILQ_REMOVE(&vol->executing_requests, req, tailq); 881 882 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 883 if (next_req->logical_map_index == req->logical_map_index) { 884 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 885 if (next_req->type == REDUCE_IO_READV) { 886 _start_readv_request(next_req); 887 } else { 888 assert(next_req->type == REDUCE_IO_WRITEV); 889 _start_writev_request(next_req); 890 } 891 break; 892 } 893 } 894 895 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 896 } 897 898 static void 899 _write_write_done(void *_req, int reduce_errno) 900 { 901 struct spdk_reduce_vol_request *req = _req; 902 struct spdk_reduce_vol *vol = req->vol; 903 uint64_t old_chunk_map_index; 904 struct spdk_reduce_chunk_map *old_chunk; 905 uint32_t i; 906 907 if (reduce_errno != 0) { 908 req->reduce_errno = reduce_errno; 909 } 910 911 assert(req->num_backing_ops > 0); 912 if (--req->num_backing_ops > 0) { 913 return; 914 } 915 916 if (req->reduce_errno != 0) { 917 _reduce_vol_complete_req(req, req->reduce_errno); 918 return; 919 } 920 921 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 922 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 923 old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index); 924 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 925 if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) { 926 break; 927 } 928 assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true); 929 spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]); 930 old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 931 } 932 spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index); 933 } 934 935 /* 936 * We don't need to persist the clearing of the old chunk map here. The old chunk map 937 * becomes invalid after we update the logical map, since the old chunk map will no 938 * longer have a reference to it in the logical map. 939 */ 940 941 /* Persist the new chunk map. This must be persisted before we update the logical map. 
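* Otherwise a crash right after the logical map update could leave the logical map
* pointing at a chunk map that never reached persistent memory.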
*/ 942 _reduce_persist(vol, req->chunk, _reduce_vol_get_chunk_struct_size(vol)); 943 944 vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; 945 946 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); 947 948 _reduce_vol_complete_req(req, 0); 949 } 950 951 static void 952 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 953 reduce_request_fn next_fn, bool is_write) 954 { 955 struct iovec *iov; 956 uint8_t *buf; 957 uint32_t i; 958 959 if (req->chunk_is_compressed) { 960 iov = req->comp_buf_iov; 961 buf = req->comp_buf; 962 } else { 963 iov = req->decomp_buf_iov; 964 buf = req->decomp_buf; 965 } 966 967 req->num_backing_ops = req->num_io_units; 968 req->backing_cb_args.cb_fn = next_fn; 969 req->backing_cb_args.cb_arg = req; 970 for (i = 0; i < req->num_io_units; i++) { 971 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; 972 iov[i].iov_len = vol->params.backing_io_unit_size; 973 if (is_write) { 974 vol->backing_dev->writev(vol->backing_dev, &iov[i], 1, 975 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, 976 vol->backing_lba_per_io_unit, &req->backing_cb_args); 977 } else { 978 vol->backing_dev->readv(vol->backing_dev, &iov[i], 1, 979 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, 980 vol->backing_lba_per_io_unit, &req->backing_cb_args); 981 } 982 } 983 } 984 985 static void 986 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, 987 uint32_t compressed_size) 988 { 989 struct spdk_reduce_vol *vol = req->vol; 990 uint32_t i; 991 992 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0); 993 994 /* TODO: fail if no chunk map found - but really this should not happen if we 995 * size the number of requests similarly to number of extra chunk maps 996 */ 997 assert(req->chunk_map_index != UINT32_MAX); 998 spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); 999 1000 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1001 req->num_io_units = spdk_divide_round_up(compressed_size, 1002 vol->params.backing_io_unit_size); 1003 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1004 req->chunk->compressed_size = 1005 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; 1006 1007 for (i = 0; i < req->num_io_units; i++) { 1008 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0); 1009 /* TODO: fail if no backing block found - but really this should also not 1010 * happen (see comment above). 1011 */ 1012 assert(req->chunk->io_unit_index[i] != UINT32_MAX); 1013 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1014 } 1015 while (i < vol->backing_io_units_per_chunk) { 1016 req->chunk->io_unit_index[i++] = REDUCE_EMPTY_MAP_ENTRY; 1017 } 1018 1019 _issue_backing_ops(req, vol, next_fn, true /* write */); 1020 } 1021 1022 static void 1023 _write_compress_done(void *_req, int reduce_errno) 1024 { 1025 struct spdk_reduce_vol_request *req = _req; 1026 1027 /* Negative reduce_errno indicates failure for compression operations. 1028 * Just write the uncompressed data instead. Force this to happen 1029 * by just passing the full chunk size to _reduce_vol_write_chunk. 1030 * When it sees the data couldn't be compressed, it will just write 1031 * the uncompressed buffer to disk. 
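* (A compressed size that still requires backing_io_units_per_chunk io units is
* treated as uncompressed by _reduce_vol_write_chunk.)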
1032 */ 1033 if (reduce_errno < 0) { 1034 reduce_errno = req->vol->params.chunk_size; 1035 } 1036 1037 /* Positive reduce_errno indicates number of bytes in compressed buffer. */ 1038 _reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno); 1039 } 1040 1041 static void 1042 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1043 { 1044 struct spdk_reduce_vol *vol = req->vol; 1045 1046 req->backing_cb_args.cb_fn = next_fn; 1047 req->backing_cb_args.cb_arg = req; 1048 req->comp_buf_iov[0].iov_base = req->comp_buf; 1049 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1050 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1051 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1052 vol->backing_dev->compress(vol->backing_dev, 1053 req->decomp_buf_iov, 1, req->comp_buf_iov, 1, 1054 &req->backing_cb_args); 1055 } 1056 1057 static void 1058 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1059 { 1060 struct spdk_reduce_vol *vol = req->vol; 1061 1062 req->backing_cb_args.cb_fn = next_fn; 1063 req->backing_cb_args.cb_arg = req; 1064 req->comp_buf_iov[0].iov_base = req->comp_buf; 1065 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1066 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1067 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1068 vol->backing_dev->decompress(vol->backing_dev, 1069 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1070 &req->backing_cb_args); 1071 } 1072 1073 static void 1074 _write_decompress_done(void *_req, int reduce_errno) 1075 { 1076 struct spdk_reduce_vol_request *req = _req; 1077 struct spdk_reduce_vol *vol = req->vol; 1078 uint64_t chunk_offset; 1079 uint8_t *buf; 1080 int i; 1081 1082 /* Negative reduce_errno indicates failure for compression operations. */ 1083 if (reduce_errno < 0) { 1084 _reduce_vol_complete_req(req, reduce_errno); 1085 return; 1086 } 1087 1088 /* Positive reduce_errno indicates number of bytes in decompressed 1089 * buffer. This should equal the chunk size - otherwise that's another 1090 * type of failure. 
1091 */ 1092 if ((uint32_t)reduce_errno != vol->params.chunk_size) { 1093 _reduce_vol_complete_req(req, -EIO); 1094 return; 1095 } 1096 1097 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1098 buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size; 1099 for (i = 0; i < req->iovcnt; i++) { 1100 memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len); 1101 buf += req->iov[i].iov_len; 1102 } 1103 1104 _reduce_vol_compress_chunk(req, _write_compress_done); 1105 } 1106 1107 static void 1108 _write_read_done(void *_req, int reduce_errno) 1109 { 1110 struct spdk_reduce_vol_request *req = _req; 1111 1112 if (reduce_errno != 0) { 1113 req->reduce_errno = reduce_errno; 1114 } 1115 1116 assert(req->num_backing_ops > 0); 1117 if (--req->num_backing_ops > 0) { 1118 return; 1119 } 1120 1121 if (req->reduce_errno != 0) { 1122 _reduce_vol_complete_req(req, req->reduce_errno); 1123 return; 1124 } 1125 1126 if (req->chunk_is_compressed) { 1127 _reduce_vol_decompress_chunk(req, _write_decompress_done); 1128 } else { 1129 _write_decompress_done(req, req->chunk->compressed_size); 1130 } 1131 } 1132 1133 static void 1134 _read_decompress_done(void *_req, int reduce_errno) 1135 { 1136 struct spdk_reduce_vol_request *req = _req; 1137 struct spdk_reduce_vol *vol = req->vol; 1138 uint64_t chunk_offset; 1139 uint8_t *buf; 1140 int i; 1141 1142 /* Negative reduce_errno indicates failure for compression operations. */ 1143 if (reduce_errno < 0) { 1144 _reduce_vol_complete_req(req, reduce_errno); 1145 return; 1146 } 1147 1148 /* Positive reduce_errno indicates number of bytes in decompressed 1149 * buffer. This should equal the chunk size - otherwise that's another 1150 * type of failure. 1151 */ 1152 if ((uint32_t)reduce_errno != vol->params.chunk_size) { 1153 _reduce_vol_complete_req(req, -EIO); 1154 return; 1155 } 1156 1157 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1158 buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size; 1159 for (i = 0; i < req->iovcnt; i++) { 1160 memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); 1161 buf += req->iov[i].iov_len; 1162 } 1163 _reduce_vol_complete_req(req, 0); 1164 } 1165 1166 static void 1167 _read_read_done(void *_req, int reduce_errno) 1168 { 1169 struct spdk_reduce_vol_request *req = _req; 1170 1171 if (reduce_errno != 0) { 1172 req->reduce_errno = reduce_errno; 1173 } 1174 1175 assert(req->num_backing_ops > 0); 1176 if (--req->num_backing_ops > 0) { 1177 return; 1178 } 1179 1180 if (req->reduce_errno != 0) { 1181 _reduce_vol_complete_req(req, req->reduce_errno); 1182 return; 1183 } 1184 1185 if (req->chunk_is_compressed) { 1186 _reduce_vol_decompress_chunk(req, _read_decompress_done); 1187 } else { 1188 _read_decompress_done(req, req->chunk->compressed_size); 1189 } 1190 } 1191 1192 static void 1193 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1194 { 1195 struct spdk_reduce_vol *vol = req->vol; 1196 1197 req->chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1198 assert(req->chunk_map_index != UINT32_MAX); 1199 1200 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1201 req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size, 1202 vol->params.backing_io_unit_size); 1203 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1204 1205 _issue_backing_ops(req, vol, next_fn, false /* read */); 1206 } 1207 1208 static bool 1209 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int 
iovcnt, 1210 uint64_t length) 1211 { 1212 uint64_t size = 0; 1213 int i; 1214 1215 for (i = 0; i < iovcnt; i++) { 1216 size += iov[i].iov_len; 1217 } 1218 1219 return size == (length * vol->params.logical_block_size); 1220 } 1221 1222 static bool 1223 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index) 1224 { 1225 struct spdk_reduce_vol_request *req; 1226 1227 TAILQ_FOREACH(req, &vol->executing_requests, tailq) { 1228 if (logical_map_index == req->logical_map_index) { 1229 return true; 1230 } 1231 } 1232 1233 return false; 1234 } 1235 1236 static void 1237 _start_readv_request(struct spdk_reduce_vol_request *req) 1238 { 1239 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1240 _reduce_vol_read_chunk(req, _read_read_done); 1241 } 1242 1243 void 1244 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, 1245 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1246 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1247 { 1248 struct spdk_reduce_vol_request *req; 1249 uint64_t logical_map_index; 1250 bool overlapped; 1251 int i; 1252 1253 if (length == 0) { 1254 cb_fn(cb_arg, 0); 1255 return; 1256 } 1257 1258 if (_request_spans_chunk_boundary(vol, offset, length)) { 1259 cb_fn(cb_arg, -EINVAL); 1260 return; 1261 } 1262 1263 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1264 cb_fn(cb_arg, -EINVAL); 1265 return; 1266 } 1267 1268 logical_map_index = offset / vol->logical_blocks_per_chunk; 1269 overlapped = _check_overlap(vol, logical_map_index); 1270 1271 if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) { 1272 /* 1273 * This chunk hasn't been allocated. So treat the data as all 1274 * zeroes for this chunk - do the memset and immediately complete 1275 * the operation. 1276 */ 1277 for (i = 0; i < iovcnt; i++) { 1278 memset(iov[i].iov_base, 0, iov[i].iov_len); 1279 } 1280 cb_fn(cb_arg, 0); 1281 return; 1282 } 1283 1284 req = TAILQ_FIRST(&vol->free_requests); 1285 if (req == NULL) { 1286 cb_fn(cb_arg, -ENOMEM); 1287 return; 1288 } 1289 1290 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1291 req->type = REDUCE_IO_READV; 1292 req->vol = vol; 1293 req->iov = iov; 1294 req->iovcnt = iovcnt; 1295 req->offset = offset; 1296 req->logical_map_index = logical_map_index; 1297 req->length = length; 1298 req->cb_fn = cb_fn; 1299 req->cb_arg = cb_arg; 1300 1301 if (!overlapped) { 1302 _start_readv_request(req); 1303 } else { 1304 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1305 } 1306 } 1307 1308 static void 1309 _start_writev_request(struct spdk_reduce_vol_request *req) 1310 { 1311 struct spdk_reduce_vol *vol = req->vol; 1312 uint64_t chunk_offset; 1313 uint32_t lbsize, lb_per_chunk; 1314 int i; 1315 uint8_t *buf; 1316 1317 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1318 if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) { 1319 /* Read old chunk, then overwrite with data from this write operation. 1320 * TODO: bypass reading old chunk if this write operation overwrites 1321 * the entire chunk. 1322 */ 1323 _reduce_vol_read_chunk(req, _write_read_done); 1324 return; 1325 } 1326 1327 buf = req->decomp_buf; 1328 lbsize = vol->params.logical_block_size; 1329 lb_per_chunk = vol->logical_blocks_per_chunk; 1330 /* Note: we must zero out parts of req->buf not specified by this write operation. 
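* This chunk has no prior data (its logical map entry is empty), so bytes outside the
* written range must compress as zeroes, and decomp_buf may still hold stale data from
* an earlier request.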
*/ 1331 chunk_offset = req->offset % lb_per_chunk; 1332 if (chunk_offset != 0) { 1333 memset(buf, 0, chunk_offset * lbsize); 1334 buf += chunk_offset * lbsize; 1335 } 1336 for (i = 0; i < req->iovcnt; i++) { 1337 memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len); 1338 buf += req->iov[i].iov_len; 1339 } 1340 chunk_offset += req->length; 1341 if (chunk_offset != lb_per_chunk) { 1342 memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize); 1343 } 1344 _reduce_vol_compress_chunk(req, _write_compress_done); 1345 } 1346 1347 void 1348 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, 1349 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1350 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1351 { 1352 struct spdk_reduce_vol_request *req; 1353 uint64_t logical_map_index; 1354 bool overlapped; 1355 1356 if (length == 0) { 1357 cb_fn(cb_arg, 0); 1358 return; 1359 } 1360 1361 if (_request_spans_chunk_boundary(vol, offset, length)) { 1362 cb_fn(cb_arg, -EINVAL); 1363 return; 1364 } 1365 1366 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1367 cb_fn(cb_arg, -EINVAL); 1368 return; 1369 } 1370 1371 logical_map_index = offset / vol->logical_blocks_per_chunk; 1372 overlapped = _check_overlap(vol, logical_map_index); 1373 1374 req = TAILQ_FIRST(&vol->free_requests); 1375 if (req == NULL) { 1376 cb_fn(cb_arg, -ENOMEM); 1377 return; 1378 } 1379 1380 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1381 req->type = REDUCE_IO_WRITEV; 1382 req->vol = vol; 1383 req->iov = iov; 1384 req->iovcnt = iovcnt; 1385 req->offset = offset; 1386 req->logical_map_index = logical_map_index; 1387 req->length = length; 1388 req->cb_fn = cb_fn; 1389 req->cb_arg = cb_arg; 1390 1391 if (!overlapped) { 1392 _start_writev_request(req); 1393 } else { 1394 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1395 } 1396 } 1397 1398 const struct spdk_reduce_vol_params * 1399 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol) 1400 { 1401 return &vol->params; 1402 } 1403 1404 void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol) 1405 { 1406 uint64_t logical_map_size, num_chunks, ttl_chunk_sz; 1407 uint32_t struct_size; 1408 uint64_t chunk_map_size; 1409 1410 SPDK_NOTICELOG("vol info:\n"); 1411 SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size); 1412 SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size); 1413 SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size); 1414 SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size); 1415 num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 1416 SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks); 1417 SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n", 1418 vol->params.vol_size / vol->params.chunk_size); 1419 ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1420 vol->params.backing_io_unit_size); 1421 SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz); 1422 struct_size = _reduce_vol_get_chunk_struct_size(vol); 1423 SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size); 1424 1425 SPDK_NOTICELOG("pmem info:\n"); 1426 SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size); 1427 SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf); 1428 SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super); 1429 SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void 
*)vol->pm_logical_map); 1430 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, 1431 vol->params.chunk_size); 1432 SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size); 1433 SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps); 1434 chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1435 vol->params.backing_io_unit_size); 1436 SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size); 1437 } 1438 1439 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE) 1440
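/*
 * Illustrative usage sketch - not part of the library and excluded from the build.
 * It shows one way a caller could drive the public entry points defined above.
 * The example_* names and the "/mnt/pmem0" directory are hypothetical placeholders;
 * a real consumer must also populate the backing_dev readv/writev/unmap/compress/
 * decompress callbacks, each of which completes by invoking the cb_fn in the
 * spdk_reduce_vol_cb_args it was given.
 */
#if 0
static void
example_io_done(void *cb_arg, int reduce_errno)
{
	/* reduce_errno is 0 on success or a negative errno on failure. */
}

static void
example_init_done(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	/* Static so the buffer and iovec stay valid until example_io_done runs. */
	static struct iovec iov;
	static char buf[512];

	if (reduce_errno != 0) {
		return;
	}

	/* offset and length are in logical blocks and must not span a chunk
	 * boundary; the iov sizes must total length * logical_block_size.
	 */
	iov.iov_base = buf;
	iov.iov_len = sizeof(buf);
	spdk_reduce_vol_writev(vol, &iov, 1, 0, 1, example_io_done, NULL);
}

static void
example_create_vol(struct spdk_reduce_backing_dev *backing_dev)
{
	struct spdk_reduce_vol_params params;

	memset(&params, 0, sizeof(params));
	/* vol_size stays 0 - libreduce derives it from the backing device size
	 * and chunk_size (see _get_vol_size()).  A zeroed uuid is replaced with
	 * a generated one.
	 */
	params.logical_block_size = 512;
	params.backing_io_unit_size = 4096;
	params.chunk_size = 16 * 1024;

	spdk_reduce_vol_init(&params, backing_dev, "/mnt/pmem0",
			     example_init_done, NULL);
}
#endif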