/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY	-1ULL

#define REDUCE_NUM_VOL_REQUESTS	256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* -1 to exclude the null terminator */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char		path[REDUCE_PATH_MAX];
	void		*pm_buf;
	int		pm_is_pmem;
	uint64_t	size;
};

#define REDUCE_IO_READV		1
#define REDUCE_IO_WRITEV	2

struct spdk_reduce_chunk_map {
	uint32_t	compressed_size;
	uint32_t	reserved;
	uint64_t	io_unit_index[0];
};

#define REDUCE_MAX_IOVECS	32

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for uncompressed chunk.  This is used for:
	 *  1) source buffer for compression operations
	 *  2) destination buffer for decompression operations
	 *  3) data buffer when writing uncompressed chunk to disk
	 *  4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t					*decomp_buf;
	struct iovec				*decomp_buf_iov;

	/**
	 *  These are used to construct the iovecs that are sent to
	 *  the decomp engine; they point to a mix of the scratch buffer
	 *  and user buffer.
	 */
	struct iovec				decomp_iov[REDUCE_MAX_IOVECS];
	int					decomp_iovcnt;

	/**
	 *  Scratch buffer used for compressed chunk.  This is used for:
	 *  1) destination buffer for compression operations
	 *  2) source buffer for decompression operations
	 *  3) data buffer when writing compressed chunk to disk
	 *  4) data buffer when reading compressed chunk from disk
	 */
	uint8_t					*comp_buf;
	struct iovec				*comp_buf_iov;
	struct iovec				*iov;
	struct spdk_reduce_vol			*vol;
	int					type;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint32_t				num_io_units;
	bool					chunk_is_compressed;
	uint64_t				offset;
	uint64_t				logical_map_index;
	uint64_t				length;
	uint64_t				chunk_map_index;
	struct spdk_reduce_chunk_map		*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*buf_mem;
	struct iovec				*buf_iov_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);
static uint8_t *g_zero_buf;
static int g_vol_count = 0;

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in worst case scenario where logical map is completely allocated
 *  and no data can be compressed.  We need extra chunks in this case to handle
 *  in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(struct spdk_reduce_vol *vol)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * vol->backing_io_units_per_chunk;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index * _reduce_vol_get_chunk_struct_size(vol);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 *  values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunks maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	/* Allocate 2x since we need buffers for both read/write and compress/decompress
	 *  intermediate buffers.
	 */
	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 *  buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}

		spdk_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_free(vol->buf_mem);
		free(vol);
	}
}

static int
_alloc_zero_buff(struct spdk_reduce_vol *vol)
{
	int rc = 0;

	/* The zero buffer is shared between all volumes and is only used
	 *  for reads, so allocate one global instance here if it was not already
	 *  allocated when another vol was initialized or loaded.
	 */
	if (g_vol_count++ == 0) {
		g_zero_buf = spdk_zmalloc(vol->params.chunk_size,
					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
					  SPDK_MALLOC_DMA);
		if (g_zero_buf == NULL) {
			rc = -ENOMEM;
		}
	}
	return rc;
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	rc = _alloc_zero_buff(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	rc = _alloc_zero_buff(vol);
	if (rc) {
		goto error;
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (--g_vol_count == 0) {
		spdk_free(g_zero_buf);
	}
	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 *  reduce_errno since that indicates whether or not the volume was
	 *  actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;
	struct spdk_reduce_chunk_map *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

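	/* Each backing io unit write for this chunk completes through this callback;
	 *  only the last completion falls through to update the chunk map and logical map below.
	 */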
	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid after we update the logical map, since the logical map will no
	 * longer have a reference to it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, _reduce_vol_get_chunk_struct_size(vol));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
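	/* If compression did not save at least one full backing io unit, the chunk is
	 *  stored uncompressed and the recorded size is the full chunk size.
	 */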
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}
	while (i < vol->backing_io_units_per_chunk) {
		req->chunk->io_unit_index[i++] = REDUCE_EMPTY_MAP_ENTRY;
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations.
	 * Just write the uncompressed data instead.  Force this to happen
	 * by just passing the full chunk size to _reduce_vol_write_chunk.
	 * When it sees the data couldn't be compressed, it will just write
	 * the uncompressed buffer to disk.
	 */
	if (reduce_errno < 0) {
		reduce_errno = req->vol->params.chunk_size;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder = 0;
	uint64_t ttl_len = 0;
	int i;

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		/* first iov points to our scratch buffer for any offset into the chunk */
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct to the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	/* send the rest of the chunk to our scratch buffer */
	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt,
				     &req->backing_cb_args);
}

static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, ttl_len = 0;
	int i;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	if (ttl_len < req->vol->params.chunk_size) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = req->vol->params.chunk_size - ttl_len;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
	} else {
		_write_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 *  buffer.  This should equal the chunk size - otherwise that's another
	 *  type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {
		_read_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != UINT32_MAX);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request *req;

	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
		if (logical_map_index == req->logical_map_index) {
			return true;
		}
	}

	return false;
}

static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, ttl_len = 0;
	uint64_t remainder = 0;
	uint32_t lbsize, lb_per_chunk;
	int i;

	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
			/* Read old chunk, then overwrite with data from this write
			 *  operation.
			 */
			_reduce_vol_read_chunk(req, _write_read_done);
			return;
		}
	}

	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	req->decomp_iovcnt = 0;

	/* Note: point to our zero buf for offset into the chunk. */
	chunk_offset = req->offset % lb_per_chunk;
	if (chunk_offset != 0) {
		ttl_len += chunk_offset * lbsize;
		req->decomp_iov[0].iov_base = g_zero_buf;
		req->decomp_iov[0].iov_len = ttl_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct to the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	chunk_offset += req->length;
	if (chunk_offset != lb_per_chunk) {
		remainder = (lb_per_chunk - chunk_offset) * lbsize;
		req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == req->vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);
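	/* Requests that target a chunk with an in-flight request are queued below and
	 *  restarted from _reduce_vol_complete_req() when the in-flight request finishes.
	 */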

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

void
spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
			   vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)