/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT 64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET 4096

#define REDUCE_EMPTY_MAP_ENTRY -1ULL

#define REDUCE_NUM_VOL_REQUESTS 256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t signature[8];
	struct spdk_reduce_vol_params params;
	uint8_t reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* null terminator counts one */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 * compressed volume.
 */
struct spdk_reduce_pm_file {
	char path[REDUCE_PATH_MAX];
	void *pm_buf;
	int pm_is_pmem;
	uint64_t size;
};

#define REDUCE_IO_READV 1
#define REDUCE_IO_WRITEV 2

struct spdk_reduce_chunk_map {
	uint32_t compressed_size;
	uint32_t reserved;
	uint64_t io_unit_index[0];
};

struct spdk_reduce_vol_request {
	/**
	 * Scratch buffer used for uncompressed chunk. This is used for:
	 * 1) source buffer for compression operations
	 * 2) destination buffer for decompression operations
	 * 3) data buffer when writing uncompressed chunk to disk
	 * 4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t *decomp_buf;
	struct iovec *decomp_buf_iov;

	/**
	 * These are used to construct the iovecs that are sent to
	 * the decomp engine, they point to a mix of the scratch buffer
	 * and user buffer
	 */
	struct iovec decomp_iov[REDUCE_MAX_IOVECS];
	int decomp_iovcnt;

	/**
	 * Scratch buffer used for compressed chunk. This is used for:
	 * 1) destination buffer for compression operations
	 * 2) source buffer for decompression operations
	 * 3) data buffer when writing compressed chunk to disk
	 * 4) data buffer when reading compressed chunk from disk
	 */
	uint8_t *comp_buf;
	struct iovec *comp_buf_iov;
	struct iovec *iov;
	bool rmw;
	struct spdk_reduce_vol *vol;
	int type;
	int reduce_errno;
	int iovcnt;
	int num_backing_ops;
	uint32_t num_io_units;
	bool chunk_is_compressed;
	uint64_t offset;
	uint64_t logical_map_index;
	uint64_t length;
	uint64_t chunk_map_index;
	struct spdk_reduce_chunk_map *chunk;
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request) tailq;
	struct spdk_reduce_vol_cb_args backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params params;
	uint32_t backing_io_units_per_chunk;
	uint32_t backing_lba_per_io_unit;
	uint32_t logical_blocks_per_chunk;
	struct spdk_reduce_pm_file pm_file;
	struct spdk_reduce_backing_dev *backing_dev;
	struct spdk_reduce_vol_superblock *backing_super;
	struct spdk_reduce_vol_superblock *pm_super;
	uint64_t *pm_logical_map;
	uint64_t *pm_chunk_maps;

	struct spdk_bit_array *allocated_chunk_maps;
	struct spdk_bit_array *allocated_backing_io_units;

	struct spdk_reduce_vol_request *request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request) free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t *buf_mem;
	struct iovec *buf_iov_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);
static uint8_t *g_zero_buf;
static int g_vol_count = 0;

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 * outstanding IO in worst case scenario where logical map is completely allocated
 * and no data can be compressed. We need extra chunks in this case to handle
 * in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 * values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT 2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete cb_fn;
	void *cb_arg;
	struct iovec iov[LOAD_IOV_COUNT];
	void *path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	/* Allocate 2x since we need buffers for both read/write and compress/decompress
	 * intermediate buffers.
	 */
	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 * buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}

		spdk_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_free(vol->buf_mem);
		free(vol);
	}
}

static int
_alloc_zero_buff(struct spdk_reduce_vol *vol)
{
	int rc = 0;

	/* The zero buffer is shared between all volumes and just used
	 * for reads so allocate one global instance here if not already
	 * allocated when another vol init'd or loaded.
	 */
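	/* g_vol_count also lets spdk_reduce_vol_unload() know when the last volume
	 * is gone so that this shared zero buffer can be freed.
	 */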
	if (g_vol_count++ == 0) {
		g_zero_buf = spdk_zmalloc(vol->params.chunk_size,
					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
					  SPDK_MALLOC_DMA);
		if (g_zero_buf == NULL) {
			rc = -ENOMEM;
		}
	}
	return rc;
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	rc = _alloc_zero_buff(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent of filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written. We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	rc = _alloc_zero_buff(vol);
	if (rc) {
		goto error;
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (--g_vol_count == 0) {
		spdk_free(g_zero_buf);
	}
	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_superblock *super;
	struct iovec iov;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	int reduce_errno;
	char pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;
	struct spdk_reduce_chunk_map *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

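	/* One backing write was issued per io unit of the chunk; only proceed to the
	 * metadata updates below once the last of those writes has completed.
	 */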
	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here. The old chunk map
	 * becomes invalid after we update the logical map, since the old chunk map will no
	 * longer have a reference to it in the logical map.
	 */

	/* Persist the new chunk map. This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk,
			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
	uint64_t chunk_offset, remainder, total_len = 0;
	uint8_t *buf;
	int j;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
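	/* If compression did not save at least one full backing io unit, store the
	 * chunk uncompressed and record the full chunk size instead.
	 */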
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
	if (req->chunk_is_compressed == false) {
		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
		buf = req->decomp_buf;
		total_len = chunk_offset * vol->params.logical_block_size;

		/* zero any offset into chunk */
		if (req->rmw == false && chunk_offset) {
			memset(buf, 0, total_len);
		}
		buf += total_len;

		/* copy the data */
		for (j = 0; j < req->iovcnt; j++) {
			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
			buf += req->iov[j].iov_len;
			total_len += req->iov[j].iov_len;
		}

		/* zero any remainder */
		remainder = vol->params.chunk_size - total_len;
		total_len += remainder;
		if (req->rmw == false && remainder) {
			memset(buf, 0, remainder);
		}
		assert(total_len == vol->params.chunk_size);
	}

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations.
	 * Just write the uncompressed data instead. Force this to happen
	 * by just passing the full chunk size to _reduce_vol_write_chunk.
	 * When it sees the data couldn't be compressed, it will just write
	 * the uncompressed buffer to disk.
	 */
	if (reduce_errno < 0) {
		reduce_errno = req->vol->params.chunk_size;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder = 0;
	uint64_t ttl_len = 0;
	int i;

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		/* first iov points to our scratch buffer for any offset into the chunk */
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct to the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	/* send the rest of the chunk to our scratch buffer */
	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt,
				     &req->backing_cb_args);
}

static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder, ttl_len = 0;
	int i;

	/* Negative reduce_errno indicates failure for the decompression operation. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 * buffer. This should equal the chunk size - otherwise that's another
	 * type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
	} else {
		_write_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;

	/* Negative reduce_errno indicates failure for the decompression operation. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 * buffer. This should equal the chunk size - otherwise that's another
	 * type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {

		/* If the chunk was compressed, the data would have been delivered to the
		 * host buffers by the decompression operation; since it was not, we need
		 * to memcpy it here.
		 */
		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
		for (i = 0; i < req->iovcnt; i++) {
			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
			buf += req->iov[i].iov_len;
		}

		_read_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != UINT32_MAX);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request *req;

	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
		if (logical_map_index == req->logical_map_index) {
			return true;
		}
	}

	return false;
}

static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

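	/* Only take the unallocated-chunk shortcut below when no other request is in
	 * flight for this chunk - an executing write may be about to populate this
	 * logical map entry.
	 */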
	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated. So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, ttl_len = 0;
	uint64_t remainder = 0;
	uint32_t lbsize;
	int i;

	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
			/* Read old chunk, then overwrite with data from this write
			 * operation.
			 */
			req->rmw = true;
			_reduce_vol_read_chunk(req, _write_read_done);
			return;
		}
	}

	lbsize = vol->params.logical_block_size;
	req->decomp_iovcnt = 0;
	req->rmw = false;

	/* Note: point to our zero buf for offset into the chunk. */
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	if (chunk_offset != 0) {
		ttl_len += chunk_offset * lbsize;
		req->decomp_iov[0].iov_base = g_zero_buf;
		req->decomp_iov[0].iov_len = ttl_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct from the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == req->vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
			   vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)