1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/reduce.h" 37 #include "spdk/env.h" 38 #include "spdk/string.h" 39 #include "spdk/bit_array.h" 40 #include "spdk/util.h" 41 #include "spdk_internal/log.h" 42 43 #include "libpmem.h" 44 45 /* Always round up the size of the PM region to the nearest cacheline. */ 46 #define REDUCE_PM_SIZE_ALIGNMENT 64 47 48 /* Offset into the backing device where the persistent memory file's path is stored. */ 49 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 50 51 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 52 53 #define REDUCE_NUM_VOL_REQUESTS 256 54 55 /* Structure written to offset 0 of both the pm file and the backing device. */ 56 struct spdk_reduce_vol_superblock { 57 uint8_t signature[8]; 58 struct spdk_reduce_vol_params params; 59 uint8_t reserved[4048]; 60 }; 61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 62 63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 64 /* null terminator counts one */ 65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 66 sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect"); 67 68 #define REDUCE_PATH_MAX 4096 69 70 /** 71 * Describes a persistent memory file used to hold metadata associated with a 72 * compressed volume. 73 */ 74 struct spdk_reduce_pm_file { 75 char path[REDUCE_PATH_MAX]; 76 void *pm_buf; 77 int pm_is_pmem; 78 uint64_t size; 79 }; 80 81 #define REDUCE_IO_READV 1 82 #define REDUCE_IO_WRITEV 2 83 84 struct spdk_reduce_chunk_map { 85 uint32_t compressed_size; 86 uint32_t reserved; 87 uint64_t io_unit_index[0]; 88 }; 89 90 struct spdk_reduce_vol_request { 91 /** 92 * Scratch buffer used for uncompressed chunk. 
This is used for: 93 * 1) source buffer for compression operations 94 * 2) destination buffer for decompression operations 95 * 3) data buffer when writing uncompressed chunk to disk 96 * 4) data buffer when reading uncompressed chunk from disk 97 */ 98 uint8_t *decomp_buf; 99 struct iovec *decomp_buf_iov; 100 101 /** 102 * These are used to construct the iovecs that are sent to 103 * the decomp engine, they point to a mix of the scratch buffer 104 * and user buffer 105 */ 106 struct iovec decomp_iov[REDUCE_MAX_IOVECS]; 107 int decomp_iovcnt; 108 109 /** 110 * Scratch buffer used for compressed chunk. This is used for: 111 * 1) destination buffer for compression operations 112 * 2) source buffer for decompression operations 113 * 3) data buffer when writing compressed chunk to disk 114 * 4) data buffer when reading compressed chunk from disk 115 */ 116 uint8_t *comp_buf; 117 struct iovec *comp_buf_iov; 118 struct iovec *iov; 119 bool rmw; 120 struct spdk_reduce_vol *vol; 121 int type; 122 int reduce_errno; 123 int iovcnt; 124 int num_backing_ops; 125 uint32_t num_io_units; 126 bool chunk_is_compressed; 127 uint64_t offset; 128 uint64_t logical_map_index; 129 uint64_t length; 130 uint64_t chunk_map_index; 131 struct spdk_reduce_chunk_map *chunk; 132 spdk_reduce_vol_op_complete cb_fn; 133 void *cb_arg; 134 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 135 struct spdk_reduce_vol_cb_args backing_cb_args; 136 }; 137 138 struct spdk_reduce_vol { 139 struct spdk_reduce_vol_params params; 140 uint32_t backing_io_units_per_chunk; 141 uint32_t backing_lba_per_io_unit; 142 uint32_t logical_blocks_per_chunk; 143 struct spdk_reduce_pm_file pm_file; 144 struct spdk_reduce_backing_dev *backing_dev; 145 struct spdk_reduce_vol_superblock *backing_super; 146 struct spdk_reduce_vol_superblock *pm_super; 147 uint64_t *pm_logical_map; 148 uint64_t *pm_chunk_maps; 149 150 struct spdk_bit_array *allocated_chunk_maps; 151 struct spdk_bit_array *allocated_backing_io_units; 152 153 struct spdk_reduce_vol_request *request_mem; 154 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 155 TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests; 156 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 157 158 /* Single contiguous buffer used for all request buffers for this volume. */ 159 uint8_t *buf_mem; 160 struct iovec *buf_iov_mem; 161 }; 162 163 static void _start_readv_request(struct spdk_reduce_vol_request *req); 164 static void _start_writev_request(struct spdk_reduce_vol_request *req); 165 static uint8_t *g_zero_buf; 166 static int g_vol_count = 0; 167 168 /* 169 * Allocate extra metadata chunks and corresponding backing io units to account for 170 * outstanding IO in worst case scenario where logical map is completely allocated 171 * and no data can be compressed. We need extra chunks in this case to handle 172 * in-flight writes since reduce never writes data in place. 173 */ 174 #define REDUCE_NUM_EXTRA_CHUNKS 128 175 176 static void 177 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 178 { 179 if (vol->pm_file.pm_is_pmem) { 180 pmem_persist(addr, len); 181 } else { 182 pmem_msync(addr, len); 183 } 184 } 185 186 static uint64_t 187 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 188 { 189 uint64_t chunks_in_logical_map, logical_map_size; 190 191 chunks_in_logical_map = vol_size / chunk_size; 192 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 193 194 /* Round up to next cacheline. 
*/ 195 return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * 196 REDUCE_PM_SIZE_ALIGNMENT; 197 } 198 199 static uint64_t 200 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size) 201 { 202 uint64_t num_chunks; 203 204 num_chunks = vol_size / chunk_size; 205 num_chunks += REDUCE_NUM_EXTRA_CHUNKS; 206 207 return num_chunks; 208 } 209 210 static inline uint32_t 211 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk) 212 { 213 return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk; 214 } 215 216 static uint64_t 217 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size) 218 { 219 uint64_t io_units_per_chunk, num_chunks, total_chunks_size; 220 221 num_chunks = _get_total_chunks(vol_size, chunk_size); 222 io_units_per_chunk = chunk_size / backing_io_unit_size; 223 224 total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk); 225 226 return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * 227 REDUCE_PM_SIZE_ALIGNMENT; 228 } 229 230 static struct spdk_reduce_chunk_map * 231 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 232 { 233 uintptr_t chunk_map_addr; 234 235 assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size)); 236 237 chunk_map_addr = (uintptr_t)vol->pm_chunk_maps; 238 chunk_map_addr += chunk_map_index * 239 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 240 241 return (struct spdk_reduce_chunk_map *)chunk_map_addr; 242 } 243 244 static int 245 _validate_vol_params(struct spdk_reduce_vol_params *params) 246 { 247 if (params->vol_size > 0) { 248 /** 249 * User does not pass in the vol size - it gets calculated by libreduce from 250 * values in this structure plus the size of the backing device. 251 */ 252 return -EINVAL; 253 } 254 255 if (params->chunk_size == 0 || params->backing_io_unit_size == 0 || 256 params->logical_block_size == 0) { 257 return -EINVAL; 258 } 259 260 /* Chunk size must be an even multiple of the backing io unit size. */ 261 if ((params->chunk_size % params->backing_io_unit_size) != 0) { 262 return -EINVAL; 263 } 264 265 /* Chunk size must be an even multiple of the logical block size. */ 266 if ((params->chunk_size % params->logical_block_size) != 0) { 267 return -EINVAL; 268 } 269 270 return 0; 271 } 272 273 static uint64_t 274 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size) 275 { 276 uint64_t num_chunks; 277 278 num_chunks = backing_dev_size / chunk_size; 279 if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) { 280 return 0; 281 } 282 283 num_chunks -= REDUCE_NUM_EXTRA_CHUNKS; 284 return num_chunks * chunk_size; 285 } 286 287 static uint64_t 288 _get_pm_file_size(struct spdk_reduce_vol_params *params) 289 { 290 uint64_t total_pm_size; 291 292 total_pm_size = sizeof(struct spdk_reduce_vol_superblock); 293 total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size); 294 total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size, 295 params->backing_io_unit_size); 296 return total_pm_size; 297 } 298 299 const struct spdk_uuid * 300 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol) 301 { 302 return &vol->params.uuid; 303 } 304
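/* Worked example of the pm file sizing above (assumed illustrative values, not defaults): with vol_size = 1 GiB, chunk_size = 16 KiB and backing_io_unit_size = 4 KiB, the logical map holds 65536 eight-byte entries (512 KiB, already cacheline aligned), there are 65536 + REDUCE_NUM_EXTRA_CHUNKS = 65664 chunk maps of 8 + 4 * 8 = 40 bytes each (2626560 bytes), and adding the 4 KiB superblock gives a pm file of 3154944 bytes (about 3 MiB). _initialize_vol_pm_pointers() below simply carves that file into superblock | logical map | chunk maps. */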
305 static void 306 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol) 307 { 308 uint64_t logical_map_size; 309 310 /* Superblock is at the beginning of the pm file. */ 311 vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; 312 313 /* Logical map immediately follows the super block. */ 314 vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); 315 316 /* Chunk maps follow the logical map. */ 317 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size); 318 vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size); 319 } 320 321 /* We need 2 iovs during load - one for the superblock, another for the path */ 322 #define LOAD_IOV_COUNT 2 323 324 struct reduce_init_load_ctx { 325 struct spdk_reduce_vol *vol; 326 struct spdk_reduce_vol_cb_args backing_cb_args; 327 spdk_reduce_vol_op_with_handle_complete cb_fn; 328 void *cb_arg; 329 struct iovec iov[LOAD_IOV_COUNT]; 330 void *path; 331 }; 332 333 static int 334 _allocate_vol_requests(struct spdk_reduce_vol *vol) 335 { 336 struct spdk_reduce_vol_request *req; 337 int i; 338 339 /* Allocate 2x since we need buffers for both read/write and compress/decompress 340 * intermediate buffers. 341 */ 342 vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 343 64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 344 if (vol->buf_mem == NULL) { 345 return -ENOMEM; 346 } 347 348 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 349 if (vol->request_mem == NULL) { 350 spdk_free(vol->buf_mem); 351 vol->buf_mem = NULL; 352 return -ENOMEM; 353 } 354 355 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 356 * buffers. 357 */ 358 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 359 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 360 if (vol->buf_iov_mem == NULL) { 361 free(vol->request_mem); 362 spdk_free(vol->buf_mem); 363 vol->request_mem = NULL; 364 vol->buf_mem = NULL; 365 return -ENOMEM; 366 } 367 368 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 369 req = &vol->request_mem[i]; 370 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 371 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 372 req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size; 373 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 374 req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size; 375 } 376 377 return 0; 378 } 379 380 static void 381 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 382 { 383 if (ctx != NULL) { 384 spdk_free(ctx->path); 385 free(ctx); 386 } 387 388 if (vol != NULL) { 389 if (vol->pm_file.pm_buf != NULL) { 390 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 391 } 392 393 spdk_free(vol->backing_super); 394 spdk_bit_array_free(&vol->allocated_chunk_maps); 395 spdk_bit_array_free(&vol->allocated_backing_io_units); 396 free(vol->request_mem); 397 free(vol->buf_iov_mem); 398 spdk_free(vol->buf_mem); 399 free(vol); 400 } 401 } 402 403 static int 404 _alloc_zero_buff(struct spdk_reduce_vol *vol) 405 { 406 int rc = 0; 407 408 /* The zero buffer is shared between all volumes and just used 409 * for reads so allocate one global instance here if not already 410 * allocated when another vol init'd or loaded.
411 */ 412 if (g_vol_count++ == 0) { 413 g_zero_buf = spdk_zmalloc(vol->params.chunk_size, 414 64, NULL, SPDK_ENV_LCORE_ID_ANY, 415 SPDK_MALLOC_DMA); 416 if (g_zero_buf == NULL) { 417 rc = -ENOMEM; 418 } 419 } 420 return rc; 421 } 422 423 static void 424 _init_write_super_cpl(void *cb_arg, int reduce_errno) 425 { 426 struct reduce_init_load_ctx *init_ctx = cb_arg; 427 int rc; 428 429 rc = _allocate_vol_requests(init_ctx->vol); 430 if (rc != 0) { 431 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 432 _init_load_cleanup(init_ctx->vol, init_ctx); 433 return; 434 } 435 436 rc = _alloc_zero_buff(init_ctx->vol); 437 if (rc != 0) { 438 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 439 _init_load_cleanup(init_ctx->vol, init_ctx); 440 return; 441 } 442 443 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno); 444 /* Only clean up the ctx - the vol has been passed to the application 445 * for use now that initialization was successful. 446 */ 447 _init_load_cleanup(NULL, init_ctx); 448 } 449 450 static void 451 _init_write_path_cpl(void *cb_arg, int reduce_errno) 452 { 453 struct reduce_init_load_ctx *init_ctx = cb_arg; 454 struct spdk_reduce_vol *vol = init_ctx->vol; 455 456 init_ctx->iov[0].iov_base = vol->backing_super; 457 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 458 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 459 init_ctx->backing_cb_args.cb_arg = init_ctx; 460 vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, 461 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen, 462 &init_ctx->backing_cb_args); 463 } 464 465 static int 466 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 467 { 468 uint64_t total_chunks, total_backing_io_units; 469 uint32_t i, num_metadata_io_units; 470 471 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 472 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 473 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 474 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 475 476 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 477 return -ENOMEM; 478 } 479 480 /* Set backing io unit bits associated with metadata. */ 481 num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 482 vol->backing_dev->blocklen; 483 for (i = 0; i < num_metadata_io_units; i++) { 484 spdk_bit_array_set(vol->allocated_backing_io_units, i); 485 } 486 487 return 0; 488 } 489 490 void 491 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, 492 struct spdk_reduce_backing_dev *backing_dev, 493 const char *pm_file_dir, 494 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 495 { 496 struct spdk_reduce_vol *vol; 497 struct reduce_init_load_ctx *init_ctx; 498 uint64_t backing_dev_size; 499 size_t mapped_len; 500 int dir_len, max_dir_len, rc; 501 502 /* We need to append a path separator and the UUID to the supplied 503 * path. 504 */ 505 max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1; 506 dir_len = strnlen(pm_file_dir, max_dir_len); 507 /* Strip trailing slash if the user provided one - we will add it back 508 * later when appending the filename. 
509 */ 510 if (pm_file_dir[dir_len - 1] == '/') { 511 dir_len--; 512 } 513 if (dir_len == max_dir_len) { 514 SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir); 515 cb_fn(cb_arg, NULL, -EINVAL); 516 return; 517 } 518 519 rc = _validate_vol_params(params); 520 if (rc != 0) { 521 SPDK_ERRLOG("invalid vol params\n"); 522 cb_fn(cb_arg, NULL, rc); 523 return; 524 } 525 526 backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen; 527 params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size); 528 if (params->vol_size == 0) { 529 SPDK_ERRLOG("backing device is too small\n"); 530 cb_fn(cb_arg, NULL, -EINVAL); 531 return; 532 } 533 534 if (backing_dev->readv == NULL || backing_dev->writev == NULL || 535 backing_dev->unmap == NULL) { 536 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 537 cb_fn(cb_arg, NULL, -EINVAL); 538 return; 539 } 540 541 vol = calloc(1, sizeof(*vol)); 542 if (vol == NULL) { 543 cb_fn(cb_arg, NULL, -ENOMEM); 544 return; 545 } 546 547 TAILQ_INIT(&vol->free_requests); 548 TAILQ_INIT(&vol->executing_requests); 549 TAILQ_INIT(&vol->queued_requests); 550 551 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL, 552 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 553 if (vol->backing_super == NULL) { 554 cb_fn(cb_arg, NULL, -ENOMEM); 555 _init_load_cleanup(vol, NULL); 556 return; 557 } 558 559 init_ctx = calloc(1, sizeof(*init_ctx)); 560 if (init_ctx == NULL) { 561 cb_fn(cb_arg, NULL, -ENOMEM); 562 _init_load_cleanup(vol, NULL); 563 return; 564 } 565 566 init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL, 567 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 568 if (init_ctx->path == NULL) { 569 cb_fn(cb_arg, NULL, -ENOMEM); 570 _init_load_cleanup(vol, init_ctx); 571 return; 572 } 573 574 if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) { 575 spdk_uuid_generate(&params->uuid); 576 } 577 578 memcpy(vol->pm_file.path, pm_file_dir, dir_len); 579 vol->pm_file.path[dir_len] = '/'; 580 spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN, 581 &params->uuid); 582 vol->pm_file.size = _get_pm_file_size(params); 583 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size, 584 PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600, 585 &mapped_len, &vol->pm_file.pm_is_pmem); 586 if (vol->pm_file.pm_buf == NULL) { 587 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", 588 vol->pm_file.path, strerror(errno)); 589 cb_fn(cb_arg, NULL, -errno); 590 _init_load_cleanup(vol, init_ctx); 591 return; 592 } 593 594 if (vol->pm_file.size != mapped_len) { 595 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 596 vol->pm_file.size, mapped_len); 597 cb_fn(cb_arg, NULL, -ENOMEM); 598 _init_load_cleanup(vol, init_ctx); 599 return; 600 } 601
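/* Illustrative arithmetic only (assumed values, not defaults): with chunk_size = 16 KiB, backing_io_unit_size = 4 KiB, logical_block_size = 512 and a 512-byte backing device blocklen, the derived values below are backing_io_units_per_chunk = 4, logical_blocks_per_chunk = 32 and backing_lba_per_io_unit = 8. */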
602 vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size; 603 vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size; 604 vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen; 605 memcpy(&vol->params, params, sizeof(*params)); 606 607 vol->backing_dev = backing_dev; 608 609 rc = _allocate_bit_arrays(vol); 610 if (rc != 0) { 611 cb_fn(cb_arg, NULL, rc); 612 _init_load_cleanup(vol, init_ctx); 613 return; 614 } 615 616 memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE, 617 sizeof(vol->backing_super->signature)); 618 memcpy(&vol->backing_super->params, params, sizeof(*params)); 619 620 _initialize_vol_pm_pointers(vol); 621 622 memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super)); 623 /* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY. 624 * Note that this writes 0xFF to not just the logical map but the chunk maps as well. 625 */ 626 memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super)); 627 _reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size); 628 629 init_ctx->vol = vol; 630 init_ctx->cb_fn = cb_fn; 631 init_ctx->cb_arg = cb_arg; 632 633 memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX); 634 init_ctx->iov[0].iov_base = init_ctx->path; 635 init_ctx->iov[0].iov_len = REDUCE_PATH_MAX; 636 init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl; 637 init_ctx->backing_cb_args.cb_arg = init_ctx; 638 /* Write path to offset 4K on backing device - just after where the super 639 * block will be written. We wait until this is committed before writing the 640 * super block to guarantee we don't get the super block written without the 641 * path if the system crashed in the middle of a write operation. 642 */ 643 vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, 644 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen, 645 REDUCE_PATH_MAX / vol->backing_dev->blocklen, 646 &init_ctx->backing_cb_args); 647 } 648 649 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno); 650 651 static void 652 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno) 653 { 654 struct reduce_init_load_ctx *load_ctx = cb_arg; 655 struct spdk_reduce_vol *vol = load_ctx->vol; 656 uint64_t backing_dev_size; 657 uint64_t i, num_chunks, logical_map_index; 658 struct spdk_reduce_chunk_map *chunk; 659 size_t mapped_len; 660 uint32_t j; 661 int rc; 662 663 if (memcmp(vol->backing_super->signature, 664 SPDK_REDUCE_SIGNATURE, 665 sizeof(vol->backing_super->signature)) != 0) { 666 /* This backing device isn't a libreduce backing device. */ 667 rc = -EILSEQ; 668 goto error; 669 } 670 671 /* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev. 672 * So don't bother getting the volume ready to use - invoke the callback immediately 673 * so destroy_load_cb can delete the metadata off of the block device and delete the 674 * persistent memory file if it exists.
675 */ 676 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 677 if (load_ctx->cb_fn == (*destroy_load_cb)) { 678 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 679 _init_load_cleanup(NULL, load_ctx); 680 return; 681 } 682 683 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 684 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 685 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 686 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 687 688 rc = _allocate_bit_arrays(vol); 689 if (rc != 0) { 690 goto error; 691 } 692 693 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 694 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 695 SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", 696 backing_dev_size); 697 rc = -EILSEQ; 698 goto error; 699 } 700 701 vol->pm_file.size = _get_pm_file_size(&vol->params); 702 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 703 &vol->pm_file.pm_is_pmem); 704 if (vol->pm_file.pm_buf == NULL) { 705 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 706 rc = -errno; 707 goto error; 708 } 709 710 if (vol->pm_file.size != mapped_len) { 711 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 712 vol->pm_file.size, mapped_len); 713 rc = -ENOMEM; 714 goto error; 715 } 716 717 rc = _allocate_vol_requests(vol); 718 if (rc != 0) { 719 goto error; 720 } 721 722 _initialize_vol_pm_pointers(vol); 723 724 num_chunks = vol->params.vol_size / vol->params.chunk_size; 725 for (i = 0; i < num_chunks; i++) { 726 logical_map_index = vol->pm_logical_map[i]; 727 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 728 continue; 729 } 730 spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 731 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 732 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 733 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 734 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 735 } 736 } 737 } 738 739 rc = _alloc_zero_buff(vol); 740 if (rc) { 741 goto error; 742 } 743 744 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 745 /* Only clean up the ctx - the vol has been passed to the application 746 * for use now that volume load was successful. 
747 */ 748 _init_load_cleanup(NULL, load_ctx); 749 return; 750 751 error: 752 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 753 _init_load_cleanup(vol, load_ctx); 754 } 755 756 void 757 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 758 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 759 { 760 struct spdk_reduce_vol *vol; 761 struct reduce_init_load_ctx *load_ctx; 762 763 if (backing_dev->readv == NULL || backing_dev->writev == NULL || 764 backing_dev->unmap == NULL) { 765 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 766 cb_fn(cb_arg, NULL, -EINVAL); 767 return; 768 } 769 770 vol = calloc(1, sizeof(*vol)); 771 if (vol == NULL) { 772 cb_fn(cb_arg, NULL, -ENOMEM); 773 return; 774 } 775 776 TAILQ_INIT(&vol->free_requests); 777 TAILQ_INIT(&vol->executing_requests); 778 TAILQ_INIT(&vol->queued_requests); 779 780 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, 781 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 782 if (vol->backing_super == NULL) { 783 _init_load_cleanup(vol, NULL); 784 cb_fn(cb_arg, NULL, -ENOMEM); 785 return; 786 } 787 788 vol->backing_dev = backing_dev; 789 790 load_ctx = calloc(1, sizeof(*load_ctx)); 791 if (load_ctx == NULL) { 792 _init_load_cleanup(vol, NULL); 793 cb_fn(cb_arg, NULL, -ENOMEM); 794 return; 795 } 796 797 load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, 798 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 799 if (load_ctx->path == NULL) { 800 _init_load_cleanup(vol, load_ctx); 801 cb_fn(cb_arg, NULL, -ENOMEM); 802 return; 803 } 804 805 load_ctx->vol = vol; 806 load_ctx->cb_fn = cb_fn; 807 load_ctx->cb_arg = cb_arg; 808 809 load_ctx->iov[0].iov_base = vol->backing_super; 810 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 811 load_ctx->iov[1].iov_base = load_ctx->path; 812 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 813 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 814 load_ctx->backing_cb_args.cb_arg = load_ctx; 815 vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0, 816 (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 817 vol->backing_dev->blocklen, 818 &load_ctx->backing_cb_args); 819 } 820 821 void 822 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 823 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 824 { 825 if (vol == NULL) { 826 /* This indicates a programming error. */ 827 assert(false); 828 cb_fn(cb_arg, -EINVAL); 829 return; 830 } 831 832 if (--g_vol_count == 0) { 833 spdk_free(g_zero_buf); 834 } 835 _init_load_cleanup(vol, NULL); 836 cb_fn(cb_arg, 0); 837 } 838 839 struct reduce_destroy_ctx { 840 spdk_reduce_vol_op_complete cb_fn; 841 void *cb_arg; 842 struct spdk_reduce_vol *vol; 843 struct spdk_reduce_vol_superblock *super; 844 struct iovec iov; 845 struct spdk_reduce_vol_cb_args backing_cb_args; 846 int reduce_errno; 847 char pm_path[REDUCE_PATH_MAX]; 848 }; 849 850 static void 851 destroy_unload_cpl(void *cb_arg, int reduce_errno) 852 { 853 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 854 855 if (destroy_ctx->reduce_errno == 0) { 856 if (unlink(destroy_ctx->pm_path)) { 857 SPDK_ERRLOG("%s could not be unlinked: %s\n", 858 destroy_ctx->pm_path, strerror(errno)); 859 } 860 } 861 862 /* Even if the unload somehow failed, we still pass the destroy_ctx 863 * reduce_errno since that indicates whether or not the volume was 864 * actually destroyed. 
865 */ 866 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 867 spdk_free(destroy_ctx->super); 868 free(destroy_ctx); 869 } 870 871 static void 872 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 873 { 874 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 875 struct spdk_reduce_vol *vol = destroy_ctx->vol; 876 877 destroy_ctx->reduce_errno = reduce_errno; 878 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 879 } 880 881 static void 882 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 883 { 884 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 885 886 if (reduce_errno != 0) { 887 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 888 spdk_free(destroy_ctx->super); 889 free(destroy_ctx); 890 return; 891 } 892 893 destroy_ctx->vol = vol; 894 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 895 destroy_ctx->iov.iov_base = destroy_ctx->super; 896 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 897 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 898 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 899 vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0, 900 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen, 901 &destroy_ctx->backing_cb_args); 902 } 903 904 void 905 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 906 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 907 { 908 struct reduce_destroy_ctx *destroy_ctx; 909 910 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 911 if (destroy_ctx == NULL) { 912 cb_fn(cb_arg, -ENOMEM); 913 return; 914 } 915 916 destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, 917 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 918 if (destroy_ctx->super == NULL) { 919 free(destroy_ctx); 920 cb_fn(cb_arg, -ENOMEM); 921 return; 922 } 923 destroy_ctx->cb_fn = cb_fn; 924 destroy_ctx->cb_arg = cb_arg; 925 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 926 } 927 928 static bool 929 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 930 { 931 uint64_t start_chunk, end_chunk; 932 933 start_chunk = offset / vol->logical_blocks_per_chunk; 934 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 935 936 return (start_chunk != end_chunk); 937 } 938 939 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 940 941 static void 942 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 943 { 944 struct spdk_reduce_vol_request *next_req; 945 struct spdk_reduce_vol *vol = req->vol; 946 947 req->cb_fn(req->cb_arg, reduce_errno); 948 TAILQ_REMOVE(&vol->executing_requests, req, tailq); 949 950 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 951 if (next_req->logical_map_index == req->logical_map_index) { 952 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 953 if (next_req->type == REDUCE_IO_READV) { 954 _start_readv_request(next_req); 955 } else { 956 assert(next_req->type == REDUCE_IO_WRITEV); 957 _start_writev_request(next_req); 958 } 959 break; 960 } 961 } 962 963 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 964 } 965 966 static void 967 _write_write_done(void *_req, int reduce_errno) 968 { 969 struct spdk_reduce_vol_request *req = _req; 970 struct spdk_reduce_vol *vol = req->vol; 971 uint64_t old_chunk_map_index; 972 struct spdk_reduce_chunk_map *old_chunk; 973 uint32_t i; 974 975 if (reduce_errno != 0) { 976 req->reduce_errno = reduce_errno; 977 } 978 979 
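/* Each backing io unit write issued by _issue_backing_ops() completes through this callback; only the final completion (num_backing_ops reaching zero) falls through to update the persistent chunk map and logical map below. */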
assert(req->num_backing_ops > 0); 980 if (--req->num_backing_ops > 0) { 981 return; 982 } 983 984 if (req->reduce_errno != 0) { 985 _reduce_vol_complete_req(req, req->reduce_errno); 986 return; 987 } 988 989 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 990 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 991 old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index); 992 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 993 if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) { 994 break; 995 } 996 assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true); 997 spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]); 998 old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 999 } 1000 spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index); 1001 } 1002 1003 /* 1004 * We don't need to persist the clearing of the old chunk map here. The old chunk map 1005 * becomes invalid after we update the logical map, since the old chunk map will no 1006 * longer have a reference to it in the logical map. 1007 */ 1008 1009 /* Persist the new chunk map. This must be persisted before we update the logical map. */ 1010 _reduce_persist(vol, req->chunk, 1011 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk)); 1012 1013 vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; 1014 1015 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); 1016 1017 _reduce_vol_complete_req(req, 0); 1018 } 1019 1020 static void 1021 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1022 reduce_request_fn next_fn, bool is_write) 1023 { 1024 struct iovec *iov; 1025 uint8_t *buf; 1026 uint32_t i; 1027 1028 if (req->chunk_is_compressed) { 1029 iov = req->comp_buf_iov; 1030 buf = req->comp_buf; 1031 } else { 1032 iov = req->decomp_buf_iov; 1033 buf = req->decomp_buf; 1034 } 1035 1036 req->num_backing_ops = req->num_io_units; 1037 req->backing_cb_args.cb_fn = next_fn; 1038 req->backing_cb_args.cb_arg = req; 1039 for (i = 0; i < req->num_io_units; i++) { 1040 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; 1041 iov[i].iov_len = vol->params.backing_io_unit_size; 1042 if (is_write) { 1043 vol->backing_dev->writev(vol->backing_dev, &iov[i], 1, 1044 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, 1045 vol->backing_lba_per_io_unit, &req->backing_cb_args); 1046 } else { 1047 vol->backing_dev->readv(vol->backing_dev, &iov[i], 1, 1048 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, 1049 vol->backing_lba_per_io_unit, &req->backing_cb_args); 1050 } 1051 } 1052 } 1053 1054 static void 1055 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, 1056 uint32_t compressed_size) 1057 { 1058 struct spdk_reduce_vol *vol = req->vol; 1059 uint32_t i; 1060 uint64_t chunk_offset, remainder, total_len = 0; 1061 uint8_t *buf; 1062 int j; 1063 1064 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0); 1065 1066 /* TODO: fail if no chunk map found - but really this should not happen if we 1067 * size the number of requests similarly to number of extra chunk maps 1068 */ 1069 assert(req->chunk_map_index != UINT32_MAX); 1070 spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); 1071 1072 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1073 req->num_io_units = spdk_divide_round_up(compressed_size, 1074 
vol->params.backing_io_unit_size); 1075 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1076 req->chunk->compressed_size = 1077 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; 1078 1079 /* if the chunk is uncompressed we need to copy the data from the host buffers. */ 1080 if (req->chunk_is_compressed == false) { 1081 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1082 buf = req->decomp_buf; 1083 total_len = chunk_offset * vol->params.logical_block_size; 1084 1085 /* zero any offset into chunk */ 1086 if (req->rmw == false && chunk_offset) { 1087 memset(buf, 0, total_len); 1088 } 1089 buf += total_len; 1090 1091 /* copy the data */ 1092 for (j = 0; j < req->iovcnt; j++) { 1093 memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); 1094 buf += req->iov[j].iov_len; 1095 total_len += req->iov[j].iov_len; 1096 } 1097 1098 /* zero any remainder */ 1099 remainder = vol->params.chunk_size - total_len; 1100 total_len += remainder; 1101 if (req->rmw == false && remainder) { 1102 memset(buf, 0, remainder); 1103 } 1104 assert(total_len == vol->params.chunk_size); 1105 } 1106 1107 for (i = 0; i < req->num_io_units; i++) { 1108 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0); 1109 /* TODO: fail if no backing block found - but really this should also not 1110 * happen (see comment above). 1111 */ 1112 assert(req->chunk->io_unit_index[i] != UINT32_MAX); 1113 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1114 } 1115 1116 _issue_backing_ops(req, vol, next_fn, true /* write */); 1117 } 1118 1119 static void 1120 _write_compress_done(void *_req, int reduce_errno) 1121 { 1122 struct spdk_reduce_vol_request *req = _req; 1123 1124 /* Negative reduce_errno indicates failure for compression operations. 1125 * Just write the uncompressed data instead. Force this to happen 1126 * by just passing the full chunk size to _reduce_vol_write_chunk. 1127 * When it sees the data couldn't be compressed, it will just write 1128 * the uncompressed buffer to disk. 1129 */ 1130 if (reduce_errno < 0) { 1131 reduce_errno = req->vol->params.chunk_size; 1132 } 1133 1134 /* Positive reduce_errno indicates number of bytes in compressed buffer. 
*/ 1135 _reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno); 1136 } 1137 1138 static void 1139 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1140 { 1141 struct spdk_reduce_vol *vol = req->vol; 1142 1143 req->backing_cb_args.cb_fn = next_fn; 1144 req->backing_cb_args.cb_arg = req; 1145 req->comp_buf_iov[0].iov_base = req->comp_buf; 1146 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1147 vol->backing_dev->compress(vol->backing_dev, 1148 &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1, 1149 &req->backing_cb_args); 1150 } 1151 1152 static void 1153 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1154 { 1155 struct spdk_reduce_vol *vol = req->vol; 1156 1157 req->backing_cb_args.cb_fn = next_fn; 1158 req->backing_cb_args.cb_arg = req; 1159 req->comp_buf_iov[0].iov_base = req->comp_buf; 1160 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1161 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1162 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1163 vol->backing_dev->decompress(vol->backing_dev, 1164 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1165 &req->backing_cb_args); 1166 } 1167 1168 static void 1169 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1170 { 1171 struct spdk_reduce_vol *vol = req->vol; 1172 uint64_t chunk_offset, remainder = 0; 1173 uint64_t ttl_len = 0; 1174 int i; 1175 1176 req->decomp_iovcnt = 0; 1177 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1178 1179 if (chunk_offset) { 1180 /* first iov point to our scratch buffer for any offset into the chunk */ 1181 req->decomp_iov[0].iov_base = req->decomp_buf; 1182 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1183 ttl_len += req->decomp_iov[0].iov_len; 1184 req->decomp_iovcnt = 1; 1185 } 1186 1187 /* now the user data iov, direct to the user buffer */ 1188 for (i = 0; i < req->iovcnt; i++) { 1189 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1190 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1191 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1192 } 1193 req->decomp_iovcnt += req->iovcnt; 1194 1195 /* send the rest of the chunk to our scratch buffer */ 1196 remainder = vol->params.chunk_size - ttl_len; 1197 if (remainder) { 1198 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1199 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1200 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1201 req->decomp_iovcnt++; 1202 } 1203 assert(ttl_len == vol->params.chunk_size); 1204 1205 req->backing_cb_args.cb_fn = next_fn; 1206 req->backing_cb_args.cb_arg = req; 1207 req->comp_buf_iov[0].iov_base = req->comp_buf; 1208 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1209 vol->backing_dev->decompress(vol->backing_dev, 1210 req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt, 1211 &req->backing_cb_args); 1212 } 1213 1214 static void 1215 _write_decompress_done(void *_req, int reduce_errno) 1216 { 1217 struct spdk_reduce_vol_request *req = _req; 1218 struct spdk_reduce_vol *vol = req->vol; 1219 uint64_t chunk_offset, remainder, ttl_len = 0; 1220 int i; 1221 1222 /* Negative reduce_errno indicates failure for compression operations. 
*/ 1223 if (reduce_errno < 0) { 1224 _reduce_vol_complete_req(req, reduce_errno); 1225 return; 1226 } 1227 1228 /* Positive reduce_errno indicates number of bytes in decompressed 1229 * buffer. This should equal the chunk size - otherwise that's another 1230 * type of failure. 1231 */ 1232 if ((uint32_t)reduce_errno != vol->params.chunk_size) { 1233 _reduce_vol_complete_req(req, -EIO); 1234 return; 1235 } 1236 1237 req->decomp_iovcnt = 0; 1238 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1239 1240 if (chunk_offset) { 1241 req->decomp_iov[0].iov_base = req->decomp_buf; 1242 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1243 ttl_len += req->decomp_iov[0].iov_len; 1244 req->decomp_iovcnt = 1; 1245 } 1246 1247 for (i = 0; i < req->iovcnt; i++) { 1248 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1249 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1250 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1251 } 1252 req->decomp_iovcnt += req->iovcnt; 1253 1254 remainder = vol->params.chunk_size - ttl_len; 1255 if (remainder) { 1256 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1257 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1258 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1259 req->decomp_iovcnt++; 1260 } 1261 assert(ttl_len == vol->params.chunk_size); 1262 1263 _reduce_vol_compress_chunk(req, _write_compress_done); 1264 } 1265 1266 static void 1267 _write_read_done(void *_req, int reduce_errno) 1268 { 1269 struct spdk_reduce_vol_request *req = _req; 1270 1271 if (reduce_errno != 0) { 1272 req->reduce_errno = reduce_errno; 1273 } 1274 1275 assert(req->num_backing_ops > 0); 1276 if (--req->num_backing_ops > 0) { 1277 return; 1278 } 1279 1280 if (req->reduce_errno != 0) { 1281 _reduce_vol_complete_req(req, req->reduce_errno); 1282 return; 1283 } 1284 1285 if (req->chunk_is_compressed) { 1286 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); 1287 } else { 1288 _write_decompress_done(req, req->chunk->compressed_size); 1289 } 1290 } 1291 1292 static void 1293 _read_decompress_done(void *_req, int reduce_errno) 1294 { 1295 struct spdk_reduce_vol_request *req = _req; 1296 struct spdk_reduce_vol *vol = req->vol; 1297 1298 /* Negative reduce_errno indicates failure for compression operations. */ 1299 if (reduce_errno < 0) { 1300 _reduce_vol_complete_req(req, reduce_errno); 1301 return; 1302 } 1303 1304 /* Positive reduce_errno indicates number of bytes in decompressed 1305 * buffer. This should equal the chunk size - otherwise that's another 1306 * type of failure. 
1307 */ 1308 if ((uint32_t)reduce_errno != vol->params.chunk_size) { 1309 _reduce_vol_complete_req(req, -EIO); 1310 return; 1311 } 1312 1313 _reduce_vol_complete_req(req, 0); 1314 } 1315 1316 static void 1317 _read_read_done(void *_req, int reduce_errno) 1318 { 1319 struct spdk_reduce_vol_request *req = _req; 1320 uint64_t chunk_offset; 1321 uint8_t *buf; 1322 int i; 1323 1324 if (reduce_errno != 0) { 1325 req->reduce_errno = reduce_errno; 1326 } 1327 1328 assert(req->num_backing_ops > 0); 1329 if (--req->num_backing_ops > 0) { 1330 return; 1331 } 1332 1333 if (req->reduce_errno != 0) { 1334 _reduce_vol_complete_req(req, req->reduce_errno); 1335 return; 1336 } 1337 1338 if (req->chunk_is_compressed) { 1339 _reduce_vol_decompress_chunk(req, _read_decompress_done); 1340 } else { 1341 1342 /* If the chunk was compressed, the data would have been sent to the 1343 * host buffers by the decompression operation, if not we need to memcpy here. 1344 */ 1345 chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; 1346 buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; 1347 for (i = 0; i < req->iovcnt; i++) { 1348 memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); 1349 buf += req->iov[i].iov_len; 1350 } 1351 1352 _read_decompress_done(req, req->chunk->compressed_size); 1353 } 1354 } 1355 1356 static void 1357 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1358 { 1359 struct spdk_reduce_vol *vol = req->vol; 1360 1361 req->chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1362 assert(req->chunk_map_index != UINT32_MAX); 1363 1364 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1365 req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size, 1366 vol->params.backing_io_unit_size); 1367 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1368 1369 _issue_backing_ops(req, vol, next_fn, false /* read */); 1370 } 1371 1372 static bool 1373 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt, 1374 uint64_t length) 1375 { 1376 uint64_t size = 0; 1377 int i; 1378 1379 for (i = 0; i < iovcnt; i++) { 1380 size += iov[i].iov_len; 1381 } 1382 1383 return size == (length * vol->params.logical_block_size); 1384 } 1385 1386 static bool 1387 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index) 1388 { 1389 struct spdk_reduce_vol_request *req; 1390 1391 TAILQ_FOREACH(req, &vol->executing_requests, tailq) { 1392 if (logical_map_index == req->logical_map_index) { 1393 return true; 1394 } 1395 } 1396 1397 return false; 1398 } 1399 1400 static void 1401 _start_readv_request(struct spdk_reduce_vol_request *req) 1402 { 1403 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1404 _reduce_vol_read_chunk(req, _read_read_done); 1405 } 1406 1407 void 1408 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, 1409 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1410 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1411 { 1412 struct spdk_reduce_vol_request *req; 1413 uint64_t logical_map_index; 1414 bool overlapped; 1415 int i; 1416 1417 if (length == 0) { 1418 cb_fn(cb_arg, 0); 1419 return; 1420 } 1421 1422 if (_request_spans_chunk_boundary(vol, offset, length)) { 1423 cb_fn(cb_arg, -EINVAL); 1424 return; 1425 } 1426 1427 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1428 cb_fn(cb_arg, -EINVAL); 1429 return; 1430 } 1431 1432 logical_map_index = offset / vol->logical_blocks_per_chunk; 1433 overlapped = 
_check_overlap(vol, logical_map_index); 1434 1435 if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) { 1436 /* 1437 * This chunk hasn't been allocated. So treat the data as all 1438 * zeroes for this chunk - do the memset and immediately complete 1439 * the operation. 1440 */ 1441 for (i = 0; i < iovcnt; i++) { 1442 memset(iov[i].iov_base, 0, iov[i].iov_len); 1443 } 1444 cb_fn(cb_arg, 0); 1445 return; 1446 } 1447 1448 req = TAILQ_FIRST(&vol->free_requests); 1449 if (req == NULL) { 1450 cb_fn(cb_arg, -ENOMEM); 1451 return; 1452 } 1453 1454 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1455 req->type = REDUCE_IO_READV; 1456 req->vol = vol; 1457 req->iov = iov; 1458 req->iovcnt = iovcnt; 1459 req->offset = offset; 1460 req->logical_map_index = logical_map_index; 1461 req->length = length; 1462 req->cb_fn = cb_fn; 1463 req->cb_arg = cb_arg; 1464 1465 if (!overlapped) { 1466 _start_readv_request(req); 1467 } else { 1468 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1469 } 1470 } 1471 1472 static void 1473 _start_writev_request(struct spdk_reduce_vol_request *req) 1474 { 1475 struct spdk_reduce_vol *vol = req->vol; 1476 uint64_t chunk_offset, ttl_len = 0; 1477 uint64_t remainder = 0; 1478 uint32_t lbsize; 1479 int i; 1480 1481 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1482 if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) { 1483 if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) { 1484 /* Read old chunk, then overwrite with data from this write 1485 * operation. 1486 */ 1487 req->rmw = true; 1488 _reduce_vol_read_chunk(req, _write_read_done); 1489 return; 1490 } 1491 } 1492 1493 lbsize = vol->params.logical_block_size; 1494 req->decomp_iovcnt = 0; 1495 req->rmw = false; 1496 1497 /* Note: point to our zero buf for offset into the chunk. 
*/ 1498 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1499 if (chunk_offset != 0) { 1500 ttl_len += chunk_offset * lbsize; 1501 req->decomp_iov[0].iov_base = g_zero_buf; 1502 req->decomp_iov[0].iov_len = ttl_len; 1503 req->decomp_iovcnt = 1; 1504 } 1505 1506 /* now the user data iov, direct from the user buffer */ 1507 for (i = 0; i < req->iovcnt; i++) { 1508 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1509 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1510 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1511 } 1512 req->decomp_iovcnt += req->iovcnt; 1513 1514 remainder = vol->params.chunk_size - ttl_len; 1515 if (remainder) { 1516 req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf; 1517 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1518 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1519 req->decomp_iovcnt++; 1520 } 1521 assert(ttl_len == req->vol->params.chunk_size); 1522 1523 _reduce_vol_compress_chunk(req, _write_compress_done); 1524 } 1525 1526 void 1527 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, 1528 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1529 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1530 { 1531 struct spdk_reduce_vol_request *req; 1532 uint64_t logical_map_index; 1533 bool overlapped; 1534 1535 if (length == 0) { 1536 cb_fn(cb_arg, 0); 1537 return; 1538 } 1539 1540 if (_request_spans_chunk_boundary(vol, offset, length)) { 1541 cb_fn(cb_arg, -EINVAL); 1542 return; 1543 } 1544 1545 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1546 cb_fn(cb_arg, -EINVAL); 1547 return; 1548 } 1549 1550 logical_map_index = offset / vol->logical_blocks_per_chunk; 1551 overlapped = _check_overlap(vol, logical_map_index); 1552 1553 req = TAILQ_FIRST(&vol->free_requests); 1554 if (req == NULL) { 1555 cb_fn(cb_arg, -ENOMEM); 1556 return; 1557 } 1558 1559 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1560 req->type = REDUCE_IO_WRITEV; 1561 req->vol = vol; 1562 req->iov = iov; 1563 req->iovcnt = iovcnt; 1564 req->offset = offset; 1565 req->logical_map_index = logical_map_index; 1566 req->length = length; 1567 req->cb_fn = cb_fn; 1568 req->cb_arg = cb_arg; 1569 1570 if (!overlapped) { 1571 _start_writev_request(req); 1572 } else { 1573 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1574 } 1575 } 1576 1577 const struct spdk_reduce_vol_params * 1578 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol) 1579 { 1580 return &vol->params; 1581 } 1582 1583 void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol) 1584 { 1585 uint64_t logical_map_size, num_chunks, ttl_chunk_sz; 1586 uint32_t struct_size; 1587 uint64_t chunk_map_size; 1588 1589 SPDK_NOTICELOG("vol info:\n"); 1590 SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size); 1591 SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size); 1592 SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size); 1593 SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size); 1594 num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 1595 SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks); 1596 SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n", 1597 vol->params.vol_size / vol->params.chunk_size); 1598 ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1599 
vol->params.backing_io_unit_size); 1600 SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz); 1601 struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 1602 SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size); 1603 1604 SPDK_NOTICELOG("pmem info:\n"); 1605 SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size); 1606 SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf); 1607 SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super); 1608 SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map); 1609 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, 1610 vol->params.chunk_size); 1611 SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size); 1612 SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps); 1613 chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1614 vol->params.backing_io_unit_size); 1615 SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size); 1616 } 1617 1618 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE) 1619
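/*
 * Usage sketch for the public API implemented above (illustrative only - the
 * identifiers my_backing_dev, init_done_cb, io_done_cb and ctx are placeholders,
 * and the numbers are example values, not defaults). The caller, e.g. the
 * compress bdev module, must supply a backing_dev whose readv/writev/unmap and
 * compress/decompress callbacks are wired to a real block device and
 * compression engine:
 *
 *   struct spdk_reduce_vol_params params = {0};
 *   params.logical_block_size = 512;
 *   params.backing_io_unit_size = 4096;
 *   params.chunk_size = 16384;
 *   params.vol_size = 0;   (left at 0 - derived from the backing device size)
 *
 *   spdk_reduce_vol_init(&params, &my_backing_dev, "/path/to/pmem/dir",
 *                        init_done_cb, ctx);
 *
 * Once init_done_cb() receives the volume handle, I/O is submitted in units of
 * logical blocks, and a single request must not span a chunk boundary:
 *
 *   spdk_reduce_vol_writev(vol, iov, iovcnt, offset, length, io_done_cb, ctx);
 *   spdk_reduce_vol_readv(vol, iov, iovcnt, offset, length, io_done_cb, ctx);
 *
 * spdk_reduce_vol_unload() releases the volume when the caller is finished,
 * and spdk_reduce_vol_destroy() additionally zeroes the on-disk superblock and
 * unlinks the pm file.
 */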