/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY		-1ULL

#define REDUCE_NUM_VOL_REQUESTS		256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* null terminator counts as one */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

#define REDUCE_ZERO_BUF_SIZE 0x100000

/**
 * Describes a persistent memory file used to hold metadata associated with a
 * compressed volume.
 */
struct spdk_reduce_pm_file {
	char		path[REDUCE_PATH_MAX];
	void		*pm_buf;
	int		pm_is_pmem;
	uint64_t	size;
};

#define REDUCE_IO_READV		1
#define REDUCE_IO_WRITEV	2

struct spdk_reduce_chunk_map {
	uint32_t	compressed_size;
	uint32_t	reserved;
	uint64_t	io_unit_index[0];
};
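
/*
 * For illustration only (hypothetical sizes, not defaults): with a 16 KiB chunk and a
 * 4 KiB backing io unit there are 4 io units per chunk, so each chunk map entry in the
 * pm file occupies sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t) =
 * 8 + 32 = 40 bytes, before the whole chunk map region is rounded up to
 * REDUCE_PM_SIZE_ALIGNMENT.
 */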

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for uncompressed chunk.  This is used for:
	 *   1) source buffer for compression operations
	 *   2) destination buffer for decompression operations
	 *   3) data buffer when writing uncompressed chunk to disk
	 *   4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t					*decomp_buf;
	struct iovec				*decomp_buf_iov;

	/**
	 * These are used to construct the iovecs that are sent to
	 * the decomp engine; they point to a mix of the scratch buffer
	 * and the user buffer.
	 */
	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
	int					decomp_iovcnt;

	/**
	 *  Scratch buffer used for compressed chunk.  This is used for:
	 *   1) destination buffer for compression operations
	 *   2) source buffer for decompression operations
	 *   3) data buffer when writing compressed chunk to disk
	 *   4) data buffer when reading compressed chunk from disk
	 */
	uint8_t					*comp_buf;
	struct iovec				*comp_buf_iov;
	struct iovec				*iov;
	bool					rmw;
	struct spdk_reduce_vol			*vol;
	int					type;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint32_t				num_io_units;
	bool					chunk_is_compressed;
	bool					copy_after_decompress;
	uint64_t				offset;
	uint64_t				logical_map_index;
	uint64_t				length;
	uint64_t				chunk_map_index;
	struct spdk_reduce_chunk_map		*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*buf_mem;
	struct iovec				*buf_iov_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);
static uint8_t *g_zero_buf;
static int g_vol_count = 0;

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 * outstanding IO in the worst case scenario where the logical map is completely
 * allocated and no data can be compressed.  We need extra chunks in this case to
 * handle in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}
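
/*
 * Worked example (hypothetical sizes, for illustration only): a 1 GiB volume with
 * 16 KiB chunks has 65536 entries in the logical map, so the map itself needs
 * 65536 * 8 = 512 KiB, which is already a multiple of REDUCE_PM_SIZE_ALIGNMENT.
 */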

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 *  values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}
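
/*
 * Layout of the persistent memory file (see _get_pm_file_size() above and
 * _initialize_vol_pm_pointers() below):
 *
 *   [ superblock (4 KiB) ][ logical map ][ chunk maps ... ]
 *
 * The logical map and chunk map regions are each rounded up to
 * REDUCE_PM_SIZE_ALIGNMENT.
 */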

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunks maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	/* Allocate 2x since we need buffers for both read/write and compress/decompress
	 * intermediate buffers.
	 */
	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 * buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
	}

	return 0;
}
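
/*
 * Each request gets two chunk-sized buffers carved out of vol->buf_mem and two iovec
 * arrays carved out of vol->buf_iov_mem: request i uses slot (2 * i) for its
 * uncompressed-data (decomp) buffer and slot (2 * i + 1) for its compressed-data
 * (comp) buffer.
 */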

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}

		spdk_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_free(vol->buf_mem);
		free(vol);
	}
}

static int
_alloc_zero_buff(void)
{
	int rc = 0;

	/* The zero buffer is shared between all volumes and just used
	 * for reads so allocate one global instance here if not already
	 * allocated when another vol init'd or loaded.
	 */
	if (g_vol_count++ == 0) {
		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
					  SPDK_MALLOC_DMA);
		if (g_zero_buf == NULL) {
			rc = -ENOMEM;
		}
	}
	return rc;
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	rc = _alloc_zero_buff();
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}
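
/*
 * Note: the first io units of the backing device hold libreduce metadata (the 4 KiB
 * superblock at offset 0 followed by the REDUCE_PATH_MAX bytes of pm file path), so
 * _allocate_bit_arrays() marks them as allocated up front and they are never handed
 * out for chunk data.
 */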

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}
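
/*
 * On-disk metadata layout created by spdk_reduce_vol_init():
 *
 *   offset 0    : superblock (signature + vol params)
 *   offset 4096 : path of the pm file (REDUCE_PATH_MAX bytes)
 *
 * The path write is completed first and the superblock is only written afterwards,
 * so a crash in the middle of initialization should not leave a valid superblock
 * behind without the pm file path recorded next to it.
 */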

static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	rc = _alloc_zero_buff();
	if (rc) {
		goto error;
	}

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
	 * So don't bother getting the volume ready to use - invoke the callback immediately
	 * so destroy_load_cb can delete the metadata off of the block device and delete the
	 * persistent memory file if it exists.
	 */
	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	if (load_ctx->cb_fn == (*destroy_load_cb)) {
		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
		_init_load_cleanup(NULL, load_ctx);
		return;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (--g_vol_count == 0) {
		spdk_free(g_zero_buf);
	}
	assert(g_vol_count >= 0);
	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;
	struct spdk_reduce_chunk_map *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid after we update the logical map, since the old chunk map will no
	 * longer have a reference to it in the logical map.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk,
			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}
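
/*
 * A chunk is stored compressed only if the compressed output needs fewer backing io
 * units than an uncompressed chunk would.  For illustration (hypothetical sizes): with
 * a 16 KiB chunk and 4 KiB io units, a 5000 byte compressed result needs 2 io units
 * (< 4), so it is written compressed; a 15 KiB result still needs all 4 io units, so
 * the chunk is written uncompressed instead.
 */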

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
	uint64_t chunk_offset, remainder, total_len = 0;
	uint8_t *buf;
	int j;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
	if (req->chunk_is_compressed == false) {
		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
		buf = req->decomp_buf;
		total_len = chunk_offset * vol->params.logical_block_size;

		/* zero any offset into chunk */
		if (req->rmw == false && chunk_offset) {
			memset(buf, 0, total_len);
		}
		buf += total_len;

		/* copy the data */
		for (j = 0; j < req->iovcnt; j++) {
			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
			buf += req->iov[j].iov_len;
			total_len += req->iov[j].iov_len;
		}

		/* zero any remainder */
		remainder = vol->params.chunk_size - total_len;
		total_len += remainder;
		if (req->rmw == false && remainder) {
			memset(buf, 0, remainder);
		}
		assert(total_len == vol->params.chunk_size);
	}

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations.
	 * Just write the uncompressed data instead.  Force this to happen
	 * by just passing the full chunk size to _reduce_vol_write_chunk.
	 * When it sees the data couldn't be compressed, it will just write
	 * the uncompressed buffer to disk.
	 */
	if (reduce_errno < 0) {
		reduce_errno = req->vol->params.chunk_size;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder = 0;
	uint64_t ttl_len = 0;
	int i;

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	/* If the backing device doesn't support SGL output then we should copy the result of
	 * decompression to the user's buffer if at least one of the conditions below is true:
	 * 1. The user's buffer is fragmented
	 * 2. The length of the user's buffer is less than the chunk size
	 */
	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
				     req->iov[0].iov_len < vol->params.chunk_size);
	if (req->copy_after_decompress) {
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = vol->params.chunk_size;
		req->decomp_iovcnt = 1;
		goto decompress;
	}

	if (chunk_offset) {
		/* first iov point to our scratch buffer for any offset into the chunk */
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct to the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	/* send the rest of the chunk to our scratch buffer */
	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

decompress:
	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
				     &req->backing_cb_args);
}
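
/*
 * Example of the SGL built above for a read that does not start on a chunk boundary
 * (hypothetical sizes): with a 16 KiB chunk, 512 byte logical blocks, and a 4 KiB read
 * at chunk_offset 2, the decompress SGL is
 *   [ 1 KiB scratch ][ 4 KiB user iovs ][ 11 KiB scratch ]
 * i.e. the full chunk is always decompressed and only the user-visible part lands in
 * the caller's buffers.
 */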

static inline void
_prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
{
	struct spdk_reduce_vol *vol = req->vol;
	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
	uint64_t chunk_offset, ttl_len = 0;
	uint64_t remainder = 0;
	char *copy_offset = NULL;
	uint32_t lbsize = vol->params.logical_block_size;
	int i;

	req->decomp_iov[0].iov_base = req->decomp_buf;
	req->decomp_iov[0].iov_len = vol->params.chunk_size;
	req->decomp_iovcnt = 1;
	copy_offset = req->decomp_iov[0].iov_base;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		ttl_len += chunk_offset * lbsize;
		/* copy_offset already points to padding buffer if zero_paddings=false */
		if (zero_paddings) {
			memcpy(copy_offset, padding_buffer, ttl_len);
		}
		copy_offset += ttl_len;
	}

	/* now the user data iov, direct from the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
		copy_offset += req->iov[i].iov_len;
		ttl_len += req->iov[i].iov_len;
	}

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		/* copy_offset already points to padding buffer if zero_paddings=false */
		if (zero_paddings) {
			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
		}
		ttl_len += remainder;
	}

	assert(ttl_len == req->vol->params.chunk_size);
}

/* This function can be called when we are compressing new data or in the read-modify-write case.
 * In the first case any padding should be filled with zeroes; in the second case the padding
 * should point to the already read and decompressed data.
 */
static inline void
_prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
{
	struct spdk_reduce_vol *vol = req->vol;
	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
	uint64_t chunk_offset, ttl_len = 0;
	uint64_t remainder = 0;
	uint32_t lbsize = vol->params.logical_block_size;
	int i;

	/* If the backing device doesn't support SGL input then we should copy the user's buffer
	 * into decomp_buf if at least one of the conditions below is true:
	 * 1. The user's buffer is fragmented
	 * 2. The length of the user's buffer is less than the chunk size
	 */
	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
					  req->iov[0].iov_len < vol->params.chunk_size)) {
		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
		return;
	}

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset != 0) {
		ttl_len += chunk_offset * lbsize;
		req->decomp_iov[0].iov_base = padding_buffer;
		req->decomp_iov[0].iov_len = ttl_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct from the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->iov[i].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		req->decomp_iovcnt++;
		ttl_len += remainder;
	}
	assert(ttl_len == req->vol->params.chunk_size);
}

static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 * buffer.  This should equal the chunk size - otherwise that's another
	 * type of failure.
	 */
	if ((uint32_t)reduce_errno != req->vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	_prepare_compress_chunk(req, false);
	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
	} else {
		_write_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 * buffer.  This should equal the chunk size - otherwise that's another
	 * type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	if (req->copy_after_decompress) {
		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
		int i;

		for (i = 0; i < req->iovcnt; i++) {
			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
			decomp_buffer += req->iov[i].iov_len;
			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
		}
	}

	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {

		/* If the chunk was compressed, the data would have been placed in the
		 * host buffers by the decompression operation.  Since it was not, we
		 * need to memcpy from the scratch buffer here.
		 */
		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
		for (i = 0; i < req->iovcnt; i++) {
			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
			buf += req->iov[i].iov_len;
		}

		_read_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != UINT32_MAX);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	if (iovcnt > REDUCE_MAX_IOVECS) {
		return false;
	}

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request *req;

	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
		if (logical_map_index == req->logical_map_index) {
			return true;
		}
	}

	return false;
}

static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}
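
/*
 * Requests are serialized per chunk: if another request targeting the same
 * logical_map_index is already executing, the new request is parked on
 * queued_requests (see spdk_reduce_vol_readv/writev below) and restarted from
 * _reduce_vol_complete_req() once the in-flight request finishes.
 */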

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;

	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
			/* Read old chunk, then overwrite with data from this write
			 * operation.
			 */
			req->rmw = true;
			_reduce_vol_read_chunk(req, _write_read_done);
			return;
		}
	}

	req->rmw = false;

	_prepare_compress_chunk(req, true);
	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
			   vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT(reduce)