/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT 64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET 4096

#define REDUCE_EMPTY_MAP_ENTRY -1ULL

#define REDUCE_NUM_VOL_REQUESTS 256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t signature[8];
	struct spdk_reduce_vol_params params;
	uint8_t reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* null terminator counts as one */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

#define REDUCE_ZERO_BUF_SIZE 0x100000

/**
 * Describes a persistent memory file used to hold metadata associated with a
 * compressed volume.
 */
struct spdk_reduce_pm_file {
	char path[REDUCE_PATH_MAX];
	void *pm_buf;
	int pm_is_pmem;
	uint64_t size;
};

#define REDUCE_IO_READV		1
#define REDUCE_IO_WRITEV	2

struct spdk_reduce_chunk_map {
	uint32_t compressed_size;
	uint32_t reserved;
	uint64_t io_unit_index[0];
};

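/*
 * Illustrative sizing note (example values only, not taken from this file):
 * io_unit_index[] is a flexible array member, so the in-memory size of a chunk
 * map depends on the volume's parameters.  For example, with a hypothetical
 * 16 KiB chunk_size and 4 KiB backing_io_unit_size there are 4 io units per
 * chunk, so each chunk map occupies 8 + 4 * 8 = 40 bytes (see
 * _reduce_vol_get_chunk_struct_size()).
 */
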
struct spdk_reduce_vol_request {
	/**
	 * Scratch buffer used for uncompressed chunk.  This is used for:
	 *  1) source buffer for compression operations
	 *  2) destination buffer for decompression operations
	 *  3) data buffer when writing uncompressed chunk to disk
	 *  4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t *decomp_buf;
	struct iovec *decomp_buf_iov;

	/**
	 * These are used to construct the iovecs that are sent to
	 * the decomp engine, they point to a mix of the scratch buffer
	 * and user buffer
	 */
	struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2];
	int decomp_iovcnt;

	/**
	 * Scratch buffer used for compressed chunk.  This is used for:
	 *  1) destination buffer for compression operations
	 *  2) source buffer for decompression operations
	 *  3) data buffer when writing compressed chunk to disk
	 *  4) data buffer when reading compressed chunk from disk
	 */
	uint8_t *comp_buf;
	struct iovec *comp_buf_iov;
	struct iovec *iov;
	bool rmw;
	struct spdk_reduce_vol *vol;
	int type;
	int reduce_errno;
	int iovcnt;
	int num_backing_ops;
	uint32_t num_io_units;
	bool chunk_is_compressed;
	uint64_t offset;
	uint64_t logical_map_index;
	uint64_t length;
	uint64_t chunk_map_index;
	struct spdk_reduce_chunk_map *chunk;
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request) tailq;
	struct spdk_reduce_vol_cb_args backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params params;
	uint32_t backing_io_units_per_chunk;
	uint32_t backing_lba_per_io_unit;
	uint32_t logical_blocks_per_chunk;
	struct spdk_reduce_pm_file pm_file;
	struct spdk_reduce_backing_dev *backing_dev;
	struct spdk_reduce_vol_superblock *backing_super;
	struct spdk_reduce_vol_superblock *pm_super;
	uint64_t *pm_logical_map;
	uint64_t *pm_chunk_maps;

	struct spdk_bit_array *allocated_chunk_maps;
	struct spdk_bit_array *allocated_backing_io_units;

	struct spdk_reduce_vol_request *request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request) free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t *buf_mem;
	struct iovec *buf_iov_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);
static uint8_t *g_zero_buf;
static int g_vol_count = 0;

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 * outstanding IO in worst case scenario where logical map is completely allocated
 * and no data can be compressed.  We need extra chunks in this case to handle
 * in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

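/*
 * Illustrative sizing for the persistent metadata computed by the helpers below
 * (hypothetical example parameters, not defaults of this library): a 1 GiB
 * volume with a 16 KiB chunk_size and 4 KiB backing_io_unit_size has 65536
 * logical chunks, so the logical map needs 65536 * 8 B = 512 KiB.  The chunk
 * maps cover 65536 + REDUCE_NUM_EXTRA_CHUNKS entries of 40 bytes each
 * (8 B header + 4 io unit indices * 8 B), and both regions are rounded up to
 * REDUCE_PM_SIZE_ALIGNMENT.
 */
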
static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 * values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

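/*
 * Sketch of the pm_buf layout that _initialize_vol_pm_pointers() walks, using
 * the same hypothetical parameters as above (1 GiB volume, 16 KiB chunks,
 * 4 KiB backing io units); offsets are illustrative only:
 *
 *   0        : superblock   (4 KiB, struct spdk_reduce_vol_superblock)
 *   4 KiB    : logical map  (one uint64_t chunk map index per chunk, 512 KiB)
 *   516 KiB  : chunk maps   (struct spdk_reduce_chunk_map entries, 64-byte aligned total)
 */
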
static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT 2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete cb_fn;
	void *cb_arg;
	struct iovec iov[LOAD_IOV_COUNT];
	void *path;
};

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	/* Allocate 2x since we need buffers for both read/write and compress/decompress
	 * intermediate buffers.
	 */
	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 * buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}

		spdk_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_free(vol->buf_mem);
		free(vol);
	}
}

static int
_alloc_zero_buff(void)
{
	int rc = 0;

	/* The zero buffer is shared between all volumes and just used
	 * for reads so allocate one global instance here if not already
	 * allocated when another vol init'd or loaded.
	 */
	if (g_vol_count++ == 0) {
		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
					  SPDK_MALLOC_DMA);
		if (g_zero_buf == NULL) {
			rc = -ENOMEM;
		}
	}
	return rc;
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	rc = _alloc_zero_buff();
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	rc = _alloc_zero_buff();
	if (rc) {
		goto error;
	}

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
	 * So don't bother getting the volume ready to use - invoke the callback immediately
	 * so destroy_load_cb can delete the metadata off of the block device and delete the
	 * persistent memory file if it exists.
	 */
	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	if (load_ctx->cb_fn == (*destroy_load_cb)) {
		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
		_init_load_cleanup(NULL, load_ctx);
		return;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 * for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (--g_vol_count == 0) {
		spdk_free(g_zero_buf);
	}
	assert(g_vol_count >= 0);
	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_superblock *super;
	struct iovec iov;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	int reduce_errno;
	char pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;
	struct spdk_reduce_chunk_map *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid after we update the logical map, since the old chunk map will no
	 * longer have a reference to it in the logical map.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk,
			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

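/*
 * Illustrative note on how a compressed chunk maps to backing io units
 * (hypothetical numbers, assuming a 16 KiB chunk_size and 4 KiB
 * backing_io_unit_size): a chunk that compresses to 5000 bytes needs
 * spdk_divide_round_up(5000, 4096) = 2 io units, so num_io_units (2) differs
 * from backing_io_units_per_chunk (4) and the chunk is stored compressed.
 * If the compressed size still needs all 4 io units (anything above 12 KiB),
 * the chunk is written uncompressed instead.
 */
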
static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
	uint64_t chunk_offset, remainder, total_len = 0;
	uint8_t *buf;
	int j;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
	if (req->chunk_is_compressed == false) {
		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
		buf = req->decomp_buf;
		total_len = chunk_offset * vol->params.logical_block_size;

		/* zero any offset into chunk */
		if (req->rmw == false && chunk_offset) {
			memset(buf, 0, total_len);
		}
		buf += total_len;

		/* copy the data */
		for (j = 0; j < req->iovcnt; j++) {
			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
			buf += req->iov[j].iov_len;
			total_len += req->iov[j].iov_len;
		}

		/* zero any remainder */
		remainder = vol->params.chunk_size - total_len;
		total_len += remainder;
		if (req->rmw == false && remainder) {
			memset(buf, 0, remainder);
		}
		assert(total_len == vol->params.chunk_size);
	}

	for (i = 0; i < req->num_io_units; i++) {
		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations.
	 * Just write the uncompressed data instead.  Force this to happen
	 * by just passing the full chunk size to _reduce_vol_write_chunk.
	 * When it sees the data couldn't be compressed, it will just write
	 * the uncompressed buffer to disk.
	 */
	if (reduce_errno < 0) {
		reduce_errno = req->vol->params.chunk_size;
	}

	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder = 0;
	uint64_t ttl_len = 0;
	int i;

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		/* first iov points to our scratch buffer for any offset into the chunk */
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct to the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	/* send the rest of the chunk to our scratch buffer */
	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt,
				     &req->backing_cb_args);
}

static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder, ttl_len = 0;
	int i;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 * buffer.  This should equal the chunk size - otherwise that's another
	 * type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
	} else {
		_write_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;

	/* Negative reduce_errno indicates failure for compression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates number of bytes in decompressed
	 * buffer.  This should equal the chunk size - otherwise that's another
	 * type of failure.
	 */
	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {

		/* If the chunk was compressed, the data would have been delivered to the
		 * host buffers by the decompression operation; since it was not, we need
		 * to memcpy it here.
		 */
		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
		for (i = 0; i < req->iovcnt; i++) {
			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
			buf += req->iov[i].iov_len;
		}

		_read_decompress_done(req, req->chunk->compressed_size);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != UINT32_MAX);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	if (iovcnt > REDUCE_MAX_IOVECS) {
		return false;
	}

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request *req;

	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
		if (logical_map_index == req->logical_map_index) {
			return true;
		}
	}

	return false;
}

static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

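/*
 * Illustrative caller usage of spdk_reduce_vol_readv()/spdk_reduce_vol_writev()
 * (hypothetical caller code, not part of this library): offset and length are
 * in logical blocks, the iovecs must total length * logical_block_size bytes,
 * and a single request must not span a chunk boundary.
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = 8 * 512 };
 *
 *	// Read 8 logical blocks (512 B each, assumed logical_block_size)
 *	// starting at logical block 0.
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, read_done_cb, cb_arg);
 */
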
static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, ttl_len = 0;
	uint64_t remainder = 0;
	uint32_t lbsize;
	int i;

	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
			/* Read old chunk, then overwrite with data from this write
			 * operation.
			 */
			req->rmw = true;
			_reduce_vol_read_chunk(req, _write_read_done);
			return;
		}
	}

	lbsize = vol->params.logical_block_size;
	req->decomp_iovcnt = 0;
	req->rmw = false;

	/* Note: point to our zero buf for offset into the chunk. */
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
	if (chunk_offset != 0) {
		ttl_len += chunk_offset * lbsize;
		req->decomp_iov[0].iov_base = g_zero_buf;
		req->decomp_iov[0].iov_len = ttl_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct from the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == req->vol->params.chunk_size);

	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

void
spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
			   vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)