1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/reduce.h" 10 #include "spdk/env.h" 11 #include "spdk/string.h" 12 #include "spdk/bit_array.h" 13 #include "spdk/util.h" 14 #include "spdk/log.h" 15 #include "spdk/memory.h" 16 17 #include "libpmem.h" 18 19 /* Always round up the size of the PM region to the nearest cacheline. */ 20 #define REDUCE_PM_SIZE_ALIGNMENT 64 21 22 /* Offset into the backing device where the persistent memory file's path is stored. */ 23 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 24 25 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 26 27 #define REDUCE_NUM_VOL_REQUESTS 256 28 29 /* Structure written to offset 0 of both the pm file and the backing device. */ 30 struct spdk_reduce_vol_superblock { 31 uint8_t signature[8]; 32 struct spdk_reduce_vol_params params; 33 uint8_t reserved[4048]; 34 }; 35 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 36 37 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 38 /* null terminator counts one */ 39 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 40 SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect"); 41 42 #define REDUCE_PATH_MAX 4096 43 44 #define REDUCE_ZERO_BUF_SIZE 0x100000 45 46 /** 47 * Describes a persistent memory file used to hold metadata associated with a 48 * compressed volume. 49 */ 50 struct spdk_reduce_pm_file { 51 char path[REDUCE_PATH_MAX]; 52 void *pm_buf; 53 int pm_is_pmem; 54 uint64_t size; 55 }; 56 57 #define REDUCE_IO_READV 1 58 #define REDUCE_IO_WRITEV 2 59 60 struct spdk_reduce_chunk_map { 61 uint32_t compressed_size; 62 uint32_t reserved; 63 uint64_t io_unit_index[0]; 64 }; 65 66 struct spdk_reduce_vol_request { 67 /** 68 * Scratch buffer used for uncompressed chunk. This is used for: 69 * 1) source buffer for compression operations 70 * 2) destination buffer for decompression operations 71 * 3) data buffer when writing uncompressed chunk to disk 72 * 4) data buffer when reading uncompressed chunk from disk 73 */ 74 uint8_t *decomp_buf; 75 struct iovec *decomp_buf_iov; 76 77 /** 78 * These are used to construct the iovecs that are sent to 79 * the decomp engine, they point to a mix of the scratch buffer 80 * and user buffer 81 */ 82 struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2]; 83 int decomp_iovcnt; 84 85 /** 86 * Scratch buffer used for compressed chunk. 
This is used for: 87 * 1) destination buffer for compression operations 88 * 2) source buffer for decompression operations 89 * 3) data buffer when writing compressed chunk to disk 90 * 4) data buffer when reading compressed chunk from disk 91 */ 92 uint8_t *comp_buf; 93 struct iovec *comp_buf_iov; 94 struct iovec *iov; 95 bool rmw; 96 struct spdk_reduce_vol *vol; 97 int type; 98 int reduce_errno; 99 int iovcnt; 100 int num_backing_ops; 101 uint32_t num_io_units; 102 bool chunk_is_compressed; 103 bool copy_after_decompress; 104 uint64_t offset; 105 uint64_t logical_map_index; 106 uint64_t length; 107 uint64_t chunk_map_index; 108 struct spdk_reduce_chunk_map *chunk; 109 spdk_reduce_vol_op_complete cb_fn; 110 void *cb_arg; 111 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 112 struct spdk_reduce_vol_cb_args backing_cb_args; 113 }; 114 115 struct spdk_reduce_vol { 116 struct spdk_reduce_vol_params params; 117 uint32_t backing_io_units_per_chunk; 118 uint32_t backing_lba_per_io_unit; 119 uint32_t logical_blocks_per_chunk; 120 struct spdk_reduce_pm_file pm_file; 121 struct spdk_reduce_backing_dev *backing_dev; 122 struct spdk_reduce_vol_superblock *backing_super; 123 struct spdk_reduce_vol_superblock *pm_super; 124 uint64_t *pm_logical_map; 125 uint64_t *pm_chunk_maps; 126 127 struct spdk_bit_array *allocated_chunk_maps; 128 struct spdk_bit_array *allocated_backing_io_units; 129 130 struct spdk_reduce_vol_request *request_mem; 131 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 132 TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests; 133 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 134 135 /* Single contiguous buffer used for all request buffers for this volume. */ 136 uint8_t *buf_mem; 137 struct iovec *buf_iov_mem; 138 }; 139 140 static void _start_readv_request(struct spdk_reduce_vol_request *req); 141 static void _start_writev_request(struct spdk_reduce_vol_request *req); 142 static uint8_t *g_zero_buf; 143 static int g_vol_count = 0; 144 145 /* 146 * Allocate extra metadata chunks and corresponding backing io units to account for 147 * outstanding IO in worst case scenario where logical map is completely allocated 148 * and no data can be compressed. We need extra chunks in this case to handle 149 * in-flight writes since reduce never writes data in place. 150 */ 151 #define REDUCE_NUM_EXTRA_CHUNKS 128 152 153 static void 154 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 155 { 156 if (vol->pm_file.pm_is_pmem) { 157 pmem_persist(addr, len); 158 } else { 159 pmem_msync(addr, len); 160 } 161 } 162 163 static uint64_t 164 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 165 { 166 uint64_t chunks_in_logical_map, logical_map_size; 167 168 chunks_in_logical_map = vol_size / chunk_size; 169 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 170 171 /* Round up to next cacheline. 
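	 *
	 * For example (illustrative numbers, not taken from this file): a 1 GiB volume
	 * with 16 KiB chunks has 65536 chunks, so the logical map needs
	 * 65536 * sizeof(uint64_t) = 512 KiB, which is already a multiple of the
	 * 64-byte REDUCE_PM_SIZE_ALIGNMENT and is returned unchanged.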
 */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * The caller does not pass in the vol size - it gets calculated by libreduce
		 * from values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file.
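	 *
	 * The overall pm file layout (as computed by _get_pm_file_size()) is roughly:
	 *
	 *   [superblock (4 KiB)][logical map][chunk maps]
	 *
	 * with the logical map and chunk map regions each rounded up to
	 * REDUCE_PM_SIZE_ALIGNMENT.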
*/ 288 vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; 289 290 /* Logical map immediately follows the super block. */ 291 vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); 292 293 /* Chunks maps follow the logical map. */ 294 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size); 295 vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size); 296 } 297 298 /* We need 2 iovs during load - one for the superblock, another for the path */ 299 #define LOAD_IOV_COUNT 2 300 301 struct reduce_init_load_ctx { 302 struct spdk_reduce_vol *vol; 303 struct spdk_reduce_vol_cb_args backing_cb_args; 304 spdk_reduce_vol_op_with_handle_complete cb_fn; 305 void *cb_arg; 306 struct iovec iov[LOAD_IOV_COUNT]; 307 void *path; 308 }; 309 310 static inline bool 311 _addr_crosses_huge_page(const void *addr, size_t *size) 312 { 313 size_t _size; 314 uint64_t rc; 315 316 assert(size); 317 318 _size = *size; 319 rc = spdk_vtophys(addr, size); 320 321 return rc == SPDK_VTOPHYS_ERROR || _size != *size; 322 } 323 324 static inline int 325 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size) 326 { 327 uint8_t *addr; 328 size_t size_tmp = buffer_size; 329 330 addr = *_addr; 331 332 /* Verify that addr + buffer_size doesn't cross huge page boundary */ 333 if (_addr_crosses_huge_page(addr, &size_tmp)) { 334 /* Memory start is aligned on 2MiB, so buffer should be located at the end of the page. 335 * Skip remaining bytes and continue from the beginning of the next page */ 336 addr += size_tmp; 337 } 338 339 if (addr + buffer_size > addr_range) { 340 SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range); 341 return -ERANGE; 342 } 343 344 *vol_buffer = addr; 345 *_addr = addr + buffer_size; 346 347 return 0; 348 } 349 350 static int 351 _allocate_vol_requests(struct spdk_reduce_vol *vol) 352 { 353 struct spdk_reduce_vol_request *req; 354 uint32_t reqs_in_2mb_page, huge_pages_needed; 355 uint8_t *buffer, *buffer_end; 356 int i = 0; 357 int rc = 0; 358 359 /* It is needed to allocate comp and decomp buffers so that they do not cross physical 360 * page boundaries. Assume that the system uses default 2MiB pages and chunk_size is not 361 * necessarily power of 2 362 * Allocate 2x since we need buffers for both read/write and compress/decompress 363 * intermediate buffers. */ 364 reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2); 365 if (!reqs_in_2mb_page) { 366 return -EINVAL; 367 } 368 huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page); 369 370 vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL); 371 if (vol->buf_mem == NULL) { 372 return -ENOMEM; 373 } 374 375 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 376 if (vol->request_mem == NULL) { 377 spdk_free(vol->buf_mem); 378 vol->buf_mem = NULL; 379 return -ENOMEM; 380 } 381 382 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 383 * buffers. 
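	 *
	 * The data buffers above follow the same 2x pattern. As an illustrative sketch
	 * (hypothetical 16 KiB chunk_size): each request consumes 32 KiB of buf_mem
	 * (one comp_buf plus one decomp_buf), so reqs_in_2mb_page = 2 MiB / 32 KiB = 64
	 * and the 256 requests need 4 huge pages; _set_buffer() skips any tail of a page
	 * that a buffer would otherwise straddle.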
384 */ 385 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 386 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 387 if (vol->buf_iov_mem == NULL) { 388 free(vol->request_mem); 389 spdk_free(vol->buf_mem); 390 vol->request_mem = NULL; 391 vol->buf_mem = NULL; 392 return -ENOMEM; 393 } 394 395 buffer = vol->buf_mem; 396 buffer_end = buffer + VALUE_2MB * huge_pages_needed; 397 398 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 399 req = &vol->request_mem[i]; 400 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 401 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 402 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 403 404 rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size); 405 if (rc) { 406 SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 407 vol->buf_mem, buffer_end); 408 break; 409 } 410 rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size); 411 if (rc) { 412 SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 413 vol->buf_mem, buffer_end); 414 break; 415 } 416 } 417 418 if (rc) { 419 free(vol->buf_iov_mem); 420 free(vol->request_mem); 421 spdk_free(vol->buf_mem); 422 vol->buf_mem = NULL; 423 vol->buf_iov_mem = NULL; 424 vol->request_mem = NULL; 425 } 426 427 return rc; 428 } 429 430 static void 431 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 432 { 433 if (ctx != NULL) { 434 spdk_free(ctx->path); 435 free(ctx); 436 } 437 438 if (vol != NULL) { 439 if (vol->pm_file.pm_buf != NULL) { 440 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 441 } 442 443 spdk_free(vol->backing_super); 444 spdk_bit_array_free(&vol->allocated_chunk_maps); 445 spdk_bit_array_free(&vol->allocated_backing_io_units); 446 free(vol->request_mem); 447 free(vol->buf_iov_mem); 448 spdk_free(vol->buf_mem); 449 free(vol); 450 } 451 } 452 453 static int 454 _alloc_zero_buff(void) 455 { 456 int rc = 0; 457 458 /* The zero buffer is shared between all volumes and just used 459 * for reads so allocate one global instance here if not already 460 * allocated when another vol init'd or loaded. 461 */ 462 if (g_vol_count++ == 0) { 463 g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE, 464 64, NULL, SPDK_ENV_LCORE_ID_ANY, 465 SPDK_MALLOC_DMA); 466 if (g_zero_buf == NULL) { 467 g_vol_count--; 468 rc = -ENOMEM; 469 } 470 } 471 return rc; 472 } 473 474 static void 475 _init_write_super_cpl(void *cb_arg, int reduce_errno) 476 { 477 struct reduce_init_load_ctx *init_ctx = cb_arg; 478 int rc; 479 480 rc = _allocate_vol_requests(init_ctx->vol); 481 if (rc != 0) { 482 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 483 _init_load_cleanup(init_ctx->vol, init_ctx); 484 return; 485 } 486 487 rc = _alloc_zero_buff(); 488 if (rc != 0) { 489 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 490 _init_load_cleanup(init_ctx->vol, init_ctx); 491 return; 492 } 493 494 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno); 495 /* Only clean up the ctx - the vol has been passed to the application 496 * for use now that initialization was successful. 
497 */ 498 _init_load_cleanup(NULL, init_ctx); 499 } 500 501 static void 502 _init_write_path_cpl(void *cb_arg, int reduce_errno) 503 { 504 struct reduce_init_load_ctx *init_ctx = cb_arg; 505 struct spdk_reduce_vol *vol = init_ctx->vol; 506 507 init_ctx->iov[0].iov_base = vol->backing_super; 508 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 509 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 510 init_ctx->backing_cb_args.cb_arg = init_ctx; 511 vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, 512 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen, 513 &init_ctx->backing_cb_args); 514 } 515 516 static int 517 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 518 { 519 uint64_t total_chunks, total_backing_io_units; 520 uint32_t i, num_metadata_io_units; 521 522 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 523 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 524 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 525 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 526 527 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 528 return -ENOMEM; 529 } 530 531 /* Set backing io unit bits associated with metadata. */ 532 num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 533 vol->params.backing_io_unit_size; 534 for (i = 0; i < num_metadata_io_units; i++) { 535 spdk_bit_array_set(vol->allocated_backing_io_units, i); 536 } 537 538 return 0; 539 } 540 541 void 542 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, 543 struct spdk_reduce_backing_dev *backing_dev, 544 const char *pm_file_dir, 545 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 546 { 547 struct spdk_reduce_vol *vol; 548 struct reduce_init_load_ctx *init_ctx; 549 uint64_t backing_dev_size; 550 size_t mapped_len; 551 int dir_len, max_dir_len, rc; 552 553 /* We need to append a path separator and the UUID to the supplied 554 * path. 555 */ 556 max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1; 557 dir_len = strnlen(pm_file_dir, max_dir_len); 558 /* Strip trailing slash if the user provided one - we will add it back 559 * later when appending the filename. 
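	 *
	 * The resulting pm file path has the form "<pm_file_dir>/<uuid>", e.g. (purely
	 * illustrative) "/mnt/pmem/11111111-2222-3333-4444-555555555555".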
 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_uuid_is_null(&params->uuid)) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written. We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	rc = _alloc_zero_buff();
	if (rc) {
		goto error;
	}

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
	 * So don't bother getting the volume ready to use - invoke the callback immediately
	 * so destroy_load_cb can delete the metadata off of the block device and delete the
	 * persistent memory file if it exists.
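	 *
	 * (For reference, the destroy path then proceeds as: destroy_load_cb() writes a
	 * zeroed superblock over the on-disk metadata, _destroy_zero_super_cpl() unloads
	 * the volume, and destroy_unload_cpl() unlinks the pm file.)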
731 */ 732 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 733 if (load_ctx->cb_fn == (*destroy_load_cb)) { 734 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 735 _init_load_cleanup(NULL, load_ctx); 736 return; 737 } 738 739 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 740 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 741 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 742 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 743 744 rc = _allocate_bit_arrays(vol); 745 if (rc != 0) { 746 goto error; 747 } 748 749 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 750 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 751 SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", 752 backing_dev_size); 753 rc = -EILSEQ; 754 goto error; 755 } 756 757 vol->pm_file.size = _get_pm_file_size(&vol->params); 758 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 759 &vol->pm_file.pm_is_pmem); 760 if (vol->pm_file.pm_buf == NULL) { 761 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 762 rc = -errno; 763 goto error; 764 } 765 766 if (vol->pm_file.size != mapped_len) { 767 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 768 vol->pm_file.size, mapped_len); 769 rc = -ENOMEM; 770 goto error; 771 } 772 773 rc = _allocate_vol_requests(vol); 774 if (rc != 0) { 775 goto error; 776 } 777 778 _initialize_vol_pm_pointers(vol); 779 780 num_chunks = vol->params.vol_size / vol->params.chunk_size; 781 for (i = 0; i < num_chunks; i++) { 782 logical_map_index = vol->pm_logical_map[i]; 783 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 784 continue; 785 } 786 spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 787 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 788 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 789 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 790 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 791 } 792 } 793 } 794 795 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 796 /* Only clean up the ctx - the vol has been passed to the application 797 * for use now that volume load was successful. 
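	 *
	 * At this point the allocated_chunk_maps and allocated_backing_io_units bit arrays
	 * have been rebuilt from the persistent logical map and chunk maps above, so the
	 * volume can resume allocating new chunks right away.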
798 */ 799 _init_load_cleanup(NULL, load_ctx); 800 return; 801 802 error: 803 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 804 _init_load_cleanup(vol, load_ctx); 805 } 806 807 void 808 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 809 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 810 { 811 struct spdk_reduce_vol *vol; 812 struct reduce_init_load_ctx *load_ctx; 813 814 if (backing_dev->readv == NULL || backing_dev->writev == NULL || 815 backing_dev->unmap == NULL) { 816 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 817 cb_fn(cb_arg, NULL, -EINVAL); 818 return; 819 } 820 821 vol = calloc(1, sizeof(*vol)); 822 if (vol == NULL) { 823 cb_fn(cb_arg, NULL, -ENOMEM); 824 return; 825 } 826 827 TAILQ_INIT(&vol->free_requests); 828 TAILQ_INIT(&vol->executing_requests); 829 TAILQ_INIT(&vol->queued_requests); 830 831 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, 832 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 833 if (vol->backing_super == NULL) { 834 _init_load_cleanup(vol, NULL); 835 cb_fn(cb_arg, NULL, -ENOMEM); 836 return; 837 } 838 839 vol->backing_dev = backing_dev; 840 841 load_ctx = calloc(1, sizeof(*load_ctx)); 842 if (load_ctx == NULL) { 843 _init_load_cleanup(vol, NULL); 844 cb_fn(cb_arg, NULL, -ENOMEM); 845 return; 846 } 847 848 load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, 849 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 850 if (load_ctx->path == NULL) { 851 _init_load_cleanup(vol, load_ctx); 852 cb_fn(cb_arg, NULL, -ENOMEM); 853 return; 854 } 855 856 load_ctx->vol = vol; 857 load_ctx->cb_fn = cb_fn; 858 load_ctx->cb_arg = cb_arg; 859 860 load_ctx->iov[0].iov_base = vol->backing_super; 861 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 862 load_ctx->iov[1].iov_base = load_ctx->path; 863 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 864 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 865 load_ctx->backing_cb_args.cb_arg = load_ctx; 866 vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0, 867 (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 868 vol->backing_dev->blocklen, 869 &load_ctx->backing_cb_args); 870 } 871 872 void 873 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 874 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 875 { 876 if (vol == NULL) { 877 /* This indicates a programming error. */ 878 assert(false); 879 cb_fn(cb_arg, -EINVAL); 880 return; 881 } 882 883 if (--g_vol_count == 0) { 884 spdk_free(g_zero_buf); 885 } 886 assert(g_vol_count >= 0); 887 _init_load_cleanup(vol, NULL); 888 cb_fn(cb_arg, 0); 889 } 890 891 struct reduce_destroy_ctx { 892 spdk_reduce_vol_op_complete cb_fn; 893 void *cb_arg; 894 struct spdk_reduce_vol *vol; 895 struct spdk_reduce_vol_superblock *super; 896 struct iovec iov; 897 struct spdk_reduce_vol_cb_args backing_cb_args; 898 int reduce_errno; 899 char pm_path[REDUCE_PATH_MAX]; 900 }; 901 902 static void 903 destroy_unload_cpl(void *cb_arg, int reduce_errno) 904 { 905 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 906 907 if (destroy_ctx->reduce_errno == 0) { 908 if (unlink(destroy_ctx->pm_path)) { 909 SPDK_ERRLOG("%s could not be unlinked: %s\n", 910 destroy_ctx->pm_path, strerror(errno)); 911 } 912 } 913 914 /* Even if the unload somehow failed, we still pass the destroy_ctx 915 * reduce_errno since that indicates whether or not the volume was 916 * actually destroyed. 
917 */ 918 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 919 spdk_free(destroy_ctx->super); 920 free(destroy_ctx); 921 } 922 923 static void 924 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 925 { 926 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 927 struct spdk_reduce_vol *vol = destroy_ctx->vol; 928 929 destroy_ctx->reduce_errno = reduce_errno; 930 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 931 } 932 933 static void 934 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 935 { 936 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 937 938 if (reduce_errno != 0) { 939 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 940 spdk_free(destroy_ctx->super); 941 free(destroy_ctx); 942 return; 943 } 944 945 destroy_ctx->vol = vol; 946 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 947 destroy_ctx->iov.iov_base = destroy_ctx->super; 948 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 949 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 950 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 951 vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0, 952 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen, 953 &destroy_ctx->backing_cb_args); 954 } 955 956 void 957 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 958 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 959 { 960 struct reduce_destroy_ctx *destroy_ctx; 961 962 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 963 if (destroy_ctx == NULL) { 964 cb_fn(cb_arg, -ENOMEM); 965 return; 966 } 967 968 destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, 969 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 970 if (destroy_ctx->super == NULL) { 971 free(destroy_ctx); 972 cb_fn(cb_arg, -ENOMEM); 973 return; 974 } 975 destroy_ctx->cb_fn = cb_fn; 976 destroy_ctx->cb_arg = cb_arg; 977 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 978 } 979 980 static bool 981 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 982 { 983 uint64_t start_chunk, end_chunk; 984 985 start_chunk = offset / vol->logical_blocks_per_chunk; 986 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 987 988 return (start_chunk != end_chunk); 989 } 990 991 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 992 993 static void 994 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 995 { 996 struct spdk_reduce_vol_request *next_req; 997 struct spdk_reduce_vol *vol = req->vol; 998 999 req->cb_fn(req->cb_arg, reduce_errno); 1000 TAILQ_REMOVE(&vol->executing_requests, req, tailq); 1001 1002 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 1003 if (next_req->logical_map_index == req->logical_map_index) { 1004 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 1005 if (next_req->type == REDUCE_IO_READV) { 1006 _start_readv_request(next_req); 1007 } else { 1008 assert(next_req->type == REDUCE_IO_WRITEV); 1009 _start_writev_request(next_req); 1010 } 1011 break; 1012 } 1013 } 1014 1015 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 1016 } 1017 1018 static void 1019 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 1020 { 1021 struct spdk_reduce_chunk_map *chunk; 1022 uint32_t i; 1023 1024 chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index); 1025 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 1026 if (chunk->io_unit_index[i] == 
REDUCE_EMPTY_MAP_ENTRY) { 1027 break; 1028 } 1029 assert(spdk_bit_array_get(vol->allocated_backing_io_units, 1030 chunk->io_unit_index[i]) == true); 1031 spdk_bit_array_clear(vol->allocated_backing_io_units, chunk->io_unit_index[i]); 1032 chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 1033 } 1034 spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index); 1035 } 1036 1037 static void 1038 _write_write_done(void *_req, int reduce_errno) 1039 { 1040 struct spdk_reduce_vol_request *req = _req; 1041 struct spdk_reduce_vol *vol = req->vol; 1042 uint64_t old_chunk_map_index; 1043 1044 if (reduce_errno != 0) { 1045 req->reduce_errno = reduce_errno; 1046 } 1047 1048 assert(req->num_backing_ops > 0); 1049 if (--req->num_backing_ops > 0) { 1050 return; 1051 } 1052 1053 if (req->reduce_errno != 0) { 1054 _reduce_vol_reset_chunk(vol, req->chunk_map_index); 1055 _reduce_vol_complete_req(req, req->reduce_errno); 1056 return; 1057 } 1058 1059 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1060 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 1061 _reduce_vol_reset_chunk(vol, old_chunk_map_index); 1062 } 1063 1064 /* 1065 * We don't need to persist the clearing of the old chunk map here. The old chunk map 1066 * becomes invalid after we update the logical map, since the old chunk map will no 1067 * longer have a reference to it in the logical map. 1068 */ 1069 1070 /* Persist the new chunk map. This must be persisted before we update the logical map. */ 1071 _reduce_persist(vol, req->chunk, 1072 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk)); 1073 1074 vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; 1075 1076 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); 1077 1078 _reduce_vol_complete_req(req, 0); 1079 } 1080 1081 struct reduce_merged_io_desc { 1082 uint64_t io_unit_index; 1083 uint32_t num_io_units; 1084 }; 1085 1086 static void 1087 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1088 reduce_request_fn next_fn, bool is_write) 1089 { 1090 struct iovec *iov; 1091 uint8_t *buf; 1092 uint32_t i; 1093 1094 if (req->chunk_is_compressed) { 1095 iov = req->comp_buf_iov; 1096 buf = req->comp_buf; 1097 } else { 1098 iov = req->decomp_buf_iov; 1099 buf = req->decomp_buf; 1100 } 1101 1102 req->num_backing_ops = req->num_io_units; 1103 req->backing_cb_args.cb_fn = next_fn; 1104 req->backing_cb_args.cb_arg = req; 1105 for (i = 0; i < req->num_io_units; i++) { 1106 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; 1107 iov[i].iov_len = vol->params.backing_io_unit_size; 1108 if (is_write) { 1109 vol->backing_dev->writev(vol->backing_dev, &iov[i], 1, 1110 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, 1111 vol->backing_lba_per_io_unit, &req->backing_cb_args); 1112 } else { 1113 vol->backing_dev->readv(vol->backing_dev, &iov[i], 1, 1114 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, 1115 vol->backing_lba_per_io_unit, &req->backing_cb_args); 1116 } 1117 } 1118 } 1119 1120 static void 1121 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1122 reduce_request_fn next_fn, bool is_write) 1123 { 1124 struct iovec *iov; 1125 struct reduce_merged_io_desc merged_io_desc[4]; 1126 uint8_t *buf; 1127 bool merge = false; 1128 uint32_t num_io = 0; 1129 uint32_t io_unit_counts = 0; 1130 uint32_t merged_io_idx = 0; 1131 uint32_t i; 1132 1133 /* The merged_io_desc value is defined here to contain four 
elements, 1134 * and the chunk size must be four times the maximum of the io unit. 1135 * if chunk size is too big, don't merge IO. 1136 */ 1137 if (vol->backing_io_units_per_chunk > 4) { 1138 _issue_backing_ops_without_merge(req, vol, next_fn, is_write); 1139 return; 1140 } 1141 1142 if (req->chunk_is_compressed) { 1143 iov = req->comp_buf_iov; 1144 buf = req->comp_buf; 1145 } else { 1146 iov = req->decomp_buf_iov; 1147 buf = req->decomp_buf; 1148 } 1149 1150 for (i = 0; i < req->num_io_units; i++) { 1151 if (!merge) { 1152 merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i]; 1153 merged_io_desc[merged_io_idx].num_io_units = 1; 1154 num_io++; 1155 } 1156 1157 if (i + 1 == req->num_io_units) { 1158 break; 1159 } 1160 1161 if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) { 1162 merged_io_desc[merged_io_idx].num_io_units += 1; 1163 merge = true; 1164 continue; 1165 } 1166 merge = false; 1167 merged_io_idx++; 1168 } 1169 1170 req->num_backing_ops = num_io; 1171 req->backing_cb_args.cb_fn = next_fn; 1172 req->backing_cb_args.cb_arg = req; 1173 for (i = 0; i < num_io; i++) { 1174 iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size; 1175 iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units; 1176 if (is_write) { 1177 vol->backing_dev->writev(vol->backing_dev, &iov[i], 1, 1178 merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit, 1179 vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units, 1180 &req->backing_cb_args); 1181 } else { 1182 vol->backing_dev->readv(vol->backing_dev, &iov[i], 1, 1183 merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit, 1184 vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units, 1185 &req->backing_cb_args); 1186 } 1187 1188 /* Collects the number of processed I/O. */ 1189 io_unit_counts += merged_io_desc[i].num_io_units; 1190 } 1191 } 1192 1193 static void 1194 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, 1195 uint32_t compressed_size) 1196 { 1197 struct spdk_reduce_vol *vol = req->vol; 1198 uint32_t i; 1199 uint64_t chunk_offset, remainder, total_len = 0; 1200 uint8_t *buf; 1201 int j; 1202 1203 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0); 1204 1205 /* TODO: fail if no chunk map found - but really this should not happen if we 1206 * size the number of requests similarly to number of extra chunk maps 1207 */ 1208 assert(req->chunk_map_index != UINT32_MAX); 1209 spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); 1210 1211 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1212 req->num_io_units = spdk_divide_round_up(compressed_size, 1213 vol->params.backing_io_unit_size); 1214 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1215 req->chunk->compressed_size = 1216 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; 1217 1218 /* if the chunk is uncompressed we need to copy the data from the host buffers. 
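	 *
	 * For example (illustrative sizes): with a 16 KiB chunk and 4 KiB backing io units,
	 * backing_io_units_per_chunk is 4; a compressed_size of 9 KiB rounds up to 3 io
	 * units, so the chunk is stored compressed, while anything larger than 12 KiB needs
	 * all four io units and the chunk is written uncompressed from decomp_buf.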
*/ 1219 if (req->chunk_is_compressed == false) { 1220 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1221 buf = req->decomp_buf; 1222 total_len = chunk_offset * vol->params.logical_block_size; 1223 1224 /* zero any offset into chunk */ 1225 if (req->rmw == false && chunk_offset) { 1226 memset(buf, 0, total_len); 1227 } 1228 buf += total_len; 1229 1230 /* copy the data */ 1231 for (j = 0; j < req->iovcnt; j++) { 1232 memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); 1233 buf += req->iov[j].iov_len; 1234 total_len += req->iov[j].iov_len; 1235 } 1236 1237 /* zero any remainder */ 1238 remainder = vol->params.chunk_size - total_len; 1239 total_len += remainder; 1240 if (req->rmw == false && remainder) { 1241 memset(buf, 0, remainder); 1242 } 1243 assert(total_len == vol->params.chunk_size); 1244 } 1245 1246 for (i = 0; i < req->num_io_units; i++) { 1247 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0); 1248 /* TODO: fail if no backing block found - but really this should also not 1249 * happen (see comment above). 1250 */ 1251 assert(req->chunk->io_unit_index[i] != UINT32_MAX); 1252 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1253 } 1254 1255 _issue_backing_ops(req, vol, next_fn, true /* write */); 1256 } 1257 1258 static void 1259 _write_compress_done(void *_req, int reduce_errno) 1260 { 1261 struct spdk_reduce_vol_request *req = _req; 1262 1263 /* Negative reduce_errno indicates failure for compression operations. 1264 * Just write the uncompressed data instead. Force this to happen 1265 * by just passing the full chunk size to _reduce_vol_write_chunk. 1266 * When it sees the data couldn't be compressed, it will just write 1267 * the uncompressed buffer to disk. 
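	 *
	 * (Passing the full chunk size makes _reduce_vol_write_chunk compute
	 * num_io_units == backing_io_units_per_chunk, which is exactly its test for an
	 * uncompressed chunk.)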
1268 */ 1269 if (reduce_errno < 0) { 1270 req->backing_cb_args.output_size = req->vol->params.chunk_size; 1271 } 1272 1273 _reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size); 1274 } 1275 1276 static void 1277 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1278 { 1279 struct spdk_reduce_vol *vol = req->vol; 1280 1281 req->backing_cb_args.cb_fn = next_fn; 1282 req->backing_cb_args.cb_arg = req; 1283 req->comp_buf_iov[0].iov_base = req->comp_buf; 1284 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1285 vol->backing_dev->compress(vol->backing_dev, 1286 req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1, 1287 &req->backing_cb_args); 1288 } 1289 1290 static void 1291 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1292 { 1293 struct spdk_reduce_vol *vol = req->vol; 1294 1295 req->backing_cb_args.cb_fn = next_fn; 1296 req->backing_cb_args.cb_arg = req; 1297 req->comp_buf_iov[0].iov_base = req->comp_buf; 1298 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1299 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1300 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1301 vol->backing_dev->decompress(vol->backing_dev, 1302 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1303 &req->backing_cb_args); 1304 } 1305 1306 static void 1307 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1308 { 1309 struct spdk_reduce_vol *vol = req->vol; 1310 uint64_t chunk_offset, remainder = 0; 1311 uint64_t ttl_len = 0; 1312 size_t iov_len; 1313 int i; 1314 1315 req->decomp_iovcnt = 0; 1316 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1317 1318 /* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer 1319 * if at least one of the conditions below is true: 1320 * 1. User's buffer is fragmented 1321 * 2. Length of the user's buffer is less than the chunk 1322 * 3. 
User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1323 iov_len = req->iov[0].iov_len; 1324 req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 || 1325 req->iov[0].iov_len < vol->params.chunk_size || 1326 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len)); 1327 if (req->copy_after_decompress) { 1328 req->decomp_iov[0].iov_base = req->decomp_buf; 1329 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1330 req->decomp_iovcnt = 1; 1331 goto decompress; 1332 } 1333 1334 if (chunk_offset) { 1335 /* first iov point to our scratch buffer for any offset into the chunk */ 1336 req->decomp_iov[0].iov_base = req->decomp_buf; 1337 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1338 ttl_len += req->decomp_iov[0].iov_len; 1339 req->decomp_iovcnt = 1; 1340 } 1341 1342 /* now the user data iov, direct to the user buffer */ 1343 for (i = 0; i < req->iovcnt; i++) { 1344 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1345 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1346 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1347 } 1348 req->decomp_iovcnt += req->iovcnt; 1349 1350 /* send the rest of the chunk to our scratch buffer */ 1351 remainder = vol->params.chunk_size - ttl_len; 1352 if (remainder) { 1353 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1354 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1355 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1356 req->decomp_iovcnt++; 1357 } 1358 assert(ttl_len == vol->params.chunk_size); 1359 1360 decompress: 1361 assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1)); 1362 req->backing_cb_args.cb_fn = next_fn; 1363 req->backing_cb_args.cb_arg = req; 1364 req->comp_buf_iov[0].iov_base = req->comp_buf; 1365 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1366 vol->backing_dev->decompress(vol->backing_dev, 1367 req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt, 1368 &req->backing_cb_args); 1369 } 1370 1371 static inline void 1372 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings) 1373 { 1374 struct spdk_reduce_vol *vol = req->vol; 1375 char *padding_buffer = zero_paddings ? 
g_zero_buf : req->decomp_buf; 1376 uint64_t chunk_offset, ttl_len = 0; 1377 uint64_t remainder = 0; 1378 char *copy_offset = NULL; 1379 uint32_t lbsize = vol->params.logical_block_size; 1380 int i; 1381 1382 req->decomp_iov[0].iov_base = req->decomp_buf; 1383 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1384 req->decomp_iovcnt = 1; 1385 copy_offset = req->decomp_iov[0].iov_base; 1386 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1387 1388 if (chunk_offset) { 1389 ttl_len += chunk_offset * lbsize; 1390 /* copy_offset already points to padding buffer if zero_paddings=false */ 1391 if (zero_paddings) { 1392 memcpy(copy_offset, padding_buffer, ttl_len); 1393 } 1394 copy_offset += ttl_len; 1395 } 1396 1397 /* now the user data iov, direct from the user buffer */ 1398 for (i = 0; i < req->iovcnt; i++) { 1399 memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len); 1400 copy_offset += req->iov[i].iov_len; 1401 ttl_len += req->iov[i].iov_len; 1402 } 1403 1404 remainder = vol->params.chunk_size - ttl_len; 1405 if (remainder) { 1406 /* copy_offset already points to padding buffer if zero_paddings=false */ 1407 if (zero_paddings) { 1408 memcpy(copy_offset, padding_buffer + ttl_len, remainder); 1409 } 1410 ttl_len += remainder; 1411 } 1412 1413 assert(ttl_len == req->vol->params.chunk_size); 1414 } 1415 1416 /* This function can be called when we are compressing a new data or in case of read-modify-write 1417 * In the first case possible paddings should be filled with zeroes, in the second case the paddings 1418 * should point to already read and decompressed buffer */ 1419 static inline void 1420 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings) 1421 { 1422 struct spdk_reduce_vol *vol = req->vol; 1423 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1424 uint64_t chunk_offset, ttl_len = 0; 1425 uint64_t remainder = 0; 1426 uint32_t lbsize = vol->params.logical_block_size; 1427 size_t iov_len; 1428 int i; 1429 1430 /* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf 1431 * if at least one of the conditions below is true: 1432 * 1. User's buffer is fragmented 1433 * 2. Length of the user's buffer is less than the chunk 1434 * 3. 
User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1435 iov_len = req->iov[0].iov_len; 1436 if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 || 1437 req->iov[0].iov_len < vol->params.chunk_size || 1438 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) { 1439 _prepare_compress_chunk_copy_user_buffers(req, zero_paddings); 1440 return; 1441 } 1442 1443 req->decomp_iovcnt = 0; 1444 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1445 1446 if (chunk_offset != 0) { 1447 ttl_len += chunk_offset * lbsize; 1448 req->decomp_iov[0].iov_base = padding_buffer; 1449 req->decomp_iov[0].iov_len = ttl_len; 1450 req->decomp_iovcnt = 1; 1451 } 1452 1453 /* now the user data iov, direct from the user buffer */ 1454 for (i = 0; i < req->iovcnt; i++) { 1455 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1456 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1457 ttl_len += req->iov[i].iov_len; 1458 } 1459 req->decomp_iovcnt += req->iovcnt; 1460 1461 remainder = vol->params.chunk_size - ttl_len; 1462 if (remainder) { 1463 req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len; 1464 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1465 req->decomp_iovcnt++; 1466 ttl_len += remainder; 1467 } 1468 assert(ttl_len == req->vol->params.chunk_size); 1469 } 1470 1471 static void 1472 _write_decompress_done(void *_req, int reduce_errno) 1473 { 1474 struct spdk_reduce_vol_request *req = _req; 1475 1476 /* Negative reduce_errno indicates failure for compression operations. */ 1477 if (reduce_errno < 0) { 1478 _reduce_vol_complete_req(req, reduce_errno); 1479 return; 1480 } 1481 1482 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1483 * represents the output_size. 1484 */ 1485 if (req->backing_cb_args.output_size != req->vol->params.chunk_size) { 1486 _reduce_vol_complete_req(req, -EIO); 1487 return; 1488 } 1489 1490 _prepare_compress_chunk(req, false); 1491 _reduce_vol_compress_chunk(req, _write_compress_done); 1492 } 1493 1494 static void 1495 _write_read_done(void *_req, int reduce_errno) 1496 { 1497 struct spdk_reduce_vol_request *req = _req; 1498 1499 if (reduce_errno != 0) { 1500 req->reduce_errno = reduce_errno; 1501 } 1502 1503 assert(req->num_backing_ops > 0); 1504 if (--req->num_backing_ops > 0) { 1505 return; 1506 } 1507 1508 if (req->reduce_errno != 0) { 1509 _reduce_vol_complete_req(req, req->reduce_errno); 1510 return; 1511 } 1512 1513 if (req->chunk_is_compressed) { 1514 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); 1515 } else { 1516 req->backing_cb_args.output_size = req->chunk->compressed_size; 1517 1518 _write_decompress_done(req, 0); 1519 } 1520 } 1521 1522 static void 1523 _read_decompress_done(void *_req, int reduce_errno) 1524 { 1525 struct spdk_reduce_vol_request *req = _req; 1526 struct spdk_reduce_vol *vol = req->vol; 1527 1528 /* Negative reduce_errno indicates failure for compression operations. */ 1529 if (reduce_errno < 0) { 1530 _reduce_vol_complete_req(req, reduce_errno); 1531 return; 1532 } 1533 1534 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1535 * represents the output_size. 
1536 */ 1537 if (req->backing_cb_args.output_size != vol->params.chunk_size) { 1538 _reduce_vol_complete_req(req, -EIO); 1539 return; 1540 } 1541 1542 if (req->copy_after_decompress) { 1543 uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1544 char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size; 1545 int i; 1546 1547 for (i = 0; i < req->iovcnt; i++) { 1548 memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len); 1549 decomp_buffer += req->iov[i].iov_len; 1550 assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size); 1551 } 1552 } 1553 1554 _reduce_vol_complete_req(req, 0); 1555 } 1556 1557 static void 1558 _read_read_done(void *_req, int reduce_errno) 1559 { 1560 struct spdk_reduce_vol_request *req = _req; 1561 uint64_t chunk_offset; 1562 uint8_t *buf; 1563 int i; 1564 1565 if (reduce_errno != 0) { 1566 req->reduce_errno = reduce_errno; 1567 } 1568 1569 assert(req->num_backing_ops > 0); 1570 if (--req->num_backing_ops > 0) { 1571 return; 1572 } 1573 1574 if (req->reduce_errno != 0) { 1575 _reduce_vol_complete_req(req, req->reduce_errno); 1576 return; 1577 } 1578 1579 if (req->chunk_is_compressed) { 1580 _reduce_vol_decompress_chunk(req, _read_decompress_done); 1581 } else { 1582 1583 /* If the chunk was compressed, the data would have been sent to the 1584 * host buffers by the decompression operation, if not we need to memcpy here. 1585 */ 1586 chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; 1587 buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; 1588 for (i = 0; i < req->iovcnt; i++) { 1589 memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); 1590 buf += req->iov[i].iov_len; 1591 } 1592 1593 req->backing_cb_args.output_size = req->chunk->compressed_size; 1594 1595 _read_decompress_done(req, 0); 1596 } 1597 } 1598 1599 static void 1600 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1601 { 1602 struct spdk_reduce_vol *vol = req->vol; 1603 1604 req->chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1605 assert(req->chunk_map_index != UINT32_MAX); 1606 1607 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1608 req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size, 1609 vol->params.backing_io_unit_size); 1610 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1611 1612 _issue_backing_ops(req, vol, next_fn, false /* read */); 1613 } 1614 1615 static bool 1616 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt, 1617 uint64_t length) 1618 { 1619 uint64_t size = 0; 1620 int i; 1621 1622 if (iovcnt > REDUCE_MAX_IOVECS) { 1623 return false; 1624 } 1625 1626 for (i = 0; i < iovcnt; i++) { 1627 size += iov[i].iov_len; 1628 } 1629 1630 return size == (length * vol->params.logical_block_size); 1631 } 1632 1633 static bool 1634 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index) 1635 { 1636 struct spdk_reduce_vol_request *req; 1637 1638 TAILQ_FOREACH(req, &vol->executing_requests, tailq) { 1639 if (logical_map_index == req->logical_map_index) { 1640 return true; 1641 } 1642 } 1643 1644 return false; 1645 } 1646 1647 static void 1648 _start_readv_request(struct spdk_reduce_vol_request *req) 1649 { 1650 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1651 _reduce_vol_read_chunk(req, _read_read_done); 1652 } 1653 1654 void 1655 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, 1656 struct iovec 
*iov, int iovcnt, uint64_t offset, uint64_t length, 1657 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1658 { 1659 struct spdk_reduce_vol_request *req; 1660 uint64_t logical_map_index; 1661 bool overlapped; 1662 int i; 1663 1664 if (length == 0) { 1665 cb_fn(cb_arg, 0); 1666 return; 1667 } 1668 1669 if (_request_spans_chunk_boundary(vol, offset, length)) { 1670 cb_fn(cb_arg, -EINVAL); 1671 return; 1672 } 1673 1674 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1675 cb_fn(cb_arg, -EINVAL); 1676 return; 1677 } 1678 1679 logical_map_index = offset / vol->logical_blocks_per_chunk; 1680 overlapped = _check_overlap(vol, logical_map_index); 1681 1682 if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) { 1683 /* 1684 * This chunk hasn't been allocated. So treat the data as all 1685 * zeroes for this chunk - do the memset and immediately complete 1686 * the operation. 1687 */ 1688 for (i = 0; i < iovcnt; i++) { 1689 memset(iov[i].iov_base, 0, iov[i].iov_len); 1690 } 1691 cb_fn(cb_arg, 0); 1692 return; 1693 } 1694 1695 req = TAILQ_FIRST(&vol->free_requests); 1696 if (req == NULL) { 1697 cb_fn(cb_arg, -ENOMEM); 1698 return; 1699 } 1700 1701 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1702 req->type = REDUCE_IO_READV; 1703 req->vol = vol; 1704 req->iov = iov; 1705 req->iovcnt = iovcnt; 1706 req->offset = offset; 1707 req->logical_map_index = logical_map_index; 1708 req->length = length; 1709 req->copy_after_decompress = false; 1710 req->cb_fn = cb_fn; 1711 req->cb_arg = cb_arg; 1712 1713 if (!overlapped) { 1714 _start_readv_request(req); 1715 } else { 1716 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1717 } 1718 } 1719 1720 static void 1721 _start_writev_request(struct spdk_reduce_vol_request *req) 1722 { 1723 struct spdk_reduce_vol *vol = req->vol; 1724 1725 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1726 if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) { 1727 if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) { 1728 /* Read old chunk, then overwrite with data from this write 1729 * operation. 
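	 *
	 * The read-modify-write path then runs: _write_read_done() -> decompress into
	 * decomp_buf (if the old chunk was compressed) -> _write_decompress_done() ->
	 * _prepare_compress_chunk() merges the user iovs with the old data -> compress ->
	 * _write_write_done() writes the new chunk and updates the logical map.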
1730 */ 1731 req->rmw = true; 1732 _reduce_vol_read_chunk(req, _write_read_done); 1733 return; 1734 } 1735 } 1736 1737 req->rmw = false; 1738 1739 _prepare_compress_chunk(req, true); 1740 _reduce_vol_compress_chunk(req, _write_compress_done); 1741 } 1742 1743 void 1744 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, 1745 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1746 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1747 { 1748 struct spdk_reduce_vol_request *req; 1749 uint64_t logical_map_index; 1750 bool overlapped; 1751 1752 if (length == 0) { 1753 cb_fn(cb_arg, 0); 1754 return; 1755 } 1756 1757 if (_request_spans_chunk_boundary(vol, offset, length)) { 1758 cb_fn(cb_arg, -EINVAL); 1759 return; 1760 } 1761 1762 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1763 cb_fn(cb_arg, -EINVAL); 1764 return; 1765 } 1766 1767 logical_map_index = offset / vol->logical_blocks_per_chunk; 1768 overlapped = _check_overlap(vol, logical_map_index); 1769 1770 req = TAILQ_FIRST(&vol->free_requests); 1771 if (req == NULL) { 1772 cb_fn(cb_arg, -ENOMEM); 1773 return; 1774 } 1775 1776 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1777 req->type = REDUCE_IO_WRITEV; 1778 req->vol = vol; 1779 req->iov = iov; 1780 req->iovcnt = iovcnt; 1781 req->offset = offset; 1782 req->logical_map_index = logical_map_index; 1783 req->length = length; 1784 req->copy_after_decompress = false; 1785 req->cb_fn = cb_fn; 1786 req->cb_arg = cb_arg; 1787 1788 if (!overlapped) { 1789 _start_writev_request(req); 1790 } else { 1791 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1792 } 1793 } 1794 1795 const struct spdk_reduce_vol_params * 1796 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol) 1797 { 1798 return &vol->params; 1799 } 1800 1801 const char * 1802 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol) 1803 { 1804 return vol->pm_file.path; 1805 } 1806 1807 void 1808 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol) 1809 { 1810 uint64_t logical_map_size, num_chunks, ttl_chunk_sz; 1811 uint32_t struct_size; 1812 uint64_t chunk_map_size; 1813 1814 SPDK_NOTICELOG("vol info:\n"); 1815 SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size); 1816 SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size); 1817 SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size); 1818 SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size); 1819 num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 1820 SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks); 1821 SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n", 1822 vol->params.vol_size / vol->params.chunk_size); 1823 ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1824 vol->params.backing_io_unit_size); 1825 SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz); 1826 struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 1827 SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size); 1828 1829 SPDK_NOTICELOG("pmem info:\n"); 1830 SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size); 1831 SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf); 1832 SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super); 1833 SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map); 1834 logical_map_size = 
_get_pm_logical_map_size(vol->params.vol_size, 1835 vol->params.chunk_size); 1836 SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size); 1837 SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps); 1838 chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1839 vol->params.backing_io_unit_size); 1840 SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size); 1841 } 1842 1843 SPDK_LOG_REGISTER_COMPONENT(reduce) 1844