1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "spdk/reduce.h" 10 #include "spdk/env.h" 11 #include "spdk/string.h" 12 #include "spdk/bit_array.h" 13 #include "spdk/util.h" 14 #include "spdk/log.h" 15 #include "spdk/memory.h" 16 17 #include "libpmem.h" 18 19 /* Always round up the size of the PM region to the nearest cacheline. */ 20 #define REDUCE_PM_SIZE_ALIGNMENT 64 21 22 /* Offset into the backing device where the persistent memory file's path is stored. */ 23 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 24 25 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 26 27 #define REDUCE_NUM_VOL_REQUESTS 256 28 29 /* Structure written to offset 0 of both the pm file and the backing device. */ 30 struct spdk_reduce_vol_superblock { 31 uint8_t signature[8]; 32 struct spdk_reduce_vol_params params; 33 uint8_t reserved[4048]; 34 }; 35 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 36 37 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 38 /* null terminator counts one */ 39 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 40 SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect"); 41 42 #define REDUCE_PATH_MAX 4096 43 44 #define REDUCE_ZERO_BUF_SIZE 0x100000 45 46 /** 47 * Describes a persistent memory file used to hold metadata associated with a 48 * compressed volume. 49 */ 50 struct spdk_reduce_pm_file { 51 char path[REDUCE_PATH_MAX]; 52 void *pm_buf; 53 int pm_is_pmem; 54 uint64_t size; 55 }; 56 57 #define REDUCE_IO_READV 1 58 #define REDUCE_IO_WRITEV 2 59 60 struct spdk_reduce_chunk_map { 61 uint32_t compressed_size; 62 uint32_t reserved; 63 uint64_t io_unit_index[0]; 64 }; 65 66 struct spdk_reduce_vol_request { 67 /** 68 * Scratch buffer used for uncompressed chunk. This is used for: 69 * 1) source buffer for compression operations 70 * 2) destination buffer for decompression operations 71 * 3) data buffer when writing uncompressed chunk to disk 72 * 4) data buffer when reading uncompressed chunk from disk 73 */ 74 uint8_t *decomp_buf; 75 struct iovec *decomp_buf_iov; 76 77 /** 78 * These are used to construct the iovecs that are sent to 79 * the decomp engine, they point to a mix of the scratch buffer 80 * and user buffer 81 */ 82 struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2]; 83 int decomp_iovcnt; 84 85 /** 86 * Scratch buffer used for compressed chunk. 
This is used for: 87 * 1) destination buffer for compression operations 88 * 2) source buffer for decompression operations 89 * 3) data buffer when writing compressed chunk to disk 90 * 4) data buffer when reading compressed chunk from disk 91 */ 92 uint8_t *comp_buf; 93 struct iovec *comp_buf_iov; 94 struct iovec *iov; 95 bool rmw; 96 struct spdk_reduce_vol *vol; 97 int type; 98 int reduce_errno; 99 int iovcnt; 100 int num_backing_ops; 101 uint32_t num_io_units; 102 struct spdk_reduce_backing_io *backing_io; 103 bool chunk_is_compressed; 104 bool copy_after_decompress; 105 uint64_t offset; 106 uint64_t logical_map_index; 107 uint64_t length; 108 uint64_t chunk_map_index; 109 struct spdk_reduce_chunk_map *chunk; 110 spdk_reduce_vol_op_complete cb_fn; 111 void *cb_arg; 112 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 113 struct spdk_reduce_vol_cb_args backing_cb_args; 114 }; 115 116 struct spdk_reduce_vol { 117 struct spdk_reduce_vol_params params; 118 uint32_t backing_io_units_per_chunk; 119 uint32_t backing_lba_per_io_unit; 120 uint32_t logical_blocks_per_chunk; 121 struct spdk_reduce_pm_file pm_file; 122 struct spdk_reduce_backing_dev *backing_dev; 123 struct spdk_reduce_vol_superblock *backing_super; 124 struct spdk_reduce_vol_superblock *pm_super; 125 uint64_t *pm_logical_map; 126 uint64_t *pm_chunk_maps; 127 128 struct spdk_bit_array *allocated_chunk_maps; 129 struct spdk_bit_array *allocated_backing_io_units; 130 131 struct spdk_reduce_vol_request *request_mem; 132 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 133 TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests; 134 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 135 136 /* Single contiguous buffer used for all request buffers for this volume. */ 137 uint8_t *buf_mem; 138 struct iovec *buf_iov_mem; 139 /* Single contiguous buffer used for backing io buffers for this volume. */ 140 uint8_t *buf_backing_io_mem; 141 }; 142 143 static void _start_readv_request(struct spdk_reduce_vol_request *req); 144 static void _start_writev_request(struct spdk_reduce_vol_request *req); 145 static uint8_t *g_zero_buf; 146 static int g_vol_count = 0; 147 148 /* 149 * Allocate extra metadata chunks and corresponding backing io units to account for 150 * outstanding IO in worst case scenario where logical map is completely allocated 151 * and no data can be compressed. We need extra chunks in this case to handle 152 * in-flight writes since reduce never writes data in place. 153 */ 154 #define REDUCE_NUM_EXTRA_CHUNKS 128 155 156 static void 157 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 158 { 159 if (vol->pm_file.pm_is_pmem) { 160 pmem_persist(addr, len); 161 } else { 162 pmem_msync(addr, len); 163 } 164 } 165 166 static uint64_t 167 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 168 { 169 uint64_t chunks_in_logical_map, logical_map_size; 170 171 chunks_in_logical_map = vol_size / chunk_size; 172 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 173 174 /* Round up to next cacheline. 
 */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 * values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file.
*/ 291 vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; 292 293 /* Logical map immediately follows the super block. */ 294 vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); 295 296 /* Chunks maps follow the logical map. */ 297 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size); 298 vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size); 299 } 300 301 /* We need 2 iovs during load - one for the superblock, another for the path */ 302 #define LOAD_IOV_COUNT 2 303 304 struct reduce_init_load_ctx { 305 struct spdk_reduce_vol *vol; 306 struct spdk_reduce_vol_cb_args backing_cb_args; 307 spdk_reduce_vol_op_with_handle_complete cb_fn; 308 void *cb_arg; 309 struct iovec iov[LOAD_IOV_COUNT]; 310 void *path; 311 struct spdk_reduce_backing_io *backing_io; 312 }; 313 314 static inline bool 315 _addr_crosses_huge_page(const void *addr, size_t *size) 316 { 317 size_t _size; 318 uint64_t rc; 319 320 assert(size); 321 322 _size = *size; 323 rc = spdk_vtophys(addr, size); 324 325 return rc == SPDK_VTOPHYS_ERROR || _size != *size; 326 } 327 328 static inline int 329 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size) 330 { 331 uint8_t *addr; 332 size_t size_tmp = buffer_size; 333 334 addr = *_addr; 335 336 /* Verify that addr + buffer_size doesn't cross huge page boundary */ 337 if (_addr_crosses_huge_page(addr, &size_tmp)) { 338 /* Memory start is aligned on 2MiB, so buffer should be located at the end of the page. 339 * Skip remaining bytes and continue from the beginning of the next page */ 340 addr += size_tmp; 341 } 342 343 if (addr + buffer_size > addr_range) { 344 SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range); 345 return -ERANGE; 346 } 347 348 *vol_buffer = addr; 349 *_addr = addr + buffer_size; 350 351 return 0; 352 } 353 354 static int 355 _allocate_vol_requests(struct spdk_reduce_vol *vol) 356 { 357 struct spdk_reduce_vol_request *req; 358 struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev; 359 uint32_t reqs_in_2mb_page, huge_pages_needed; 360 uint8_t *buffer, *buffer_end; 361 int i = 0; 362 int rc = 0; 363 364 /* It is needed to allocate comp and decomp buffers so that they do not cross physical 365 * page boundaries. Assume that the system uses default 2MiB pages and chunk_size is not 366 * necessarily power of 2 367 * Allocate 2x since we need buffers for both read/write and compress/decompress 368 * intermediate buffers. */ 369 reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2); 370 if (!reqs_in_2mb_page) { 371 return -EINVAL; 372 } 373 huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page); 374 375 vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL); 376 if (vol->buf_mem == NULL) { 377 return -ENOMEM; 378 } 379 380 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 381 if (vol->request_mem == NULL) { 382 spdk_free(vol->buf_mem); 383 vol->buf_mem = NULL; 384 return -ENOMEM; 385 } 386 387 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 388 * buffers. 
389 */ 390 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 391 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 392 if (vol->buf_iov_mem == NULL) { 393 free(vol->request_mem); 394 spdk_free(vol->buf_mem); 395 vol->request_mem = NULL; 396 vol->buf_mem = NULL; 397 return -ENOMEM; 398 } 399 400 vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) + 401 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk); 402 if (vol->buf_backing_io_mem == NULL) { 403 free(vol->request_mem); 404 free(vol->buf_iov_mem); 405 spdk_free(vol->buf_mem); 406 vol->request_mem = NULL; 407 vol->buf_iov_mem = NULL; 408 vol->buf_mem = NULL; 409 return -ENOMEM; 410 } 411 412 buffer = vol->buf_mem; 413 buffer_end = buffer + VALUE_2MB * huge_pages_needed; 414 415 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 416 req = &vol->request_mem[i]; 417 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 418 req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i * 419 (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) * 420 vol->backing_io_units_per_chunk); 421 422 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 423 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 424 425 rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size); 426 if (rc) { 427 SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 428 vol->buf_mem, buffer_end); 429 break; 430 } 431 rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size); 432 if (rc) { 433 SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 434 vol->buf_mem, buffer_end); 435 break; 436 } 437 } 438 439 if (rc) { 440 free(vol->buf_backing_io_mem); 441 free(vol->buf_iov_mem); 442 free(vol->request_mem); 443 spdk_free(vol->buf_mem); 444 vol->buf_mem = NULL; 445 vol->buf_backing_io_mem = NULL; 446 vol->buf_iov_mem = NULL; 447 vol->request_mem = NULL; 448 } 449 450 return rc; 451 } 452 453 static void 454 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 455 { 456 if (ctx != NULL) { 457 spdk_free(ctx->path); 458 free(ctx->backing_io); 459 free(ctx); 460 } 461 462 if (vol != NULL) { 463 if (vol->pm_file.pm_buf != NULL) { 464 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 465 } 466 467 spdk_free(vol->backing_super); 468 spdk_bit_array_free(&vol->allocated_chunk_maps); 469 spdk_bit_array_free(&vol->allocated_backing_io_units); 470 free(vol->request_mem); 471 free(vol->buf_backing_io_mem); 472 free(vol->buf_iov_mem); 473 spdk_free(vol->buf_mem); 474 free(vol); 475 } 476 } 477 478 static int 479 _alloc_zero_buff(void) 480 { 481 int rc = 0; 482 483 /* The zero buffer is shared between all volumes and just used 484 * for reads so allocate one global instance here if not already 485 * allocated when another vol init'd or loaded. 
486 */ 487 if (g_vol_count++ == 0) { 488 g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE, 489 64, NULL, SPDK_ENV_LCORE_ID_ANY, 490 SPDK_MALLOC_DMA); 491 if (g_zero_buf == NULL) { 492 g_vol_count--; 493 rc = -ENOMEM; 494 } 495 } 496 return rc; 497 } 498 499 static void 500 _init_write_super_cpl(void *cb_arg, int reduce_errno) 501 { 502 struct reduce_init_load_ctx *init_ctx = cb_arg; 503 int rc; 504 505 rc = _allocate_vol_requests(init_ctx->vol); 506 if (rc != 0) { 507 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 508 _init_load_cleanup(init_ctx->vol, init_ctx); 509 return; 510 } 511 512 rc = _alloc_zero_buff(); 513 if (rc != 0) { 514 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 515 _init_load_cleanup(init_ctx->vol, init_ctx); 516 return; 517 } 518 519 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno); 520 /* Only clean up the ctx - the vol has been passed to the application 521 * for use now that initialization was successful. 522 */ 523 _init_load_cleanup(NULL, init_ctx); 524 } 525 526 static void 527 _init_write_path_cpl(void *cb_arg, int reduce_errno) 528 { 529 struct reduce_init_load_ctx *init_ctx = cb_arg; 530 struct spdk_reduce_vol *vol = init_ctx->vol; 531 struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io; 532 533 init_ctx->iov[0].iov_base = vol->backing_super; 534 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 535 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 536 init_ctx->backing_cb_args.cb_arg = init_ctx; 537 538 backing_io->dev = vol->backing_dev; 539 backing_io->iov = init_ctx->iov; 540 backing_io->iovcnt = 1; 541 backing_io->lba = 0; 542 backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen; 543 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 544 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 545 546 vol->backing_dev->submit_backing_io(backing_io); 547 } 548 549 static int 550 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 551 { 552 uint64_t total_chunks, total_backing_io_units; 553 uint32_t i, num_metadata_io_units; 554 555 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 556 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 557 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 558 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 559 560 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 561 return -ENOMEM; 562 } 563 564 /* Set backing io unit bits associated with metadata. */ 565 num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 566 vol->params.backing_io_unit_size; 567 for (i = 0; i < num_metadata_io_units; i++) { 568 spdk_bit_array_set(vol->allocated_backing_io_units, i); 569 } 570 571 return 0; 572 } 573 574 void 575 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, 576 struct spdk_reduce_backing_dev *backing_dev, 577 const char *pm_file_dir, 578 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 579 { 580 struct spdk_reduce_vol *vol; 581 struct reduce_init_load_ctx *init_ctx; 582 struct spdk_reduce_backing_io *backing_io; 583 uint64_t backing_dev_size; 584 size_t mapped_len; 585 int dir_len, max_dir_len, rc; 586 587 /* We need to append a path separator and the UUID to the supplied 588 * path. 
 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->submit_backing_io == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
	if (backing_io == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}
	init_ctx->backing_io = backing_io;

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_uuid_is_null(&params->uuid)) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;
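	/* Illustrative example (hypothetical numbers, not defaults taken from this code):
	 * with chunk_size = 16 KiB, backing_io_unit_size = 4 KiB, logical_block_size = 512
	 * and a backing device blocklen of 4 KiB, the per-chunk values computed above
	 * would be backing_io_units_per_chunk = 4, logical_blocks_per_chunk = 32 and
	 * backing_lba_per_io_unit = 1.
	 */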
	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent of filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written. We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
	 */
	backing_io->dev = vol->backing_dev;
	backing_io->iov = init_ctx->iov;
	backing_io->iovcnt = 1;
	backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen;
	backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen;
	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;

	vol->backing_dev->submit_backing_io(backing_io);
}

static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	rc = _alloc_zero_buff();
	if (rc) {
		goto error;
	}

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
	 * So don't bother getting the volume ready to use - invoke the callback immediately
	 * so destroy_load_cb can delete the metadata off of the block device and delete the
	 * persistent memory file if it exists.
777 */ 778 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 779 if (load_ctx->cb_fn == (*destroy_load_cb)) { 780 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 781 _init_load_cleanup(NULL, load_ctx); 782 return; 783 } 784 785 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 786 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 787 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 788 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 789 790 rc = _allocate_bit_arrays(vol); 791 if (rc != 0) { 792 goto error; 793 } 794 795 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 796 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 797 SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", 798 backing_dev_size); 799 rc = -EILSEQ; 800 goto error; 801 } 802 803 vol->pm_file.size = _get_pm_file_size(&vol->params); 804 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 805 &vol->pm_file.pm_is_pmem); 806 if (vol->pm_file.pm_buf == NULL) { 807 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 808 rc = -errno; 809 goto error; 810 } 811 812 if (vol->pm_file.size != mapped_len) { 813 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 814 vol->pm_file.size, mapped_len); 815 rc = -ENOMEM; 816 goto error; 817 } 818 819 rc = _allocate_vol_requests(vol); 820 if (rc != 0) { 821 goto error; 822 } 823 824 _initialize_vol_pm_pointers(vol); 825 826 num_chunks = vol->params.vol_size / vol->params.chunk_size; 827 for (i = 0; i < num_chunks; i++) { 828 logical_map_index = vol->pm_logical_map[i]; 829 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 830 continue; 831 } 832 spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 833 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 834 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 835 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 836 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 837 } 838 } 839 } 840 841 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 842 /* Only clean up the ctx - the vol has been passed to the application 843 * for use now that volume load was successful. 
844 */ 845 _init_load_cleanup(NULL, load_ctx); 846 return; 847 848 error: 849 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 850 _init_load_cleanup(vol, load_ctx); 851 } 852 853 void 854 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 855 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 856 { 857 struct spdk_reduce_vol *vol; 858 struct reduce_init_load_ctx *load_ctx; 859 struct spdk_reduce_backing_io *backing_io; 860 861 if (backing_dev->submit_backing_io == NULL) { 862 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 863 cb_fn(cb_arg, NULL, -EINVAL); 864 return; 865 } 866 867 vol = calloc(1, sizeof(*vol)); 868 if (vol == NULL) { 869 cb_fn(cb_arg, NULL, -ENOMEM); 870 return; 871 } 872 873 TAILQ_INIT(&vol->free_requests); 874 TAILQ_INIT(&vol->executing_requests); 875 TAILQ_INIT(&vol->queued_requests); 876 877 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, 878 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 879 if (vol->backing_super == NULL) { 880 _init_load_cleanup(vol, NULL); 881 cb_fn(cb_arg, NULL, -ENOMEM); 882 return; 883 } 884 885 vol->backing_dev = backing_dev; 886 887 load_ctx = calloc(1, sizeof(*load_ctx)); 888 if (load_ctx == NULL) { 889 _init_load_cleanup(vol, NULL); 890 cb_fn(cb_arg, NULL, -ENOMEM); 891 return; 892 } 893 894 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 895 if (backing_io == NULL) { 896 _init_load_cleanup(vol, load_ctx); 897 cb_fn(cb_arg, NULL, -ENOMEM); 898 return; 899 } 900 901 load_ctx->backing_io = backing_io; 902 903 load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, 904 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 905 if (load_ctx->path == NULL) { 906 _init_load_cleanup(vol, load_ctx); 907 cb_fn(cb_arg, NULL, -ENOMEM); 908 return; 909 } 910 911 load_ctx->vol = vol; 912 load_ctx->cb_fn = cb_fn; 913 load_ctx->cb_arg = cb_arg; 914 915 load_ctx->iov[0].iov_base = vol->backing_super; 916 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 917 load_ctx->iov[1].iov_base = load_ctx->path; 918 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 919 backing_io->dev = vol->backing_dev; 920 backing_io->iov = load_ctx->iov; 921 backing_io->iovcnt = LOAD_IOV_COUNT; 922 backing_io->lba = 0; 923 backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 924 vol->backing_dev->blocklen; 925 backing_io->backing_cb_args = &load_ctx->backing_cb_args; 926 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 927 928 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 929 load_ctx->backing_cb_args.cb_arg = load_ctx; 930 vol->backing_dev->submit_backing_io(backing_io); 931 } 932 933 void 934 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 935 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 936 { 937 if (vol == NULL) { 938 /* This indicates a programming error. 
*/ 939 assert(false); 940 cb_fn(cb_arg, -EINVAL); 941 return; 942 } 943 944 if (--g_vol_count == 0) { 945 spdk_free(g_zero_buf); 946 } 947 assert(g_vol_count >= 0); 948 _init_load_cleanup(vol, NULL); 949 cb_fn(cb_arg, 0); 950 } 951 952 struct reduce_destroy_ctx { 953 spdk_reduce_vol_op_complete cb_fn; 954 void *cb_arg; 955 struct spdk_reduce_vol *vol; 956 struct spdk_reduce_vol_superblock *super; 957 struct iovec iov; 958 struct spdk_reduce_vol_cb_args backing_cb_args; 959 int reduce_errno; 960 char pm_path[REDUCE_PATH_MAX]; 961 struct spdk_reduce_backing_io *backing_io; 962 }; 963 964 static void 965 destroy_unload_cpl(void *cb_arg, int reduce_errno) 966 { 967 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 968 969 if (destroy_ctx->reduce_errno == 0) { 970 if (unlink(destroy_ctx->pm_path)) { 971 SPDK_ERRLOG("%s could not be unlinked: %s\n", 972 destroy_ctx->pm_path, strerror(errno)); 973 } 974 } 975 976 /* Even if the unload somehow failed, we still pass the destroy_ctx 977 * reduce_errno since that indicates whether or not the volume was 978 * actually destroyed. 979 */ 980 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 981 spdk_free(destroy_ctx->super); 982 free(destroy_ctx->backing_io); 983 free(destroy_ctx); 984 } 985 986 static void 987 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 988 { 989 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 990 struct spdk_reduce_vol *vol = destroy_ctx->vol; 991 992 destroy_ctx->reduce_errno = reduce_errno; 993 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 994 } 995 996 static void 997 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 998 { 999 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1000 struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io; 1001 1002 if (reduce_errno != 0) { 1003 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 1004 spdk_free(destroy_ctx->super); 1005 free(destroy_ctx); 1006 return; 1007 } 1008 1009 destroy_ctx->vol = vol; 1010 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 1011 destroy_ctx->iov.iov_base = destroy_ctx->super; 1012 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 1013 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 1014 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 1015 1016 backing_io->dev = vol->backing_dev; 1017 backing_io->iov = &destroy_ctx->iov; 1018 backing_io->iovcnt = 1; 1019 backing_io->lba = 0; 1020 backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen; 1021 backing_io->backing_cb_args = &destroy_ctx->backing_cb_args; 1022 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1023 1024 vol->backing_dev->submit_backing_io(backing_io); 1025 } 1026 1027 void 1028 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 1029 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1030 { 1031 struct reduce_destroy_ctx *destroy_ctx; 1032 struct spdk_reduce_backing_io *backing_io; 1033 1034 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 1035 if (destroy_ctx == NULL) { 1036 cb_fn(cb_arg, -ENOMEM); 1037 return; 1038 } 1039 1040 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 1041 if (backing_io == NULL) { 1042 free(destroy_ctx); 1043 cb_fn(cb_arg, -ENOMEM); 1044 return; 1045 } 1046 1047 destroy_ctx->backing_io = backing_io; 1048 1049 destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, 1050 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 1051 if (destroy_ctx->super == NULL) { 1052 
free(destroy_ctx); 1053 free(backing_io); 1054 cb_fn(cb_arg, -ENOMEM); 1055 return; 1056 } 1057 destroy_ctx->cb_fn = cb_fn; 1058 destroy_ctx->cb_arg = cb_arg; 1059 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 1060 } 1061 1062 static bool 1063 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 1064 { 1065 uint64_t start_chunk, end_chunk; 1066 1067 start_chunk = offset / vol->logical_blocks_per_chunk; 1068 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 1069 1070 return (start_chunk != end_chunk); 1071 } 1072 1073 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 1074 1075 static void 1076 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 1077 { 1078 struct spdk_reduce_vol_request *next_req; 1079 struct spdk_reduce_vol *vol = req->vol; 1080 1081 req->cb_fn(req->cb_arg, reduce_errno); 1082 TAILQ_REMOVE(&vol->executing_requests, req, tailq); 1083 1084 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 1085 if (next_req->logical_map_index == req->logical_map_index) { 1086 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 1087 if (next_req->type == REDUCE_IO_READV) { 1088 _start_readv_request(next_req); 1089 } else { 1090 assert(next_req->type == REDUCE_IO_WRITEV); 1091 _start_writev_request(next_req); 1092 } 1093 break; 1094 } 1095 } 1096 1097 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 1098 } 1099 1100 static void 1101 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 1102 { 1103 struct spdk_reduce_chunk_map *chunk; 1104 uint32_t i; 1105 1106 chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index); 1107 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 1108 if (chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) { 1109 break; 1110 } 1111 assert(spdk_bit_array_get(vol->allocated_backing_io_units, 1112 chunk->io_unit_index[i]) == true); 1113 spdk_bit_array_clear(vol->allocated_backing_io_units, chunk->io_unit_index[i]); 1114 chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 1115 } 1116 spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index); 1117 } 1118 1119 static void 1120 _write_write_done(void *_req, int reduce_errno) 1121 { 1122 struct spdk_reduce_vol_request *req = _req; 1123 struct spdk_reduce_vol *vol = req->vol; 1124 uint64_t old_chunk_map_index; 1125 1126 if (reduce_errno != 0) { 1127 req->reduce_errno = reduce_errno; 1128 } 1129 1130 assert(req->num_backing_ops > 0); 1131 if (--req->num_backing_ops > 0) { 1132 return; 1133 } 1134 1135 if (req->reduce_errno != 0) { 1136 _reduce_vol_reset_chunk(vol, req->chunk_map_index); 1137 _reduce_vol_complete_req(req, req->reduce_errno); 1138 return; 1139 } 1140 1141 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1142 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 1143 _reduce_vol_reset_chunk(vol, old_chunk_map_index); 1144 } 1145 1146 /* 1147 * We don't need to persist the clearing of the old chunk map here. The old chunk map 1148 * becomes invalid after we update the logical map, since the old chunk map will no 1149 * longer have a reference to it in the logical map. 1150 */ 1151 1152 /* Persist the new chunk map. This must be persisted before we update the logical map. 
 */
	_reduce_persist(vol, req->chunk,
			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static struct spdk_reduce_backing_io *
_reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index)
{
	struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev;
	struct spdk_reduce_backing_io *backing_io;

	backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io +
			(sizeof(*backing_io) + backing_dev->user_ctx_size) * index);

	return backing_io;
}

struct reduce_merged_io_desc {
	uint64_t io_unit_index;
	uint32_t num_io_units;
};

static void
_issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
				 reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	struct spdk_reduce_backing_io *backing_io;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		backing_io = _reduce_vol_req_get_backing_io(req, i);
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		backing_io->dev = vol->backing_dev;
		backing_io->iov = &iov[i];
		backing_io->iovcnt = 1;
		backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit;
		backing_io->lba_count = vol->backing_lba_per_io_unit;
		backing_io->backing_cb_args = &req->backing_cb_args;
		if (is_write) {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
		} else {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
		}
		vol->backing_dev->submit_backing_io(backing_io);
	}
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	struct spdk_reduce_backing_io *backing_io;
	struct reduce_merged_io_desc merged_io_desc[4];
	uint8_t *buf;
	bool merge = false;
	uint32_t num_io = 0;
	uint32_t io_unit_counts = 0;
	uint32_t merged_io_idx = 0;
	uint32_t i;

	/* merged_io_desc is a fixed array of four elements, so merging is only attempted
	 * when a chunk consists of at most four backing io units. For larger chunks, fall
	 * back to issuing one backing IO per io unit without merging.
	 */
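	/* Worked example (hypothetical io unit indexes, for illustration only): a chunk
	 * mapped to io units {7, 8, 9, 42} is submitted below as two backing IOs, one
	 * covering {7, 8, 9} and one covering {42}, since only physically contiguous
	 * io units are merged into a single request.
	 */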
	if (vol->backing_io_units_per_chunk > 4) {
		_issue_backing_ops_without_merge(req, vol, next_fn, is_write);
		return;
	}

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	for (i = 0; i < req->num_io_units; i++) {
		if (!merge) {
			merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
			merged_io_desc[merged_io_idx].num_io_units = 1;
			num_io++;
		}

		if (i + 1 == req->num_io_units) {
			break;
		}

		if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
			merged_io_desc[merged_io_idx].num_io_units += 1;
			merge = true;
			continue;
		}
		merge = false;
		merged_io_idx++;
	}

	req->num_backing_ops = num_io;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < num_io; i++) {
		backing_io = _reduce_vol_req_get_backing_io(req, i);
		iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
		backing_io->dev = vol->backing_dev;
		backing_io->iov = &iov[i];
		backing_io->iovcnt = 1;
		backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit;
		backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units;
		backing_io->backing_cb_args = &req->backing_cb_args;
		if (is_write) {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
		} else {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
		}
		vol->backing_dev->submit_backing_io(backing_io);

		/* Collects the number of processed I/O. */
		io_unit_counts += merged_io_desc[i].num_io_units;
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
	uint64_t chunk_offset, remainder, total_len = 0;
	uint8_t *buf;
	int j;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	/* if the chunk is uncompressed we need to copy the data from the host buffers.
*/ 1322 if (req->chunk_is_compressed == false) { 1323 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1324 buf = req->decomp_buf; 1325 total_len = chunk_offset * vol->params.logical_block_size; 1326 1327 /* zero any offset into chunk */ 1328 if (req->rmw == false && chunk_offset) { 1329 memset(buf, 0, total_len); 1330 } 1331 buf += total_len; 1332 1333 /* copy the data */ 1334 for (j = 0; j < req->iovcnt; j++) { 1335 memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); 1336 buf += req->iov[j].iov_len; 1337 total_len += req->iov[j].iov_len; 1338 } 1339 1340 /* zero any remainder */ 1341 remainder = vol->params.chunk_size - total_len; 1342 total_len += remainder; 1343 if (req->rmw == false && remainder) { 1344 memset(buf, 0, remainder); 1345 } 1346 assert(total_len == vol->params.chunk_size); 1347 } 1348 1349 for (i = 0; i < req->num_io_units; i++) { 1350 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0); 1351 /* TODO: fail if no backing block found - but really this should also not 1352 * happen (see comment above). 1353 */ 1354 assert(req->chunk->io_unit_index[i] != UINT32_MAX); 1355 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1356 } 1357 1358 _issue_backing_ops(req, vol, next_fn, true /* write */); 1359 } 1360 1361 static void 1362 _write_compress_done(void *_req, int reduce_errno) 1363 { 1364 struct spdk_reduce_vol_request *req = _req; 1365 1366 /* Negative reduce_errno indicates failure for compression operations. 1367 * Just write the uncompressed data instead. Force this to happen 1368 * by just passing the full chunk size to _reduce_vol_write_chunk. 1369 * When it sees the data couldn't be compressed, it will just write 1370 * the uncompressed buffer to disk. 
1371 */ 1372 if (reduce_errno < 0) { 1373 req->backing_cb_args.output_size = req->vol->params.chunk_size; 1374 } 1375 1376 _reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size); 1377 } 1378 1379 static void 1380 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1381 { 1382 struct spdk_reduce_vol *vol = req->vol; 1383 1384 req->backing_cb_args.cb_fn = next_fn; 1385 req->backing_cb_args.cb_arg = req; 1386 req->comp_buf_iov[0].iov_base = req->comp_buf; 1387 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1388 vol->backing_dev->compress(vol->backing_dev, 1389 req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1, 1390 &req->backing_cb_args); 1391 } 1392 1393 static void 1394 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1395 { 1396 struct spdk_reduce_vol *vol = req->vol; 1397 1398 req->backing_cb_args.cb_fn = next_fn; 1399 req->backing_cb_args.cb_arg = req; 1400 req->comp_buf_iov[0].iov_base = req->comp_buf; 1401 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1402 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1403 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1404 vol->backing_dev->decompress(vol->backing_dev, 1405 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1406 &req->backing_cb_args); 1407 } 1408 1409 static void 1410 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1411 { 1412 struct spdk_reduce_vol *vol = req->vol; 1413 uint64_t chunk_offset, remainder = 0; 1414 uint64_t ttl_len = 0; 1415 size_t iov_len; 1416 int i; 1417 1418 req->decomp_iovcnt = 0; 1419 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1420 1421 /* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer 1422 * if at least one of the conditions below is true: 1423 * 1. User's buffer is fragmented 1424 * 2. Length of the user's buffer is less than the chunk 1425 * 3. 
User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1426 iov_len = req->iov[0].iov_len; 1427 req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 || 1428 req->iov[0].iov_len < vol->params.chunk_size || 1429 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len)); 1430 if (req->copy_after_decompress) { 1431 req->decomp_iov[0].iov_base = req->decomp_buf; 1432 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1433 req->decomp_iovcnt = 1; 1434 goto decompress; 1435 } 1436 1437 if (chunk_offset) { 1438 /* first iov point to our scratch buffer for any offset into the chunk */ 1439 req->decomp_iov[0].iov_base = req->decomp_buf; 1440 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1441 ttl_len += req->decomp_iov[0].iov_len; 1442 req->decomp_iovcnt = 1; 1443 } 1444 1445 /* now the user data iov, direct to the user buffer */ 1446 for (i = 0; i < req->iovcnt; i++) { 1447 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1448 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1449 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1450 } 1451 req->decomp_iovcnt += req->iovcnt; 1452 1453 /* send the rest of the chunk to our scratch buffer */ 1454 remainder = vol->params.chunk_size - ttl_len; 1455 if (remainder) { 1456 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1457 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1458 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1459 req->decomp_iovcnt++; 1460 } 1461 assert(ttl_len == vol->params.chunk_size); 1462 1463 decompress: 1464 assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1)); 1465 req->backing_cb_args.cb_fn = next_fn; 1466 req->backing_cb_args.cb_arg = req; 1467 req->comp_buf_iov[0].iov_base = req->comp_buf; 1468 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1469 vol->backing_dev->decompress(vol->backing_dev, 1470 req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt, 1471 &req->backing_cb_args); 1472 } 1473 1474 static inline void 1475 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings) 1476 { 1477 struct spdk_reduce_vol *vol = req->vol; 1478 char *padding_buffer = zero_paddings ? 
g_zero_buf : req->decomp_buf; 1479 uint64_t chunk_offset, ttl_len = 0; 1480 uint64_t remainder = 0; 1481 char *copy_offset = NULL; 1482 uint32_t lbsize = vol->params.logical_block_size; 1483 int i; 1484 1485 req->decomp_iov[0].iov_base = req->decomp_buf; 1486 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1487 req->decomp_iovcnt = 1; 1488 copy_offset = req->decomp_iov[0].iov_base; 1489 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1490 1491 if (chunk_offset) { 1492 ttl_len += chunk_offset * lbsize; 1493 /* copy_offset already points to padding buffer if zero_paddings=false */ 1494 if (zero_paddings) { 1495 memcpy(copy_offset, padding_buffer, ttl_len); 1496 } 1497 copy_offset += ttl_len; 1498 } 1499 1500 /* now the user data iov, direct from the user buffer */ 1501 for (i = 0; i < req->iovcnt; i++) { 1502 memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len); 1503 copy_offset += req->iov[i].iov_len; 1504 ttl_len += req->iov[i].iov_len; 1505 } 1506 1507 remainder = vol->params.chunk_size - ttl_len; 1508 if (remainder) { 1509 /* copy_offset already points to padding buffer if zero_paddings=false */ 1510 if (zero_paddings) { 1511 memcpy(copy_offset, padding_buffer + ttl_len, remainder); 1512 } 1513 ttl_len += remainder; 1514 } 1515 1516 assert(ttl_len == req->vol->params.chunk_size); 1517 } 1518 1519 /* This function can be called when we are compressing a new data or in case of read-modify-write 1520 * In the first case possible paddings should be filled with zeroes, in the second case the paddings 1521 * should point to already read and decompressed buffer */ 1522 static inline void 1523 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings) 1524 { 1525 struct spdk_reduce_vol *vol = req->vol; 1526 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1527 uint64_t chunk_offset, ttl_len = 0; 1528 uint64_t remainder = 0; 1529 uint32_t lbsize = vol->params.logical_block_size; 1530 size_t iov_len; 1531 int i; 1532 1533 /* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf 1534 * if at least one of the conditions below is true: 1535 * 1. User's buffer is fragmented 1536 * 2. Length of the user's buffer is less than the chunk 1537 * 3. 
User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1538 iov_len = req->iov[0].iov_len; 1539 if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 || 1540 req->iov[0].iov_len < vol->params.chunk_size || 1541 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) { 1542 _prepare_compress_chunk_copy_user_buffers(req, zero_paddings); 1543 return; 1544 } 1545 1546 req->decomp_iovcnt = 0; 1547 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1548 1549 if (chunk_offset != 0) { 1550 ttl_len += chunk_offset * lbsize; 1551 req->decomp_iov[0].iov_base = padding_buffer; 1552 req->decomp_iov[0].iov_len = ttl_len; 1553 req->decomp_iovcnt = 1; 1554 } 1555 1556 /* now the user data iov, direct from the user buffer */ 1557 for (i = 0; i < req->iovcnt; i++) { 1558 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1559 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1560 ttl_len += req->iov[i].iov_len; 1561 } 1562 req->decomp_iovcnt += req->iovcnt; 1563 1564 remainder = vol->params.chunk_size - ttl_len; 1565 if (remainder) { 1566 req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len; 1567 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1568 req->decomp_iovcnt++; 1569 ttl_len += remainder; 1570 } 1571 assert(ttl_len == req->vol->params.chunk_size); 1572 } 1573 1574 static void 1575 _write_decompress_done(void *_req, int reduce_errno) 1576 { 1577 struct spdk_reduce_vol_request *req = _req; 1578 1579 /* Negative reduce_errno indicates failure for compression operations. */ 1580 if (reduce_errno < 0) { 1581 _reduce_vol_complete_req(req, reduce_errno); 1582 return; 1583 } 1584 1585 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1586 * represents the output_size. 1587 */ 1588 if (req->backing_cb_args.output_size != req->vol->params.chunk_size) { 1589 _reduce_vol_complete_req(req, -EIO); 1590 return; 1591 } 1592 1593 _prepare_compress_chunk(req, false); 1594 _reduce_vol_compress_chunk(req, _write_compress_done); 1595 } 1596 1597 static void 1598 _write_read_done(void *_req, int reduce_errno) 1599 { 1600 struct spdk_reduce_vol_request *req = _req; 1601 1602 if (reduce_errno != 0) { 1603 req->reduce_errno = reduce_errno; 1604 } 1605 1606 assert(req->num_backing_ops > 0); 1607 if (--req->num_backing_ops > 0) { 1608 return; 1609 } 1610 1611 if (req->reduce_errno != 0) { 1612 _reduce_vol_complete_req(req, req->reduce_errno); 1613 return; 1614 } 1615 1616 if (req->chunk_is_compressed) { 1617 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); 1618 } else { 1619 req->backing_cb_args.output_size = req->chunk->compressed_size; 1620 1621 _write_decompress_done(req, 0); 1622 } 1623 } 1624 1625 static void 1626 _read_decompress_done(void *_req, int reduce_errno) 1627 { 1628 struct spdk_reduce_vol_request *req = _req; 1629 struct spdk_reduce_vol *vol = req->vol; 1630 1631 /* Negative reduce_errno indicates failure for compression operations. */ 1632 if (reduce_errno < 0) { 1633 _reduce_vol_complete_req(req, reduce_errno); 1634 return; 1635 } 1636 1637 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1638 * represents the output_size. 
1639 */ 1640 if (req->backing_cb_args.output_size != vol->params.chunk_size) { 1641 _reduce_vol_complete_req(req, -EIO); 1642 return; 1643 } 1644 1645 if (req->copy_after_decompress) { 1646 uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1647 char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size; 1648 int i; 1649 1650 for (i = 0; i < req->iovcnt; i++) { 1651 memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len); 1652 decomp_buffer += req->iov[i].iov_len; 1653 assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size); 1654 } 1655 } 1656 1657 _reduce_vol_complete_req(req, 0); 1658 } 1659 1660 static void 1661 _read_read_done(void *_req, int reduce_errno) 1662 { 1663 struct spdk_reduce_vol_request *req = _req; 1664 uint64_t chunk_offset; 1665 uint8_t *buf; 1666 int i; 1667 1668 if (reduce_errno != 0) { 1669 req->reduce_errno = reduce_errno; 1670 } 1671 1672 assert(req->num_backing_ops > 0); 1673 if (--req->num_backing_ops > 0) { 1674 return; 1675 } 1676 1677 if (req->reduce_errno != 0) { 1678 _reduce_vol_complete_req(req, req->reduce_errno); 1679 return; 1680 } 1681 1682 if (req->chunk_is_compressed) { 1683 _reduce_vol_decompress_chunk(req, _read_decompress_done); 1684 } else { 1685 1686 /* If the chunk was compressed, the data would have been sent to the 1687 * host buffers by the decompression operation, if not we need to memcpy here. 1688 */ 1689 chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; 1690 buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; 1691 for (i = 0; i < req->iovcnt; i++) { 1692 memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); 1693 buf += req->iov[i].iov_len; 1694 } 1695 1696 req->backing_cb_args.output_size = req->chunk->compressed_size; 1697 1698 _read_decompress_done(req, 0); 1699 } 1700 } 1701 1702 static void 1703 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1704 { 1705 struct spdk_reduce_vol *vol = req->vol; 1706 1707 req->chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1708 assert(req->chunk_map_index != UINT32_MAX); 1709 1710 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1711 req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size, 1712 vol->params.backing_io_unit_size); 1713 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1714 1715 _issue_backing_ops(req, vol, next_fn, false /* read */); 1716 } 1717 1718 static bool 1719 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt, 1720 uint64_t length) 1721 { 1722 uint64_t size = 0; 1723 int i; 1724 1725 if (iovcnt > REDUCE_MAX_IOVECS) { 1726 return false; 1727 } 1728 1729 for (i = 0; i < iovcnt; i++) { 1730 size += iov[i].iov_len; 1731 } 1732 1733 return size == (length * vol->params.logical_block_size); 1734 } 1735 1736 static bool 1737 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index) 1738 { 1739 struct spdk_reduce_vol_request *req; 1740 1741 TAILQ_FOREACH(req, &vol->executing_requests, tailq) { 1742 if (logical_map_index == req->logical_map_index) { 1743 return true; 1744 } 1745 } 1746 1747 return false; 1748 } 1749 1750 static void 1751 _start_readv_request(struct spdk_reduce_vol_request *req) 1752 { 1753 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1754 _reduce_vol_read_chunk(req, _read_read_done); 1755 } 1756 1757 void 1758 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, 1759 struct iovec 
static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated. So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}
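/*
 * Begin executing a write request. If the target chunk is already allocated and the
 * write covers less than a full chunk, the existing chunk is read (and decompressed
 * if needed) first so the new data can be merged into it (a read-modify-write).
 * Otherwise the chunk is compressed and written out directly.
 */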
1833 */ 1834 req->rmw = true; 1835 _reduce_vol_read_chunk(req, _write_read_done); 1836 return; 1837 } 1838 } 1839 1840 req->rmw = false; 1841 1842 _prepare_compress_chunk(req, true); 1843 _reduce_vol_compress_chunk(req, _write_compress_done); 1844 } 1845 1846 void 1847 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, 1848 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1849 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1850 { 1851 struct spdk_reduce_vol_request *req; 1852 uint64_t logical_map_index; 1853 bool overlapped; 1854 1855 if (length == 0) { 1856 cb_fn(cb_arg, 0); 1857 return; 1858 } 1859 1860 if (_request_spans_chunk_boundary(vol, offset, length)) { 1861 cb_fn(cb_arg, -EINVAL); 1862 return; 1863 } 1864 1865 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1866 cb_fn(cb_arg, -EINVAL); 1867 return; 1868 } 1869 1870 logical_map_index = offset / vol->logical_blocks_per_chunk; 1871 overlapped = _check_overlap(vol, logical_map_index); 1872 1873 req = TAILQ_FIRST(&vol->free_requests); 1874 if (req == NULL) { 1875 cb_fn(cb_arg, -ENOMEM); 1876 return; 1877 } 1878 1879 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1880 req->type = REDUCE_IO_WRITEV; 1881 req->vol = vol; 1882 req->iov = iov; 1883 req->iovcnt = iovcnt; 1884 req->offset = offset; 1885 req->logical_map_index = logical_map_index; 1886 req->length = length; 1887 req->copy_after_decompress = false; 1888 req->cb_fn = cb_fn; 1889 req->cb_arg = cb_arg; 1890 1891 if (!overlapped) { 1892 _start_writev_request(req); 1893 } else { 1894 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1895 } 1896 } 1897 1898 const struct spdk_reduce_vol_params * 1899 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol) 1900 { 1901 return &vol->params; 1902 } 1903 1904 const char * 1905 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol) 1906 { 1907 return vol->pm_file.path; 1908 } 1909 1910 void 1911 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol) 1912 { 1913 uint64_t logical_map_size, num_chunks, ttl_chunk_sz; 1914 uint32_t struct_size; 1915 uint64_t chunk_map_size; 1916 1917 SPDK_NOTICELOG("vol info:\n"); 1918 SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size); 1919 SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size); 1920 SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size); 1921 SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size); 1922 num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 1923 SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks); 1924 SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n", 1925 vol->params.vol_size / vol->params.chunk_size); 1926 ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1927 vol->params.backing_io_unit_size); 1928 SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz); 1929 struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 1930 SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size); 1931 1932 SPDK_NOTICELOG("pmem info:\n"); 1933 SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size); 1934 SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf); 1935 SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super); 1936 SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map); 1937 logical_map_size = 
void
spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT(reduce)
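/*
 * Illustrative usage sketch (not part of this file): a caller that already holds an
 * open volume might issue a single-block write and then read it back. The volume
 * handle g_vol and the callbacks my_write_done()/my_read_done() are hypothetical;
 * offset and length are in logical blocks, the iovecs must total
 * length * logical_block_size bytes, a request must not span a chunk boundary, and
 * the iovec array must stay valid until the completion callback runs.
 *
 *	static void
 *	my_read_done(void *cb_arg, int reduce_errno)
 *	{
 *		assert(reduce_errno == 0);
 *	}
 *
 *	static void
 *	my_write_done(void *cb_arg, int reduce_errno)
 *	{
 *		struct iovec *iov = cb_arg;
 *
 *		assert(reduce_errno == 0);
 *		spdk_reduce_vol_readv(g_vol, iov, 1, 0, 1, my_read_done, NULL);
 *	}
 *
 *	void
 *	write_then_read(void *buf, uint32_t logical_block_size)
 *	{
 *		static struct iovec iov;
 *
 *		iov.iov_base = buf;
 *		iov.iov_len = logical_block_size;
 *		spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, my_write_done, &iov);
 *	}
 */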