1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "queue_internal.h" 10 11 #include "spdk/reduce.h" 12 #include "spdk/env.h" 13 #include "spdk/string.h" 14 #include "spdk/bit_array.h" 15 #include "spdk/util.h" 16 #include "spdk/log.h" 17 #include "spdk/memory.h" 18 19 #include "libpmem.h" 20 21 /* Always round up the size of the PM region to the nearest cacheline. */ 22 #define REDUCE_PM_SIZE_ALIGNMENT 64 23 24 /* Offset into the backing device where the persistent memory file's path is stored. */ 25 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 26 27 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 28 29 #define REDUCE_NUM_VOL_REQUESTS 256 30 31 /* Structure written to offset 0 of both the pm file and the backing device. */ 32 struct spdk_reduce_vol_superblock { 33 uint8_t signature[8]; 34 struct spdk_reduce_vol_params params; 35 uint8_t reserved[4040]; 36 }; 37 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 38 39 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 40 /* null terminator counts one */ 41 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 42 SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect"); 43 44 #define REDUCE_PATH_MAX 4096 45 46 #define REDUCE_ZERO_BUF_SIZE 0x100000 47 48 /** 49 * Describes a persistent memory file used to hold metadata associated with a 50 * compressed volume. 51 */ 52 struct spdk_reduce_pm_file { 53 char path[REDUCE_PATH_MAX]; 54 void *pm_buf; 55 int pm_is_pmem; 56 uint64_t size; 57 }; 58 59 #define REDUCE_IO_READV 1 60 #define REDUCE_IO_WRITEV 2 61 62 struct spdk_reduce_chunk_map { 63 uint32_t compressed_size; 64 uint32_t reserved; 65 uint64_t io_unit_index[0]; 66 }; 67 68 struct spdk_reduce_vol_request { 69 /** 70 * Scratch buffer used for uncompressed chunk. This is used for: 71 * 1) source buffer for compression operations 72 * 2) destination buffer for decompression operations 73 * 3) data buffer when writing uncompressed chunk to disk 74 * 4) data buffer when reading uncompressed chunk from disk 75 */ 76 uint8_t *decomp_buf; 77 struct iovec *decomp_buf_iov; 78 79 /** 80 * These are used to construct the iovecs that are sent to 81 * the decomp engine, they point to a mix of the scratch buffer 82 * and user buffer 83 */ 84 struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2]; 85 int decomp_iovcnt; 86 87 /** 88 * Scratch buffer used for compressed chunk. This is used for: 89 * 1) destination buffer for compression operations 90 * 2) source buffer for decompression operations 91 * 3) data buffer when writing compressed chunk to disk 92 * 4) data buffer when reading compressed chunk from disk 93 */ 94 uint8_t *comp_buf; 95 struct iovec *comp_buf_iov; 96 struct iovec *iov; 97 bool rmw; 98 struct spdk_reduce_vol *vol; 99 int type; 100 int reduce_errno; 101 int iovcnt; 102 int num_backing_ops; 103 uint32_t num_io_units; 104 struct spdk_reduce_backing_io *backing_io; 105 bool chunk_is_compressed; 106 bool copy_after_decompress; 107 uint64_t offset; 108 uint64_t logical_map_index; 109 uint64_t length; 110 uint64_t chunk_map_index; 111 struct spdk_reduce_chunk_map *chunk; 112 spdk_reduce_vol_op_complete cb_fn; 113 void *cb_arg; 114 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 115 struct spdk_reduce_vol_cb_args backing_cb_args; 116 }; 117 118 struct spdk_reduce_vol { 119 struct spdk_reduce_vol_params params; 120 uint32_t backing_io_units_per_chunk; 121 uint32_t backing_lba_per_io_unit; 122 uint32_t logical_blocks_per_chunk; 123 struct spdk_reduce_pm_file pm_file; 124 struct spdk_reduce_backing_dev *backing_dev; 125 struct spdk_reduce_vol_superblock *backing_super; 126 struct spdk_reduce_vol_superblock *pm_super; 127 uint64_t *pm_logical_map; 128 uint64_t *pm_chunk_maps; 129 130 struct spdk_bit_array *allocated_chunk_maps; 131 /* The starting position when looking for a block from allocated_chunk_maps */ 132 uint64_t find_chunk_offset; 133 /* Cache free chunks to speed up lookup of free chunk. */ 134 struct reduce_queue free_chunks_queue; 135 struct spdk_bit_array *allocated_backing_io_units; 136 /* The starting position when looking for a block from allocated_backing_io_units */ 137 uint64_t find_block_offset; 138 /* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */ 139 struct reduce_queue free_backing_blocks_queue; 140 141 struct spdk_reduce_vol_request *request_mem; 142 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 143 TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests; 144 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 145 146 /* Single contiguous buffer used for all request buffers for this volume. */ 147 uint8_t *buf_mem; 148 struct iovec *buf_iov_mem; 149 /* Single contiguous buffer used for backing io buffers for this volume. */ 150 uint8_t *buf_backing_io_mem; 151 }; 152 153 static void _start_readv_request(struct spdk_reduce_vol_request *req); 154 static void _start_writev_request(struct spdk_reduce_vol_request *req); 155 static uint8_t *g_zero_buf; 156 static int g_vol_count = 0; 157 158 /* 159 * Allocate extra metadata chunks and corresponding backing io units to account for 160 * outstanding IO in worst case scenario where logical map is completely allocated 161 * and no data can be compressed. We need extra chunks in this case to handle 162 * in-flight writes since reduce never writes data in place. 163 */ 164 #define REDUCE_NUM_EXTRA_CHUNKS 128 165 166 static void 167 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 168 { 169 if (vol->pm_file.pm_is_pmem) { 170 pmem_persist(addr, len); 171 } else { 172 pmem_msync(addr, len); 173 } 174 } 175 176 static uint64_t 177 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 178 { 179 uint64_t chunks_in_logical_map, logical_map_size; 180 181 chunks_in_logical_map = vol_size / chunk_size; 182 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 183 184 /* Round up to next cacheline. */ 185 return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * 186 REDUCE_PM_SIZE_ALIGNMENT; 187 } 188 189 static uint64_t 190 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size) 191 { 192 uint64_t num_chunks; 193 194 num_chunks = vol_size / chunk_size; 195 num_chunks += REDUCE_NUM_EXTRA_CHUNKS; 196 197 return num_chunks; 198 } 199 200 static inline uint32_t 201 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk) 202 { 203 return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk; 204 } 205 206 static uint64_t 207 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size) 208 { 209 uint64_t io_units_per_chunk, num_chunks, total_chunks_size; 210 211 num_chunks = _get_total_chunks(vol_size, chunk_size); 212 io_units_per_chunk = chunk_size / backing_io_unit_size; 213 214 total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk); 215 216 return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * 217 REDUCE_PM_SIZE_ALIGNMENT; 218 } 219 220 static struct spdk_reduce_chunk_map * 221 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 222 { 223 uintptr_t chunk_map_addr; 224 225 assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size)); 226 227 chunk_map_addr = (uintptr_t)vol->pm_chunk_maps; 228 chunk_map_addr += chunk_map_index * 229 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 230 231 return (struct spdk_reduce_chunk_map *)chunk_map_addr; 232 } 233 234 static int 235 _validate_vol_params(struct spdk_reduce_vol_params *params) 236 { 237 if (params->vol_size > 0) { 238 /** 239 * User does not pass in the vol size - it gets calculated by libreduce from 240 * values in this structure plus the size of the backing device. 241 */ 242 return -EINVAL; 243 } 244 245 if (params->chunk_size == 0 || params->backing_io_unit_size == 0 || 246 params->logical_block_size == 0) { 247 return -EINVAL; 248 } 249 250 /* Chunk size must be an even multiple of the backing io unit size. */ 251 if ((params->chunk_size % params->backing_io_unit_size) != 0) { 252 return -EINVAL; 253 } 254 255 /* Chunk size must be an even multiple of the logical block size. */ 256 if ((params->chunk_size % params->logical_block_size) != 0) { 257 return -1; 258 } 259 260 return 0; 261 } 262 263 static uint64_t 264 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size) 265 { 266 uint64_t num_chunks; 267 268 num_chunks = backing_dev_size / chunk_size; 269 if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) { 270 return 0; 271 } 272 273 num_chunks -= REDUCE_NUM_EXTRA_CHUNKS; 274 return num_chunks * chunk_size; 275 } 276 277 static uint64_t 278 _get_pm_file_size(struct spdk_reduce_vol_params *params) 279 { 280 uint64_t total_pm_size; 281 282 total_pm_size = sizeof(struct spdk_reduce_vol_superblock); 283 total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size); 284 total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size, 285 params->backing_io_unit_size); 286 return total_pm_size; 287 } 288 289 const struct spdk_uuid * 290 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol) 291 { 292 return &vol->params.uuid; 293 } 294 295 static void 296 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol) 297 { 298 uint64_t logical_map_size; 299 300 /* Superblock is at the beginning of the pm file. */ 301 vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; 302 303 /* Logical map immediately follows the super block. */ 304 vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); 305 306 /* Chunks maps follow the logical map. */ 307 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size); 308 vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size); 309 } 310 311 /* We need 2 iovs during load - one for the superblock, another for the path */ 312 #define LOAD_IOV_COUNT 2 313 314 struct reduce_init_load_ctx { 315 struct spdk_reduce_vol *vol; 316 struct spdk_reduce_vol_cb_args backing_cb_args; 317 spdk_reduce_vol_op_with_handle_complete cb_fn; 318 void *cb_arg; 319 struct iovec iov[LOAD_IOV_COUNT]; 320 void *path; 321 struct spdk_reduce_backing_io *backing_io; 322 }; 323 324 static inline bool 325 _addr_crosses_huge_page(const void *addr, size_t *size) 326 { 327 size_t _size; 328 uint64_t rc; 329 330 assert(size); 331 332 _size = *size; 333 rc = spdk_vtophys(addr, size); 334 335 return rc == SPDK_VTOPHYS_ERROR || _size != *size; 336 } 337 338 static inline int 339 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size) 340 { 341 uint8_t *addr; 342 size_t size_tmp = buffer_size; 343 344 addr = *_addr; 345 346 /* Verify that addr + buffer_size doesn't cross huge page boundary */ 347 if (_addr_crosses_huge_page(addr, &size_tmp)) { 348 /* Memory start is aligned on 2MiB, so buffer should be located at the end of the page. 349 * Skip remaining bytes and continue from the beginning of the next page */ 350 addr += size_tmp; 351 } 352 353 if (addr + buffer_size > addr_range) { 354 SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range); 355 return -ERANGE; 356 } 357 358 *vol_buffer = addr; 359 *_addr = addr + buffer_size; 360 361 return 0; 362 } 363 364 static int 365 _allocate_vol_requests(struct spdk_reduce_vol *vol) 366 { 367 struct spdk_reduce_vol_request *req; 368 struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev; 369 uint32_t reqs_in_2mb_page, huge_pages_needed; 370 uint8_t *buffer, *buffer_end; 371 int i = 0; 372 int rc = 0; 373 374 /* It is needed to allocate comp and decomp buffers so that they do not cross physical 375 * page boundaries. Assume that the system uses default 2MiB pages and chunk_size is not 376 * necessarily power of 2 377 * Allocate 2x since we need buffers for both read/write and compress/decompress 378 * intermediate buffers. */ 379 reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2); 380 if (!reqs_in_2mb_page) { 381 return -EINVAL; 382 } 383 huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page); 384 385 vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL); 386 if (vol->buf_mem == NULL) { 387 return -ENOMEM; 388 } 389 390 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 391 if (vol->request_mem == NULL) { 392 spdk_free(vol->buf_mem); 393 vol->buf_mem = NULL; 394 return -ENOMEM; 395 } 396 397 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 398 * buffers. 399 */ 400 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 401 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 402 if (vol->buf_iov_mem == NULL) { 403 free(vol->request_mem); 404 spdk_free(vol->buf_mem); 405 vol->request_mem = NULL; 406 vol->buf_mem = NULL; 407 return -ENOMEM; 408 } 409 410 vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) + 411 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk); 412 if (vol->buf_backing_io_mem == NULL) { 413 free(vol->request_mem); 414 free(vol->buf_iov_mem); 415 spdk_free(vol->buf_mem); 416 vol->request_mem = NULL; 417 vol->buf_iov_mem = NULL; 418 vol->buf_mem = NULL; 419 return -ENOMEM; 420 } 421 422 buffer = vol->buf_mem; 423 buffer_end = buffer + VALUE_2MB * huge_pages_needed; 424 425 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 426 req = &vol->request_mem[i]; 427 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 428 req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i * 429 (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) * 430 vol->backing_io_units_per_chunk); 431 432 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 433 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 434 435 rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size); 436 if (rc) { 437 SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 438 vol->buf_mem, buffer_end); 439 break; 440 } 441 rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size); 442 if (rc) { 443 SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 444 vol->buf_mem, buffer_end); 445 break; 446 } 447 } 448 449 if (rc) { 450 free(vol->buf_backing_io_mem); 451 free(vol->buf_iov_mem); 452 free(vol->request_mem); 453 spdk_free(vol->buf_mem); 454 vol->buf_mem = NULL; 455 vol->buf_backing_io_mem = NULL; 456 vol->buf_iov_mem = NULL; 457 vol->request_mem = NULL; 458 } 459 460 return rc; 461 } 462 463 static void 464 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 465 { 466 if (ctx != NULL) { 467 spdk_free(ctx->path); 468 free(ctx->backing_io); 469 free(ctx); 470 } 471 472 if (vol != NULL) { 473 if (vol->pm_file.pm_buf != NULL) { 474 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 475 } 476 477 spdk_free(vol->backing_super); 478 spdk_bit_array_free(&vol->allocated_chunk_maps); 479 spdk_bit_array_free(&vol->allocated_backing_io_units); 480 free(vol->request_mem); 481 free(vol->buf_backing_io_mem); 482 free(vol->buf_iov_mem); 483 spdk_free(vol->buf_mem); 484 free(vol); 485 } 486 } 487 488 static int 489 _alloc_zero_buff(void) 490 { 491 int rc = 0; 492 493 /* The zero buffer is shared between all volumes and just used 494 * for reads so allocate one global instance here if not already 495 * allocated when another vol init'd or loaded. 496 */ 497 if (g_vol_count++ == 0) { 498 g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE, 499 64, NULL, SPDK_ENV_LCORE_ID_ANY, 500 SPDK_MALLOC_DMA); 501 if (g_zero_buf == NULL) { 502 g_vol_count--; 503 rc = -ENOMEM; 504 } 505 } 506 return rc; 507 } 508 509 static void 510 _init_write_super_cpl(void *cb_arg, int reduce_errno) 511 { 512 struct reduce_init_load_ctx *init_ctx = cb_arg; 513 int rc; 514 515 rc = _allocate_vol_requests(init_ctx->vol); 516 if (rc != 0) { 517 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 518 _init_load_cleanup(init_ctx->vol, init_ctx); 519 return; 520 } 521 522 rc = _alloc_zero_buff(); 523 if (rc != 0) { 524 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 525 _init_load_cleanup(init_ctx->vol, init_ctx); 526 return; 527 } 528 529 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno); 530 /* Only clean up the ctx - the vol has been passed to the application 531 * for use now that initialization was successful. 532 */ 533 _init_load_cleanup(NULL, init_ctx); 534 } 535 536 static void 537 _init_write_path_cpl(void *cb_arg, int reduce_errno) 538 { 539 struct reduce_init_load_ctx *init_ctx = cb_arg; 540 struct spdk_reduce_vol *vol = init_ctx->vol; 541 struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io; 542 543 init_ctx->iov[0].iov_base = vol->backing_super; 544 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 545 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 546 init_ctx->backing_cb_args.cb_arg = init_ctx; 547 548 backing_io->dev = vol->backing_dev; 549 backing_io->iov = init_ctx->iov; 550 backing_io->iovcnt = 1; 551 backing_io->lba = 0; 552 backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen; 553 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 554 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 555 556 vol->backing_dev->submit_backing_io(backing_io); 557 } 558 559 static int 560 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 561 { 562 uint64_t total_chunks, total_backing_io_units; 563 uint32_t i, num_metadata_io_units; 564 565 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 566 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 567 vol->find_chunk_offset = 0; 568 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 569 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 570 vol->find_block_offset = 0; 571 572 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 573 return -ENOMEM; 574 } 575 576 /* Set backing io unit bits associated with metadata. */ 577 num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 578 vol->params.backing_io_unit_size; 579 for (i = 0; i < num_metadata_io_units; i++) { 580 spdk_bit_array_set(vol->allocated_backing_io_units, i); 581 } 582 583 return 0; 584 } 585 586 void 587 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, 588 struct spdk_reduce_backing_dev *backing_dev, 589 const char *pm_file_dir, 590 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 591 { 592 struct spdk_reduce_vol *vol; 593 struct reduce_init_load_ctx *init_ctx; 594 struct spdk_reduce_backing_io *backing_io; 595 uint64_t backing_dev_size; 596 size_t mapped_len; 597 int dir_len, max_dir_len, rc; 598 599 /* We need to append a path separator and the UUID to the supplied 600 * path. 601 */ 602 max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1; 603 dir_len = strnlen(pm_file_dir, max_dir_len); 604 /* Strip trailing slash if the user provided one - we will add it back 605 * later when appending the filename. 606 */ 607 if (pm_file_dir[dir_len - 1] == '/') { 608 dir_len--; 609 } 610 if (dir_len == max_dir_len) { 611 SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir); 612 cb_fn(cb_arg, NULL, -EINVAL); 613 return; 614 } 615 616 rc = _validate_vol_params(params); 617 if (rc != 0) { 618 SPDK_ERRLOG("invalid vol params\n"); 619 cb_fn(cb_arg, NULL, rc); 620 return; 621 } 622 623 backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen; 624 params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size); 625 if (params->vol_size == 0) { 626 SPDK_ERRLOG("backing device is too small\n"); 627 cb_fn(cb_arg, NULL, -EINVAL); 628 return; 629 } 630 631 if (backing_dev->submit_backing_io == NULL) { 632 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 633 cb_fn(cb_arg, NULL, -EINVAL); 634 return; 635 } 636 637 vol = calloc(1, sizeof(*vol)); 638 if (vol == NULL) { 639 cb_fn(cb_arg, NULL, -ENOMEM); 640 return; 641 } 642 643 TAILQ_INIT(&vol->free_requests); 644 TAILQ_INIT(&vol->executing_requests); 645 TAILQ_INIT(&vol->queued_requests); 646 queue_init(&vol->free_chunks_queue); 647 queue_init(&vol->free_backing_blocks_queue); 648 649 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL, 650 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 651 if (vol->backing_super == NULL) { 652 cb_fn(cb_arg, NULL, -ENOMEM); 653 _init_load_cleanup(vol, NULL); 654 return; 655 } 656 657 init_ctx = calloc(1, sizeof(*init_ctx)); 658 if (init_ctx == NULL) { 659 cb_fn(cb_arg, NULL, -ENOMEM); 660 _init_load_cleanup(vol, NULL); 661 return; 662 } 663 664 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 665 if (backing_io == NULL) { 666 cb_fn(cb_arg, NULL, -ENOMEM); 667 _init_load_cleanup(vol, init_ctx); 668 return; 669 } 670 init_ctx->backing_io = backing_io; 671 672 init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL, 673 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 674 if (init_ctx->path == NULL) { 675 cb_fn(cb_arg, NULL, -ENOMEM); 676 _init_load_cleanup(vol, init_ctx); 677 return; 678 } 679 680 if (spdk_uuid_is_null(¶ms->uuid)) { 681 spdk_uuid_generate(¶ms->uuid); 682 } 683 684 memcpy(vol->pm_file.path, pm_file_dir, dir_len); 685 vol->pm_file.path[dir_len] = '/'; 686 spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN, 687 ¶ms->uuid); 688 vol->pm_file.size = _get_pm_file_size(params); 689 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size, 690 PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600, 691 &mapped_len, &vol->pm_file.pm_is_pmem); 692 if (vol->pm_file.pm_buf == NULL) { 693 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", 694 vol->pm_file.path, strerror(errno)); 695 cb_fn(cb_arg, NULL, -errno); 696 _init_load_cleanup(vol, init_ctx); 697 return; 698 } 699 700 if (vol->pm_file.size != mapped_len) { 701 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 702 vol->pm_file.size, mapped_len); 703 cb_fn(cb_arg, NULL, -ENOMEM); 704 _init_load_cleanup(vol, init_ctx); 705 return; 706 } 707 708 vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size; 709 vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size; 710 vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen; 711 memcpy(&vol->params, params, sizeof(*params)); 712 713 vol->backing_dev = backing_dev; 714 715 rc = _allocate_bit_arrays(vol); 716 if (rc != 0) { 717 cb_fn(cb_arg, NULL, rc); 718 _init_load_cleanup(vol, init_ctx); 719 return; 720 } 721 722 memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE, 723 sizeof(vol->backing_super->signature)); 724 memcpy(&vol->backing_super->params, params, sizeof(*params)); 725 726 _initialize_vol_pm_pointers(vol); 727 728 memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super)); 729 /* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY. 730 * Note that this writes 0xFF to not just the logical map but the chunk maps as well. 731 */ 732 memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super)); 733 _reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size); 734 735 init_ctx->vol = vol; 736 init_ctx->cb_fn = cb_fn; 737 init_ctx->cb_arg = cb_arg; 738 739 memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX); 740 init_ctx->iov[0].iov_base = init_ctx->path; 741 init_ctx->iov[0].iov_len = REDUCE_PATH_MAX; 742 init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl; 743 init_ctx->backing_cb_args.cb_arg = init_ctx; 744 /* Write path to offset 4K on backing device - just after where the super 745 * block will be written. We wait until this is committed before writing the 746 * super block to guarantee we don't get the super block written without the 747 * the path if the system crashed in the middle of a write operation. 748 */ 749 backing_io->dev = vol->backing_dev; 750 backing_io->iov = init_ctx->iov; 751 backing_io->iovcnt = 1; 752 backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen; 753 backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen; 754 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 755 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 756 757 vol->backing_dev->submit_backing_io(backing_io); 758 } 759 760 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno); 761 762 static void 763 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno) 764 { 765 struct reduce_init_load_ctx *load_ctx = cb_arg; 766 struct spdk_reduce_vol *vol = load_ctx->vol; 767 uint64_t backing_dev_size; 768 uint64_t i, num_chunks, logical_map_index; 769 struct spdk_reduce_chunk_map *chunk; 770 size_t mapped_len; 771 uint32_t j; 772 int rc; 773 774 rc = _alloc_zero_buff(); 775 if (rc) { 776 goto error; 777 } 778 779 if (memcmp(vol->backing_super->signature, 780 SPDK_REDUCE_SIGNATURE, 781 sizeof(vol->backing_super->signature)) != 0) { 782 /* This backing device isn't a libreduce backing device. */ 783 rc = -EILSEQ; 784 goto error; 785 } 786 787 /* If the cb_fn is destroy_load_cb, it means we are wanting to destroy this compress bdev. 788 * So don't bother getting the volume ready to use - invoke the callback immediately 789 * so destroy_load_cb can delete the metadata off of the block device and delete the 790 * persistent memory file if it exists. 791 */ 792 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 793 if (load_ctx->cb_fn == (*destroy_load_cb)) { 794 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 795 _init_load_cleanup(NULL, load_ctx); 796 return; 797 } 798 799 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 800 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 801 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 802 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 803 804 rc = _allocate_bit_arrays(vol); 805 if (rc != 0) { 806 goto error; 807 } 808 809 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 810 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 811 SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", 812 backing_dev_size); 813 rc = -EILSEQ; 814 goto error; 815 } 816 817 vol->pm_file.size = _get_pm_file_size(&vol->params); 818 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 819 &vol->pm_file.pm_is_pmem); 820 if (vol->pm_file.pm_buf == NULL) { 821 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 822 rc = -errno; 823 goto error; 824 } 825 826 if (vol->pm_file.size != mapped_len) { 827 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 828 vol->pm_file.size, mapped_len); 829 rc = -ENOMEM; 830 goto error; 831 } 832 833 rc = _allocate_vol_requests(vol); 834 if (rc != 0) { 835 goto error; 836 } 837 838 _initialize_vol_pm_pointers(vol); 839 840 num_chunks = vol->params.vol_size / vol->params.chunk_size; 841 for (i = 0; i < num_chunks; i++) { 842 logical_map_index = vol->pm_logical_map[i]; 843 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 844 continue; 845 } 846 spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 847 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 848 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 849 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 850 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 851 } 852 } 853 } 854 855 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 856 /* Only clean up the ctx - the vol has been passed to the application 857 * for use now that volume load was successful. 858 */ 859 _init_load_cleanup(NULL, load_ctx); 860 return; 861 862 error: 863 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 864 _init_load_cleanup(vol, load_ctx); 865 } 866 867 void 868 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 869 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 870 { 871 struct spdk_reduce_vol *vol; 872 struct reduce_init_load_ctx *load_ctx; 873 struct spdk_reduce_backing_io *backing_io; 874 875 if (backing_dev->submit_backing_io == NULL) { 876 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 877 cb_fn(cb_arg, NULL, -EINVAL); 878 return; 879 } 880 881 vol = calloc(1, sizeof(*vol)); 882 if (vol == NULL) { 883 cb_fn(cb_arg, NULL, -ENOMEM); 884 return; 885 } 886 887 TAILQ_INIT(&vol->free_requests); 888 TAILQ_INIT(&vol->executing_requests); 889 TAILQ_INIT(&vol->queued_requests); 890 queue_init(&vol->free_chunks_queue); 891 queue_init(&vol->free_backing_blocks_queue); 892 893 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, 894 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 895 if (vol->backing_super == NULL) { 896 _init_load_cleanup(vol, NULL); 897 cb_fn(cb_arg, NULL, -ENOMEM); 898 return; 899 } 900 901 vol->backing_dev = backing_dev; 902 903 load_ctx = calloc(1, sizeof(*load_ctx)); 904 if (load_ctx == NULL) { 905 _init_load_cleanup(vol, NULL); 906 cb_fn(cb_arg, NULL, -ENOMEM); 907 return; 908 } 909 910 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 911 if (backing_io == NULL) { 912 _init_load_cleanup(vol, load_ctx); 913 cb_fn(cb_arg, NULL, -ENOMEM); 914 return; 915 } 916 917 load_ctx->backing_io = backing_io; 918 919 load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, 920 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 921 if (load_ctx->path == NULL) { 922 _init_load_cleanup(vol, load_ctx); 923 cb_fn(cb_arg, NULL, -ENOMEM); 924 return; 925 } 926 927 load_ctx->vol = vol; 928 load_ctx->cb_fn = cb_fn; 929 load_ctx->cb_arg = cb_arg; 930 931 load_ctx->iov[0].iov_base = vol->backing_super; 932 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 933 load_ctx->iov[1].iov_base = load_ctx->path; 934 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 935 backing_io->dev = vol->backing_dev; 936 backing_io->iov = load_ctx->iov; 937 backing_io->iovcnt = LOAD_IOV_COUNT; 938 backing_io->lba = 0; 939 backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 940 vol->backing_dev->blocklen; 941 backing_io->backing_cb_args = &load_ctx->backing_cb_args; 942 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 943 944 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 945 load_ctx->backing_cb_args.cb_arg = load_ctx; 946 vol->backing_dev->submit_backing_io(backing_io); 947 } 948 949 void 950 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 951 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 952 { 953 if (vol == NULL) { 954 /* This indicates a programming error. */ 955 assert(false); 956 cb_fn(cb_arg, -EINVAL); 957 return; 958 } 959 960 if (--g_vol_count == 0) { 961 spdk_free(g_zero_buf); 962 } 963 assert(g_vol_count >= 0); 964 _init_load_cleanup(vol, NULL); 965 cb_fn(cb_arg, 0); 966 } 967 968 struct reduce_destroy_ctx { 969 spdk_reduce_vol_op_complete cb_fn; 970 void *cb_arg; 971 struct spdk_reduce_vol *vol; 972 struct spdk_reduce_vol_superblock *super; 973 struct iovec iov; 974 struct spdk_reduce_vol_cb_args backing_cb_args; 975 int reduce_errno; 976 char pm_path[REDUCE_PATH_MAX]; 977 struct spdk_reduce_backing_io *backing_io; 978 }; 979 980 static void 981 destroy_unload_cpl(void *cb_arg, int reduce_errno) 982 { 983 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 984 985 if (destroy_ctx->reduce_errno == 0) { 986 if (unlink(destroy_ctx->pm_path)) { 987 SPDK_ERRLOG("%s could not be unlinked: %s\n", 988 destroy_ctx->pm_path, strerror(errno)); 989 } 990 } 991 992 /* Even if the unload somehow failed, we still pass the destroy_ctx 993 * reduce_errno since that indicates whether or not the volume was 994 * actually destroyed. 995 */ 996 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 997 spdk_free(destroy_ctx->super); 998 free(destroy_ctx->backing_io); 999 free(destroy_ctx); 1000 } 1001 1002 static void 1003 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 1004 { 1005 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1006 struct spdk_reduce_vol *vol = destroy_ctx->vol; 1007 1008 destroy_ctx->reduce_errno = reduce_errno; 1009 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 1010 } 1011 1012 static void 1013 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 1014 { 1015 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1016 struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io; 1017 1018 if (reduce_errno != 0) { 1019 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 1020 spdk_free(destroy_ctx->super); 1021 free(destroy_ctx); 1022 return; 1023 } 1024 1025 destroy_ctx->vol = vol; 1026 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 1027 destroy_ctx->iov.iov_base = destroy_ctx->super; 1028 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 1029 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 1030 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 1031 1032 backing_io->dev = vol->backing_dev; 1033 backing_io->iov = &destroy_ctx->iov; 1034 backing_io->iovcnt = 1; 1035 backing_io->lba = 0; 1036 backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen; 1037 backing_io->backing_cb_args = &destroy_ctx->backing_cb_args; 1038 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1039 1040 vol->backing_dev->submit_backing_io(backing_io); 1041 } 1042 1043 void 1044 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 1045 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1046 { 1047 struct reduce_destroy_ctx *destroy_ctx; 1048 struct spdk_reduce_backing_io *backing_io; 1049 1050 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 1051 if (destroy_ctx == NULL) { 1052 cb_fn(cb_arg, -ENOMEM); 1053 return; 1054 } 1055 1056 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 1057 if (backing_io == NULL) { 1058 free(destroy_ctx); 1059 cb_fn(cb_arg, -ENOMEM); 1060 return; 1061 } 1062 1063 destroy_ctx->backing_io = backing_io; 1064 1065 destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, 1066 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 1067 if (destroy_ctx->super == NULL) { 1068 free(destroy_ctx); 1069 free(backing_io); 1070 cb_fn(cb_arg, -ENOMEM); 1071 return; 1072 } 1073 destroy_ctx->cb_fn = cb_fn; 1074 destroy_ctx->cb_arg = cb_arg; 1075 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 1076 } 1077 1078 static bool 1079 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 1080 { 1081 uint64_t start_chunk, end_chunk; 1082 1083 start_chunk = offset / vol->logical_blocks_per_chunk; 1084 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 1085 1086 return (start_chunk != end_chunk); 1087 } 1088 1089 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 1090 1091 static void 1092 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 1093 { 1094 struct spdk_reduce_vol_request *next_req; 1095 struct spdk_reduce_vol *vol = req->vol; 1096 1097 req->cb_fn(req->cb_arg, reduce_errno); 1098 TAILQ_REMOVE(&vol->executing_requests, req, tailq); 1099 1100 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 1101 if (next_req->logical_map_index == req->logical_map_index) { 1102 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 1103 if (next_req->type == REDUCE_IO_READV) { 1104 _start_readv_request(next_req); 1105 } else { 1106 assert(next_req->type == REDUCE_IO_WRITEV); 1107 _start_writev_request(next_req); 1108 } 1109 break; 1110 } 1111 } 1112 1113 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 1114 } 1115 1116 static void 1117 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 1118 { 1119 struct spdk_reduce_chunk_map *chunk; 1120 uint64_t index; 1121 bool success; 1122 uint32_t i; 1123 1124 chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index); 1125 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 1126 index = chunk->io_unit_index[i]; 1127 if (index == REDUCE_EMPTY_MAP_ENTRY) { 1128 break; 1129 } 1130 assert(spdk_bit_array_get(vol->allocated_backing_io_units, 1131 index) == true); 1132 spdk_bit_array_clear(vol->allocated_backing_io_units, index); 1133 success = queue_enqueue(&vol->free_backing_blocks_queue, index); 1134 if (!success && index < vol->find_block_offset) { 1135 vol->find_block_offset = index; 1136 } 1137 chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 1138 } 1139 success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index); 1140 if (!success && chunk_map_index < vol->find_chunk_offset) { 1141 vol->find_chunk_offset = chunk_map_index; 1142 } 1143 spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index); 1144 } 1145 1146 static void 1147 _write_write_done(void *_req, int reduce_errno) 1148 { 1149 struct spdk_reduce_vol_request *req = _req; 1150 struct spdk_reduce_vol *vol = req->vol; 1151 uint64_t old_chunk_map_index; 1152 1153 if (reduce_errno != 0) { 1154 req->reduce_errno = reduce_errno; 1155 } 1156 1157 assert(req->num_backing_ops > 0); 1158 if (--req->num_backing_ops > 0) { 1159 return; 1160 } 1161 1162 if (req->reduce_errno != 0) { 1163 _reduce_vol_reset_chunk(vol, req->chunk_map_index); 1164 _reduce_vol_complete_req(req, req->reduce_errno); 1165 return; 1166 } 1167 1168 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1169 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 1170 _reduce_vol_reset_chunk(vol, old_chunk_map_index); 1171 } 1172 1173 /* 1174 * We don't need to persist the clearing of the old chunk map here. The old chunk map 1175 * becomes invalid after we update the logical map, since the old chunk map will no 1176 * longer have a reference to it in the logical map. 1177 */ 1178 1179 /* Persist the new chunk map. This must be persisted before we update the logical map. */ 1180 _reduce_persist(vol, req->chunk, 1181 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk)); 1182 1183 vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; 1184 1185 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); 1186 1187 _reduce_vol_complete_req(req, 0); 1188 } 1189 1190 static struct spdk_reduce_backing_io * 1191 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index) 1192 { 1193 struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev; 1194 struct spdk_reduce_backing_io *backing_io; 1195 1196 backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io + 1197 (sizeof(*backing_io) + backing_dev->user_ctx_size) * index); 1198 1199 return backing_io; 1200 1201 } 1202 1203 struct reduce_merged_io_desc { 1204 uint64_t io_unit_index; 1205 uint32_t num_io_units; 1206 }; 1207 1208 static void 1209 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1210 reduce_request_fn next_fn, bool is_write) 1211 { 1212 struct iovec *iov; 1213 struct spdk_reduce_backing_io *backing_io; 1214 uint8_t *buf; 1215 uint32_t i; 1216 1217 if (req->chunk_is_compressed) { 1218 iov = req->comp_buf_iov; 1219 buf = req->comp_buf; 1220 } else { 1221 iov = req->decomp_buf_iov; 1222 buf = req->decomp_buf; 1223 } 1224 1225 req->num_backing_ops = req->num_io_units; 1226 req->backing_cb_args.cb_fn = next_fn; 1227 req->backing_cb_args.cb_arg = req; 1228 for (i = 0; i < req->num_io_units; i++) { 1229 backing_io = _reduce_vol_req_get_backing_io(req, i); 1230 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; 1231 iov[i].iov_len = vol->params.backing_io_unit_size; 1232 backing_io->dev = vol->backing_dev; 1233 backing_io->iov = &iov[i]; 1234 backing_io->iovcnt = 1; 1235 backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit; 1236 backing_io->lba_count = vol->backing_lba_per_io_unit; 1237 backing_io->backing_cb_args = &req->backing_cb_args; 1238 if (is_write) { 1239 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1240 } else { 1241 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1242 } 1243 vol->backing_dev->submit_backing_io(backing_io); 1244 } 1245 } 1246 1247 static void 1248 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1249 reduce_request_fn next_fn, bool is_write) 1250 { 1251 struct iovec *iov; 1252 struct spdk_reduce_backing_io *backing_io; 1253 struct reduce_merged_io_desc merged_io_desc[4]; 1254 uint8_t *buf; 1255 bool merge = false; 1256 uint32_t num_io = 0; 1257 uint32_t io_unit_counts = 0; 1258 uint32_t merged_io_idx = 0; 1259 uint32_t i; 1260 1261 /* The merged_io_desc value is defined here to contain four elements, 1262 * and the chunk size must be four times the maximum of the io unit. 1263 * if chunk size is too big, don't merge IO. 1264 */ 1265 if (vol->backing_io_units_per_chunk > 4) { 1266 _issue_backing_ops_without_merge(req, vol, next_fn, is_write); 1267 return; 1268 } 1269 1270 if (req->chunk_is_compressed) { 1271 iov = req->comp_buf_iov; 1272 buf = req->comp_buf; 1273 } else { 1274 iov = req->decomp_buf_iov; 1275 buf = req->decomp_buf; 1276 } 1277 1278 for (i = 0; i < req->num_io_units; i++) { 1279 if (!merge) { 1280 merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i]; 1281 merged_io_desc[merged_io_idx].num_io_units = 1; 1282 num_io++; 1283 } 1284 1285 if (i + 1 == req->num_io_units) { 1286 break; 1287 } 1288 1289 if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) { 1290 merged_io_desc[merged_io_idx].num_io_units += 1; 1291 merge = true; 1292 continue; 1293 } 1294 merge = false; 1295 merged_io_idx++; 1296 } 1297 1298 req->num_backing_ops = num_io; 1299 req->backing_cb_args.cb_fn = next_fn; 1300 req->backing_cb_args.cb_arg = req; 1301 for (i = 0; i < num_io; i++) { 1302 backing_io = _reduce_vol_req_get_backing_io(req, i); 1303 iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size; 1304 iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units; 1305 backing_io->dev = vol->backing_dev; 1306 backing_io->iov = &iov[i]; 1307 backing_io->iovcnt = 1; 1308 backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit; 1309 backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units; 1310 backing_io->backing_cb_args = &req->backing_cb_args; 1311 if (is_write) { 1312 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1313 } else { 1314 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1315 } 1316 vol->backing_dev->submit_backing_io(backing_io); 1317 1318 /* Collects the number of processed I/O. */ 1319 io_unit_counts += merged_io_desc[i].num_io_units; 1320 } 1321 } 1322 1323 static void 1324 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, 1325 uint32_t compressed_size) 1326 { 1327 struct spdk_reduce_vol *vol = req->vol; 1328 uint32_t i; 1329 uint64_t chunk_offset, remainder, free_index, total_len = 0; 1330 uint8_t *buf; 1331 bool success; 1332 int j; 1333 1334 success = queue_dequeue(&vol->free_chunks_queue, &free_index); 1335 if (success) { 1336 req->chunk_map_index = free_index; 1337 } else { 1338 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 1339 vol->find_chunk_offset); 1340 vol->find_chunk_offset = req->chunk_map_index + 1; 1341 } 1342 1343 /* TODO: fail if no chunk map found - but really this should not happen if we 1344 * size the number of requests similarly to number of extra chunk maps 1345 */ 1346 assert(req->chunk_map_index != UINT32_MAX); 1347 spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); 1348 1349 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1350 req->num_io_units = spdk_divide_round_up(compressed_size, 1351 vol->params.backing_io_unit_size); 1352 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1353 req->chunk->compressed_size = 1354 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; 1355 1356 /* if the chunk is uncompressed we need to copy the data from the host buffers. */ 1357 if (req->chunk_is_compressed == false) { 1358 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1359 buf = req->decomp_buf; 1360 total_len = chunk_offset * vol->params.logical_block_size; 1361 1362 /* zero any offset into chunk */ 1363 if (req->rmw == false && chunk_offset) { 1364 memset(buf, 0, total_len); 1365 } 1366 buf += total_len; 1367 1368 /* copy the data */ 1369 for (j = 0; j < req->iovcnt; j++) { 1370 memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); 1371 buf += req->iov[j].iov_len; 1372 total_len += req->iov[j].iov_len; 1373 } 1374 1375 /* zero any remainder */ 1376 remainder = vol->params.chunk_size - total_len; 1377 total_len += remainder; 1378 if (req->rmw == false && remainder) { 1379 memset(buf, 0, remainder); 1380 } 1381 assert(total_len == vol->params.chunk_size); 1382 } 1383 1384 for (i = 0; i < req->num_io_units; i++) { 1385 success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index); 1386 if (success) { 1387 req->chunk->io_unit_index[i] = free_index; 1388 } else { 1389 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 1390 vol->find_block_offset); 1391 vol->find_block_offset = req->chunk->io_unit_index[i] + 1; 1392 } 1393 /* TODO: fail if no backing block found - but really this should also not 1394 * happen (see comment above). 1395 */ 1396 assert(req->chunk->io_unit_index[i] != UINT32_MAX); 1397 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1398 } 1399 1400 _issue_backing_ops(req, vol, next_fn, true /* write */); 1401 } 1402 1403 static void 1404 _write_compress_done(void *_req, int reduce_errno) 1405 { 1406 struct spdk_reduce_vol_request *req = _req; 1407 1408 /* Negative reduce_errno indicates failure for compression operations. 1409 * Just write the uncompressed data instead. Force this to happen 1410 * by just passing the full chunk size to _reduce_vol_write_chunk. 1411 * When it sees the data couldn't be compressed, it will just write 1412 * the uncompressed buffer to disk. 1413 */ 1414 if (reduce_errno < 0) { 1415 req->backing_cb_args.output_size = req->vol->params.chunk_size; 1416 } 1417 1418 _reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size); 1419 } 1420 1421 static void 1422 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1423 { 1424 struct spdk_reduce_vol *vol = req->vol; 1425 1426 req->backing_cb_args.cb_fn = next_fn; 1427 req->backing_cb_args.cb_arg = req; 1428 req->comp_buf_iov[0].iov_base = req->comp_buf; 1429 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1430 vol->backing_dev->compress(vol->backing_dev, 1431 req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1, 1432 &req->backing_cb_args); 1433 } 1434 1435 static void 1436 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1437 { 1438 struct spdk_reduce_vol *vol = req->vol; 1439 1440 req->backing_cb_args.cb_fn = next_fn; 1441 req->backing_cb_args.cb_arg = req; 1442 req->comp_buf_iov[0].iov_base = req->comp_buf; 1443 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1444 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1445 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1446 vol->backing_dev->decompress(vol->backing_dev, 1447 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1448 &req->backing_cb_args); 1449 } 1450 1451 static void 1452 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1453 { 1454 struct spdk_reduce_vol *vol = req->vol; 1455 uint64_t chunk_offset, remainder = 0; 1456 uint64_t ttl_len = 0; 1457 size_t iov_len; 1458 int i; 1459 1460 req->decomp_iovcnt = 0; 1461 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1462 1463 /* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer 1464 * if at least one of the conditions below is true: 1465 * 1. User's buffer is fragmented 1466 * 2. Length of the user's buffer is less than the chunk 1467 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1468 iov_len = req->iov[0].iov_len; 1469 req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 || 1470 req->iov[0].iov_len < vol->params.chunk_size || 1471 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len)); 1472 if (req->copy_after_decompress) { 1473 req->decomp_iov[0].iov_base = req->decomp_buf; 1474 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1475 req->decomp_iovcnt = 1; 1476 goto decompress; 1477 } 1478 1479 if (chunk_offset) { 1480 /* first iov point to our scratch buffer for any offset into the chunk */ 1481 req->decomp_iov[0].iov_base = req->decomp_buf; 1482 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1483 ttl_len += req->decomp_iov[0].iov_len; 1484 req->decomp_iovcnt = 1; 1485 } 1486 1487 /* now the user data iov, direct to the user buffer */ 1488 for (i = 0; i < req->iovcnt; i++) { 1489 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1490 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1491 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1492 } 1493 req->decomp_iovcnt += req->iovcnt; 1494 1495 /* send the rest of the chunk to our scratch buffer */ 1496 remainder = vol->params.chunk_size - ttl_len; 1497 if (remainder) { 1498 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1499 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1500 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1501 req->decomp_iovcnt++; 1502 } 1503 assert(ttl_len == vol->params.chunk_size); 1504 1505 decompress: 1506 assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1)); 1507 req->backing_cb_args.cb_fn = next_fn; 1508 req->backing_cb_args.cb_arg = req; 1509 req->comp_buf_iov[0].iov_base = req->comp_buf; 1510 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1511 vol->backing_dev->decompress(vol->backing_dev, 1512 req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt, 1513 &req->backing_cb_args); 1514 } 1515 1516 static inline void 1517 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings) 1518 { 1519 struct spdk_reduce_vol *vol = req->vol; 1520 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1521 uint64_t chunk_offset, ttl_len = 0; 1522 uint64_t remainder = 0; 1523 char *copy_offset = NULL; 1524 uint32_t lbsize = vol->params.logical_block_size; 1525 int i; 1526 1527 req->decomp_iov[0].iov_base = req->decomp_buf; 1528 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1529 req->decomp_iovcnt = 1; 1530 copy_offset = req->decomp_iov[0].iov_base; 1531 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1532 1533 if (chunk_offset) { 1534 ttl_len += chunk_offset * lbsize; 1535 /* copy_offset already points to padding buffer if zero_paddings=false */ 1536 if (zero_paddings) { 1537 memcpy(copy_offset, padding_buffer, ttl_len); 1538 } 1539 copy_offset += ttl_len; 1540 } 1541 1542 /* now the user data iov, direct from the user buffer */ 1543 for (i = 0; i < req->iovcnt; i++) { 1544 memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len); 1545 copy_offset += req->iov[i].iov_len; 1546 ttl_len += req->iov[i].iov_len; 1547 } 1548 1549 remainder = vol->params.chunk_size - ttl_len; 1550 if (remainder) { 1551 /* copy_offset already points to padding buffer if zero_paddings=false */ 1552 if (zero_paddings) { 1553 memcpy(copy_offset, padding_buffer + ttl_len, remainder); 1554 } 1555 ttl_len += remainder; 1556 } 1557 1558 assert(ttl_len == req->vol->params.chunk_size); 1559 } 1560 1561 /* This function can be called when we are compressing a new data or in case of read-modify-write 1562 * In the first case possible paddings should be filled with zeroes, in the second case the paddings 1563 * should point to already read and decompressed buffer */ 1564 static inline void 1565 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings) 1566 { 1567 struct spdk_reduce_vol *vol = req->vol; 1568 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1569 uint64_t chunk_offset, ttl_len = 0; 1570 uint64_t remainder = 0; 1571 uint32_t lbsize = vol->params.logical_block_size; 1572 size_t iov_len; 1573 int i; 1574 1575 /* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf 1576 * if at least one of the conditions below is true: 1577 * 1. User's buffer is fragmented 1578 * 2. Length of the user's buffer is less than the chunk 1579 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1580 iov_len = req->iov[0].iov_len; 1581 if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 || 1582 req->iov[0].iov_len < vol->params.chunk_size || 1583 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) { 1584 _prepare_compress_chunk_copy_user_buffers(req, zero_paddings); 1585 return; 1586 } 1587 1588 req->decomp_iovcnt = 0; 1589 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1590 1591 if (chunk_offset != 0) { 1592 ttl_len += chunk_offset * lbsize; 1593 req->decomp_iov[0].iov_base = padding_buffer; 1594 req->decomp_iov[0].iov_len = ttl_len; 1595 req->decomp_iovcnt = 1; 1596 } 1597 1598 /* now the user data iov, direct from the user buffer */ 1599 for (i = 0; i < req->iovcnt; i++) { 1600 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1601 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1602 ttl_len += req->iov[i].iov_len; 1603 } 1604 req->decomp_iovcnt += req->iovcnt; 1605 1606 remainder = vol->params.chunk_size - ttl_len; 1607 if (remainder) { 1608 req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len; 1609 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1610 req->decomp_iovcnt++; 1611 ttl_len += remainder; 1612 } 1613 assert(ttl_len == req->vol->params.chunk_size); 1614 } 1615 1616 static void 1617 _write_decompress_done(void *_req, int reduce_errno) 1618 { 1619 struct spdk_reduce_vol_request *req = _req; 1620 1621 /* Negative reduce_errno indicates failure for compression operations. */ 1622 if (reduce_errno < 0) { 1623 _reduce_vol_complete_req(req, reduce_errno); 1624 return; 1625 } 1626 1627 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1628 * represents the output_size. 1629 */ 1630 if (req->backing_cb_args.output_size != req->vol->params.chunk_size) { 1631 _reduce_vol_complete_req(req, -EIO); 1632 return; 1633 } 1634 1635 _prepare_compress_chunk(req, false); 1636 _reduce_vol_compress_chunk(req, _write_compress_done); 1637 } 1638 1639 static void 1640 _write_read_done(void *_req, int reduce_errno) 1641 { 1642 struct spdk_reduce_vol_request *req = _req; 1643 1644 if (reduce_errno != 0) { 1645 req->reduce_errno = reduce_errno; 1646 } 1647 1648 assert(req->num_backing_ops > 0); 1649 if (--req->num_backing_ops > 0) { 1650 return; 1651 } 1652 1653 if (req->reduce_errno != 0) { 1654 _reduce_vol_complete_req(req, req->reduce_errno); 1655 return; 1656 } 1657 1658 if (req->chunk_is_compressed) { 1659 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); 1660 } else { 1661 req->backing_cb_args.output_size = req->chunk->compressed_size; 1662 1663 _write_decompress_done(req, 0); 1664 } 1665 } 1666 1667 static void 1668 _read_decompress_done(void *_req, int reduce_errno) 1669 { 1670 struct spdk_reduce_vol_request *req = _req; 1671 struct spdk_reduce_vol *vol = req->vol; 1672 1673 /* Negative reduce_errno indicates failure for compression operations. */ 1674 if (reduce_errno < 0) { 1675 _reduce_vol_complete_req(req, reduce_errno); 1676 return; 1677 } 1678 1679 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1680 * represents the output_size. 1681 */ 1682 if (req->backing_cb_args.output_size != vol->params.chunk_size) { 1683 _reduce_vol_complete_req(req, -EIO); 1684 return; 1685 } 1686 1687 if (req->copy_after_decompress) { 1688 uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1689 char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size; 1690 int i; 1691 1692 for (i = 0; i < req->iovcnt; i++) { 1693 memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len); 1694 decomp_buffer += req->iov[i].iov_len; 1695 assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size); 1696 } 1697 } 1698 1699 _reduce_vol_complete_req(req, 0); 1700 } 1701 1702 static void 1703 _read_read_done(void *_req, int reduce_errno) 1704 { 1705 struct spdk_reduce_vol_request *req = _req; 1706 uint64_t chunk_offset; 1707 uint8_t *buf; 1708 int i; 1709 1710 if (reduce_errno != 0) { 1711 req->reduce_errno = reduce_errno; 1712 } 1713 1714 assert(req->num_backing_ops > 0); 1715 if (--req->num_backing_ops > 0) { 1716 return; 1717 } 1718 1719 if (req->reduce_errno != 0) { 1720 _reduce_vol_complete_req(req, req->reduce_errno); 1721 return; 1722 } 1723 1724 if (req->chunk_is_compressed) { 1725 _reduce_vol_decompress_chunk(req, _read_decompress_done); 1726 } else { 1727 1728 /* If the chunk was compressed, the data would have been sent to the 1729 * host buffers by the decompression operation, if not we need to memcpy here. 1730 */ 1731 chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; 1732 buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; 1733 for (i = 0; i < req->iovcnt; i++) { 1734 memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); 1735 buf += req->iov[i].iov_len; 1736 } 1737 1738 req->backing_cb_args.output_size = req->chunk->compressed_size; 1739 1740 _read_decompress_done(req, 0); 1741 } 1742 } 1743 1744 static void 1745 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1746 { 1747 struct spdk_reduce_vol *vol = req->vol; 1748 1749 req->chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1750 assert(req->chunk_map_index != UINT32_MAX); 1751 1752 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1753 req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size, 1754 vol->params.backing_io_unit_size); 1755 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1756 1757 _issue_backing_ops(req, vol, next_fn, false /* read */); 1758 } 1759 1760 static bool 1761 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt, 1762 uint64_t length) 1763 { 1764 uint64_t size = 0; 1765 int i; 1766 1767 if (iovcnt > REDUCE_MAX_IOVECS) { 1768 return false; 1769 } 1770 1771 for (i = 0; i < iovcnt; i++) { 1772 size += iov[i].iov_len; 1773 } 1774 1775 return size == (length * vol->params.logical_block_size); 1776 } 1777 1778 static bool 1779 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index) 1780 { 1781 struct spdk_reduce_vol_request *req; 1782 1783 TAILQ_FOREACH(req, &vol->executing_requests, tailq) { 1784 if (logical_map_index == req->logical_map_index) { 1785 return true; 1786 } 1787 } 1788 1789 return false; 1790 } 1791 1792 static void 1793 _start_readv_request(struct spdk_reduce_vol_request *req) 1794 { 1795 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1796 _reduce_vol_read_chunk(req, _read_read_done); 1797 } 1798 1799 void 1800 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, 1801 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1802 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1803 { 1804 struct spdk_reduce_vol_request *req; 1805 uint64_t logical_map_index; 1806 bool overlapped; 1807 int i; 1808 1809 if (length == 0) { 1810 cb_fn(cb_arg, 0); 1811 return; 1812 } 1813 1814 if (_request_spans_chunk_boundary(vol, offset, length)) { 1815 cb_fn(cb_arg, -EINVAL); 1816 return; 1817 } 1818 1819 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1820 cb_fn(cb_arg, -EINVAL); 1821 return; 1822 } 1823 1824 logical_map_index = offset / vol->logical_blocks_per_chunk; 1825 overlapped = _check_overlap(vol, logical_map_index); 1826 1827 if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) { 1828 /* 1829 * This chunk hasn't been allocated. So treat the data as all 1830 * zeroes for this chunk - do the memset and immediately complete 1831 * the operation. 1832 */ 1833 for (i = 0; i < iovcnt; i++) { 1834 memset(iov[i].iov_base, 0, iov[i].iov_len); 1835 } 1836 cb_fn(cb_arg, 0); 1837 return; 1838 } 1839 1840 req = TAILQ_FIRST(&vol->free_requests); 1841 if (req == NULL) { 1842 cb_fn(cb_arg, -ENOMEM); 1843 return; 1844 } 1845 1846 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1847 req->type = REDUCE_IO_READV; 1848 req->vol = vol; 1849 req->iov = iov; 1850 req->iovcnt = iovcnt; 1851 req->offset = offset; 1852 req->logical_map_index = logical_map_index; 1853 req->length = length; 1854 req->copy_after_decompress = false; 1855 req->cb_fn = cb_fn; 1856 req->cb_arg = cb_arg; 1857 1858 if (!overlapped) { 1859 _start_readv_request(req); 1860 } else { 1861 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1862 } 1863 } 1864 1865 static void 1866 _start_writev_request(struct spdk_reduce_vol_request *req) 1867 { 1868 struct spdk_reduce_vol *vol = req->vol; 1869 1870 TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); 1871 if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) { 1872 if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) { 1873 /* Read old chunk, then overwrite with data from this write 1874 * operation. 1875 */ 1876 req->rmw = true; 1877 _reduce_vol_read_chunk(req, _write_read_done); 1878 return; 1879 } 1880 } 1881 1882 req->rmw = false; 1883 1884 _prepare_compress_chunk(req, true); 1885 _reduce_vol_compress_chunk(req, _write_compress_done); 1886 } 1887 1888 void 1889 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, 1890 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1891 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1892 { 1893 struct spdk_reduce_vol_request *req; 1894 uint64_t logical_map_index; 1895 bool overlapped; 1896 1897 if (length == 0) { 1898 cb_fn(cb_arg, 0); 1899 return; 1900 } 1901 1902 if (_request_spans_chunk_boundary(vol, offset, length)) { 1903 cb_fn(cb_arg, -EINVAL); 1904 return; 1905 } 1906 1907 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1908 cb_fn(cb_arg, -EINVAL); 1909 return; 1910 } 1911 1912 logical_map_index = offset / vol->logical_blocks_per_chunk; 1913 overlapped = _check_overlap(vol, logical_map_index); 1914 1915 req = TAILQ_FIRST(&vol->free_requests); 1916 if (req == NULL) { 1917 cb_fn(cb_arg, -ENOMEM); 1918 return; 1919 } 1920 1921 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1922 req->type = REDUCE_IO_WRITEV; 1923 req->vol = vol; 1924 req->iov = iov; 1925 req->iovcnt = iovcnt; 1926 req->offset = offset; 1927 req->logical_map_index = logical_map_index; 1928 req->length = length; 1929 req->copy_after_decompress = false; 1930 req->cb_fn = cb_fn; 1931 req->cb_arg = cb_arg; 1932 1933 if (!overlapped) { 1934 _start_writev_request(req); 1935 } else { 1936 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1937 } 1938 } 1939 1940 const struct spdk_reduce_vol_params * 1941 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol) 1942 { 1943 return &vol->params; 1944 } 1945 1946 const char * 1947 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol) 1948 { 1949 return vol->pm_file.path; 1950 } 1951 1952 void 1953 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol) 1954 { 1955 uint64_t logical_map_size, num_chunks, ttl_chunk_sz; 1956 uint32_t struct_size; 1957 uint64_t chunk_map_size; 1958 1959 SPDK_NOTICELOG("vol info:\n"); 1960 SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size); 1961 SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size); 1962 SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size); 1963 SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size); 1964 num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 1965 SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks); 1966 SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n", 1967 vol->params.vol_size / vol->params.chunk_size); 1968 ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1969 vol->params.backing_io_unit_size); 1970 SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz); 1971 struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 1972 SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size); 1973 1974 SPDK_NOTICELOG("pmem info:\n"); 1975 SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size); 1976 SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf); 1977 SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super); 1978 SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map); 1979 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, 1980 vol->params.chunk_size); 1981 SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size); 1982 SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps); 1983 chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1984 vol->params.backing_io_unit_size); 1985 SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size); 1986 } 1987 1988 SPDK_LOG_REGISTER_COMPONENT(reduce) 1989