1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "queue_internal.h" 10 11 #include "spdk/reduce.h" 12 #include "spdk/env.h" 13 #include "spdk/string.h" 14 #include "spdk/bit_array.h" 15 #include "spdk/util.h" 16 #include "spdk/log.h" 17 #include "spdk/memory.h" 18 #include "spdk/tree.h" 19 20 #include "libpmem.h" 21 22 /* Always round up the size of the PM region to the nearest cacheline. */ 23 #define REDUCE_PM_SIZE_ALIGNMENT 64 24 25 /* Offset into the backing device where the persistent memory file's path is stored. */ 26 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 27 28 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 29 30 #define REDUCE_NUM_VOL_REQUESTS 256 31 32 /* Structure written to offset 0 of both the pm file and the backing device. */ 33 struct spdk_reduce_vol_superblock { 34 uint8_t signature[8]; 35 struct spdk_reduce_vol_params params; 36 uint8_t reserved[4040]; 37 }; 38 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 39 40 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 41 /* null terminator counts one */ 42 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 43 SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect"); 44 45 #define REDUCE_PATH_MAX 4096 46 47 #define REDUCE_ZERO_BUF_SIZE 0x100000 48 49 /** 50 * Describes a persistent memory file used to hold metadata associated with a 51 * compressed volume. 52 */ 53 struct spdk_reduce_pm_file { 54 char path[REDUCE_PATH_MAX]; 55 void *pm_buf; 56 int pm_is_pmem; 57 uint64_t size; 58 }; 59 60 #define REDUCE_IO_READV 1 61 #define REDUCE_IO_WRITEV 2 62 #define REDUCE_IO_UNMAP 3 63 64 struct spdk_reduce_chunk_map { 65 uint32_t compressed_size; 66 uint32_t reserved; 67 uint64_t io_unit_index[0]; 68 }; 69 70 struct spdk_reduce_vol_request { 71 /** 72 * Scratch buffer used for uncompressed chunk. This is used for: 73 * 1) source buffer for compression operations 74 * 2) destination buffer for decompression operations 75 * 3) data buffer when writing uncompressed chunk to disk 76 * 4) data buffer when reading uncompressed chunk from disk 77 */ 78 uint8_t *decomp_buf; 79 struct iovec *decomp_buf_iov; 80 81 /** 82 * These are used to construct the iovecs that are sent to 83 * the decomp engine, they point to a mix of the scratch buffer 84 * and user buffer 85 */ 86 struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2]; 87 int decomp_iovcnt; 88 89 /** 90 * Scratch buffer used for compressed chunk. 
This is used for: 91 * 1) destination buffer for compression operations 92 * 2) source buffer for decompression operations 93 * 3) data buffer when writing compressed chunk to disk 94 * 4) data buffer when reading compressed chunk from disk 95 */ 96 uint8_t *comp_buf; 97 struct iovec *comp_buf_iov; 98 struct iovec *iov; 99 bool rmw; 100 struct spdk_reduce_vol *vol; 101 int type; 102 int reduce_errno; 103 int iovcnt; 104 int num_backing_ops; 105 uint32_t num_io_units; 106 struct spdk_reduce_backing_io *backing_io; 107 bool chunk_is_compressed; 108 bool copy_after_decompress; 109 uint64_t offset; 110 uint64_t logical_map_index; 111 uint64_t length; 112 uint64_t chunk_map_index; 113 struct spdk_reduce_chunk_map *chunk; 114 spdk_reduce_vol_op_complete cb_fn; 115 void *cb_arg; 116 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 117 RB_ENTRY(spdk_reduce_vol_request) rbnode; 118 struct spdk_reduce_vol_cb_args backing_cb_args; 119 }; 120 121 struct spdk_reduce_vol { 122 struct spdk_reduce_vol_params params; 123 uint32_t backing_io_units_per_chunk; 124 uint32_t backing_lba_per_io_unit; 125 uint32_t logical_blocks_per_chunk; 126 struct spdk_reduce_pm_file pm_file; 127 struct spdk_reduce_backing_dev *backing_dev; 128 struct spdk_reduce_vol_superblock *backing_super; 129 struct spdk_reduce_vol_superblock *pm_super; 130 uint64_t *pm_logical_map; 131 uint64_t *pm_chunk_maps; 132 133 struct spdk_bit_array *allocated_chunk_maps; 134 /* The starting position when looking for a block from allocated_chunk_maps */ 135 uint64_t find_chunk_offset; 136 /* Cache free chunks to speed up lookup of free chunk. */ 137 struct reduce_queue free_chunks_queue; 138 struct spdk_bit_array *allocated_backing_io_units; 139 /* The starting position when looking for a block from allocated_backing_io_units */ 140 uint64_t find_block_offset; 141 /* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */ 142 struct reduce_queue free_backing_blocks_queue; 143 144 struct spdk_reduce_vol_request *request_mem; 145 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 146 RB_HEAD(executing_req_tree, spdk_reduce_vol_request) executing_requests; 147 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 148 149 /* Single contiguous buffer used for all request buffers for this volume. */ 150 uint8_t *buf_mem; 151 struct iovec *buf_iov_mem; 152 /* Single contiguous buffer used for backing io buffers for this volume. */ 153 uint8_t *buf_backing_io_mem; 154 }; 155 156 static void _start_readv_request(struct spdk_reduce_vol_request *req); 157 static void _start_writev_request(struct spdk_reduce_vol_request *req); 158 static uint8_t *g_zero_buf; 159 static int g_vol_count = 0; 160 161 /* 162 * Allocate extra metadata chunks and corresponding backing io units to account for 163 * outstanding IO in worst case scenario where logical map is completely allocated 164 * and no data can be compressed. We need extra chunks in this case to handle 165 * in-flight writes since reduce never writes data in place. 
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * User does not pass in the vol size - it gets calculated by libreduce from
		 * values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT 2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete cb_fn;
	void *cb_arg;
	struct iovec iov[LOAD_IOV_COUNT];
	void *path;
	struct spdk_reduce_backing_io *backing_io;
};

static inline bool
_addr_crosses_huge_page(const void *addr, size_t *size)
{
	size_t _size;
	uint64_t rc;

	assert(size);

	_size = *size;
	rc = spdk_vtophys(addr, size);

	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
}

static inline int
_set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
{
	uint8_t *addr;
	size_t size_tmp = buffer_size;

	addr = *_addr;

	/* Verify that addr + buffer_size doesn't cross huge page boundary */
	if (_addr_crosses_huge_page(addr, &size_tmp)) {
		/* Memory start is aligned on 2MiB, so buffer should be located at the end of the page.
		 * Skip remaining bytes and continue from the beginning of the next page */
		addr += size_tmp;
	}

	if (addr + buffer_size > addr_range) {
		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
		return -ERANGE;
	}

	*vol_buffer = addr;
	*_addr = addr + buffer_size;

	return 0;
}

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
	uint32_t reqs_in_2mb_page, huge_pages_needed;
	uint8_t *buffer, *buffer_end;
	int i = 0;
	int rc = 0;

	/* Comp and decomp buffers need to be allocated so that they do not cross physical
	 * page boundaries.
Assume that the system uses default 2MiB pages and chunk_size is not 379 * necessarily power of 2 380 * Allocate 2x since we need buffers for both read/write and compress/decompress 381 * intermediate buffers. */ 382 reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2); 383 if (!reqs_in_2mb_page) { 384 return -EINVAL; 385 } 386 huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page); 387 388 vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL); 389 if (vol->buf_mem == NULL) { 390 return -ENOMEM; 391 } 392 393 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 394 if (vol->request_mem == NULL) { 395 spdk_free(vol->buf_mem); 396 vol->buf_mem = NULL; 397 return -ENOMEM; 398 } 399 400 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 401 * buffers. 402 */ 403 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 404 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 405 if (vol->buf_iov_mem == NULL) { 406 free(vol->request_mem); 407 spdk_free(vol->buf_mem); 408 vol->request_mem = NULL; 409 vol->buf_mem = NULL; 410 return -ENOMEM; 411 } 412 413 vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) + 414 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk); 415 if (vol->buf_backing_io_mem == NULL) { 416 free(vol->request_mem); 417 free(vol->buf_iov_mem); 418 spdk_free(vol->buf_mem); 419 vol->request_mem = NULL; 420 vol->buf_iov_mem = NULL; 421 vol->buf_mem = NULL; 422 return -ENOMEM; 423 } 424 425 buffer = vol->buf_mem; 426 buffer_end = buffer + VALUE_2MB * huge_pages_needed; 427 428 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 429 req = &vol->request_mem[i]; 430 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 431 req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i * 432 (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) * 433 vol->backing_io_units_per_chunk); 434 435 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 436 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 437 438 rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size); 439 if (rc) { 440 SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 441 vol->buf_mem, buffer_end); 442 break; 443 } 444 rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size); 445 if (rc) { 446 SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 447 vol->buf_mem, buffer_end); 448 break; 449 } 450 } 451 452 if (rc) { 453 free(vol->buf_backing_io_mem); 454 free(vol->buf_iov_mem); 455 free(vol->request_mem); 456 spdk_free(vol->buf_mem); 457 vol->buf_mem = NULL; 458 vol->buf_backing_io_mem = NULL; 459 vol->buf_iov_mem = NULL; 460 vol->request_mem = NULL; 461 } 462 463 return rc; 464 } 465 466 static void 467 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 468 { 469 if (ctx != NULL) { 470 spdk_free(ctx->path); 471 free(ctx->backing_io); 472 free(ctx); 473 } 474 475 if (vol != NULL) { 476 if (vol->pm_file.pm_buf != NULL) { 477 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 478 } 479 480 spdk_free(vol->backing_super); 481 spdk_bit_array_free(&vol->allocated_chunk_maps); 482 spdk_bit_array_free(&vol->allocated_backing_io_units); 483 free(vol->request_mem); 484 free(vol->buf_backing_io_mem); 485 
free(vol->buf_iov_mem); 486 spdk_free(vol->buf_mem); 487 free(vol); 488 } 489 } 490 491 static int 492 _alloc_zero_buff(void) 493 { 494 int rc = 0; 495 496 /* The zero buffer is shared between all volumes and just used 497 * for reads so allocate one global instance here if not already 498 * allocated when another vol init'd or loaded. 499 */ 500 if (g_vol_count++ == 0) { 501 g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE, 502 64, NULL, SPDK_ENV_LCORE_ID_ANY, 503 SPDK_MALLOC_DMA); 504 if (g_zero_buf == NULL) { 505 g_vol_count--; 506 rc = -ENOMEM; 507 } 508 } 509 return rc; 510 } 511 512 static void 513 _init_write_super_cpl(void *cb_arg, int reduce_errno) 514 { 515 struct reduce_init_load_ctx *init_ctx = cb_arg; 516 int rc = 0; 517 518 if (reduce_errno != 0) { 519 rc = reduce_errno; 520 goto err; 521 } 522 523 rc = _allocate_vol_requests(init_ctx->vol); 524 if (rc != 0) { 525 goto err; 526 } 527 528 rc = _alloc_zero_buff(); 529 if (rc != 0) { 530 goto err; 531 } 532 533 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, rc); 534 /* Only clean up the ctx - the vol has been passed to the application 535 * for use now that initialization was successful. 536 */ 537 _init_load_cleanup(NULL, init_ctx); 538 539 return; 540 err: 541 if (unlink(init_ctx->path)) { 542 SPDK_ERRLOG("%s could not be unlinked: %s\n", 543 (char *)init_ctx->path, spdk_strerror(errno)); 544 } 545 546 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 547 _init_load_cleanup(init_ctx->vol, init_ctx); 548 } 549 550 static void 551 _init_write_path_cpl(void *cb_arg, int reduce_errno) 552 { 553 struct reduce_init_load_ctx *init_ctx = cb_arg; 554 struct spdk_reduce_vol *vol = init_ctx->vol; 555 struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io; 556 557 if (reduce_errno != 0) { 558 _init_write_super_cpl(cb_arg, reduce_errno); 559 return; 560 } 561 562 init_ctx->iov[0].iov_base = vol->backing_super; 563 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 564 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 565 init_ctx->backing_cb_args.cb_arg = init_ctx; 566 567 backing_io->dev = vol->backing_dev; 568 backing_io->iov = init_ctx->iov; 569 backing_io->iovcnt = 1; 570 backing_io->lba = 0; 571 backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen; 572 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 573 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 574 575 vol->backing_dev->submit_backing_io(backing_io); 576 } 577 578 static int 579 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 580 { 581 uint64_t total_chunks, total_backing_io_units; 582 uint32_t i, num_metadata_io_units; 583 584 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 585 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 586 vol->find_chunk_offset = 0; 587 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 588 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 589 vol->find_block_offset = 0; 590 591 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 592 return -ENOMEM; 593 } 594 595 /* Set backing io unit bits associated with metadata. 
 */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->params.backing_io_unit_size;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

static int
overlap_cmp(struct spdk_reduce_vol_request *req1, struct spdk_reduce_vol_request *req2)
{
	return (req1->logical_map_index < req2->logical_map_index ? -1 : req1->logical_map_index >
		req2->logical_map_index);
}
RB_GENERATE_STATIC(executing_req_tree, spdk_reduce_vol_request, rbnode, overlap_cmp);

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	struct spdk_reduce_backing_io *backing_io;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->submit_backing_io == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	RB_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);
	queue_init(&vol->free_chunks_queue);
	queue_init(&vol->free_backing_blocks_queue);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
	if (backing_io == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}
	init_ctx->backing_io = backing_io;

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_uuid_is_null(&params->uuid)) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
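	/* The pm file path is built as "<pm_file_dir>/<uuid>". For example, with a
	 * hypothetical pm_file_dir of "/mnt/pmem0", the file would be created as
	 * /mnt/pmem0/<lower-case UUID string>. */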
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written. We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
776 */ 777 backing_io->dev = vol->backing_dev; 778 backing_io->iov = init_ctx->iov; 779 backing_io->iovcnt = 1; 780 backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen; 781 backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen; 782 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 783 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 784 785 vol->backing_dev->submit_backing_io(backing_io); 786 } 787 788 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno); 789 790 static void 791 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno) 792 { 793 struct reduce_init_load_ctx *load_ctx = cb_arg; 794 struct spdk_reduce_vol *vol = load_ctx->vol; 795 uint64_t backing_dev_size; 796 uint64_t i, num_chunks, logical_map_index; 797 struct spdk_reduce_chunk_map *chunk; 798 size_t mapped_len; 799 uint32_t j; 800 int rc; 801 802 if (reduce_errno != 0) { 803 rc = reduce_errno; 804 goto error; 805 } 806 807 rc = _alloc_zero_buff(); 808 if (rc) { 809 goto error; 810 } 811 812 if (memcmp(vol->backing_super->signature, 813 SPDK_REDUCE_SIGNATURE, 814 sizeof(vol->backing_super->signature)) != 0) { 815 /* This backing device isn't a libreduce backing device. */ 816 rc = -EILSEQ; 817 goto error; 818 } 819 820 /* If the cb_fn is destroy_load_cb, it means we are wanting to destroy this compress bdev. 821 * So don't bother getting the volume ready to use - invoke the callback immediately 822 * so destroy_load_cb can delete the metadata off of the block device and delete the 823 * persistent memory file if it exists. 824 */ 825 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 826 if (load_ctx->cb_fn == (*destroy_load_cb)) { 827 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 828 _init_load_cleanup(NULL, load_ctx); 829 return; 830 } 831 832 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 833 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 834 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 835 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 836 837 rc = _allocate_bit_arrays(vol); 838 if (rc != 0) { 839 goto error; 840 } 841 842 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 843 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 844 SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", 845 backing_dev_size); 846 rc = -EILSEQ; 847 goto error; 848 } 849 850 vol->pm_file.size = _get_pm_file_size(&vol->params); 851 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 852 &vol->pm_file.pm_is_pmem); 853 if (vol->pm_file.pm_buf == NULL) { 854 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 855 rc = -errno; 856 goto error; 857 } 858 859 if (vol->pm_file.size != mapped_len) { 860 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 861 vol->pm_file.size, mapped_len); 862 rc = -ENOMEM; 863 goto error; 864 } 865 866 rc = _allocate_vol_requests(vol); 867 if (rc != 0) { 868 goto error; 869 } 870 871 _initialize_vol_pm_pointers(vol); 872 873 num_chunks = vol->params.vol_size / vol->params.chunk_size; 874 for (i = 0; i < num_chunks; i++) { 875 logical_map_index = vol->pm_logical_map[i]; 876 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 877 continue; 878 } 879 
spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 880 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 881 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 882 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 883 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 884 } 885 } 886 } 887 888 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 889 /* Only clean up the ctx - the vol has been passed to the application 890 * for use now that volume load was successful. 891 */ 892 _init_load_cleanup(NULL, load_ctx); 893 return; 894 895 error: 896 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 897 _init_load_cleanup(vol, load_ctx); 898 } 899 900 void 901 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 902 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 903 { 904 struct spdk_reduce_vol *vol; 905 struct reduce_init_load_ctx *load_ctx; 906 struct spdk_reduce_backing_io *backing_io; 907 908 if (backing_dev->submit_backing_io == NULL) { 909 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 910 cb_fn(cb_arg, NULL, -EINVAL); 911 return; 912 } 913 914 vol = calloc(1, sizeof(*vol)); 915 if (vol == NULL) { 916 cb_fn(cb_arg, NULL, -ENOMEM); 917 return; 918 } 919 920 TAILQ_INIT(&vol->free_requests); 921 RB_INIT(&vol->executing_requests); 922 TAILQ_INIT(&vol->queued_requests); 923 queue_init(&vol->free_chunks_queue); 924 queue_init(&vol->free_backing_blocks_queue); 925 926 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, 927 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 928 if (vol->backing_super == NULL) { 929 _init_load_cleanup(vol, NULL); 930 cb_fn(cb_arg, NULL, -ENOMEM); 931 return; 932 } 933 934 vol->backing_dev = backing_dev; 935 936 load_ctx = calloc(1, sizeof(*load_ctx)); 937 if (load_ctx == NULL) { 938 _init_load_cleanup(vol, NULL); 939 cb_fn(cb_arg, NULL, -ENOMEM); 940 return; 941 } 942 943 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 944 if (backing_io == NULL) { 945 _init_load_cleanup(vol, load_ctx); 946 cb_fn(cb_arg, NULL, -ENOMEM); 947 return; 948 } 949 950 load_ctx->backing_io = backing_io; 951 952 load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, 953 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 954 if (load_ctx->path == NULL) { 955 _init_load_cleanup(vol, load_ctx); 956 cb_fn(cb_arg, NULL, -ENOMEM); 957 return; 958 } 959 960 load_ctx->vol = vol; 961 load_ctx->cb_fn = cb_fn; 962 load_ctx->cb_arg = cb_arg; 963 964 load_ctx->iov[0].iov_base = vol->backing_super; 965 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 966 load_ctx->iov[1].iov_base = load_ctx->path; 967 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 968 backing_io->dev = vol->backing_dev; 969 backing_io->iov = load_ctx->iov; 970 backing_io->iovcnt = LOAD_IOV_COUNT; 971 backing_io->lba = 0; 972 backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 973 vol->backing_dev->blocklen; 974 backing_io->backing_cb_args = &load_ctx->backing_cb_args; 975 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 976 977 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 978 load_ctx->backing_cb_args.cb_arg = load_ctx; 979 vol->backing_dev->submit_backing_io(backing_io); 980 } 981 982 void 983 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 984 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 985 { 986 if (vol == NULL) { 987 /* This indicates a programming error. 
*/ 988 assert(false); 989 cb_fn(cb_arg, -EINVAL); 990 return; 991 } 992 993 if (--g_vol_count == 0) { 994 spdk_free(g_zero_buf); 995 } 996 assert(g_vol_count >= 0); 997 _init_load_cleanup(vol, NULL); 998 cb_fn(cb_arg, 0); 999 } 1000 1001 struct reduce_destroy_ctx { 1002 spdk_reduce_vol_op_complete cb_fn; 1003 void *cb_arg; 1004 struct spdk_reduce_vol *vol; 1005 struct spdk_reduce_vol_superblock *super; 1006 struct iovec iov; 1007 struct spdk_reduce_vol_cb_args backing_cb_args; 1008 int reduce_errno; 1009 char pm_path[REDUCE_PATH_MAX]; 1010 struct spdk_reduce_backing_io *backing_io; 1011 }; 1012 1013 static void 1014 destroy_unload_cpl(void *cb_arg, int reduce_errno) 1015 { 1016 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1017 1018 if (destroy_ctx->reduce_errno == 0) { 1019 if (unlink(destroy_ctx->pm_path)) { 1020 SPDK_ERRLOG("%s could not be unlinked: %s\n", 1021 destroy_ctx->pm_path, strerror(errno)); 1022 } 1023 } 1024 1025 /* Even if the unload somehow failed, we still pass the destroy_ctx 1026 * reduce_errno since that indicates whether or not the volume was 1027 * actually destroyed. 1028 */ 1029 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 1030 spdk_free(destroy_ctx->super); 1031 free(destroy_ctx->backing_io); 1032 free(destroy_ctx); 1033 } 1034 1035 static void 1036 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 1037 { 1038 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1039 struct spdk_reduce_vol *vol = destroy_ctx->vol; 1040 1041 destroy_ctx->reduce_errno = reduce_errno; 1042 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 1043 } 1044 1045 static void 1046 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 1047 { 1048 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1049 struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io; 1050 1051 if (reduce_errno != 0) { 1052 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 1053 spdk_free(destroy_ctx->super); 1054 free(destroy_ctx); 1055 return; 1056 } 1057 1058 destroy_ctx->vol = vol; 1059 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 1060 destroy_ctx->iov.iov_base = destroy_ctx->super; 1061 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 1062 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 1063 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 1064 1065 backing_io->dev = vol->backing_dev; 1066 backing_io->iov = &destroy_ctx->iov; 1067 backing_io->iovcnt = 1; 1068 backing_io->lba = 0; 1069 backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen; 1070 backing_io->backing_cb_args = &destroy_ctx->backing_cb_args; 1071 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1072 1073 vol->backing_dev->submit_backing_io(backing_io); 1074 } 1075 1076 void 1077 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 1078 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1079 { 1080 struct reduce_destroy_ctx *destroy_ctx; 1081 struct spdk_reduce_backing_io *backing_io; 1082 1083 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 1084 if (destroy_ctx == NULL) { 1085 cb_fn(cb_arg, -ENOMEM); 1086 return; 1087 } 1088 1089 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 1090 if (backing_io == NULL) { 1091 free(destroy_ctx); 1092 cb_fn(cb_arg, -ENOMEM); 1093 return; 1094 } 1095 1096 destroy_ctx->backing_io = backing_io; 1097 1098 destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, 1099 SPDK_ENV_LCORE_ID_ANY, 
SPDK_MALLOC_DMA); 1100 if (destroy_ctx->super == NULL) { 1101 free(destroy_ctx); 1102 free(backing_io); 1103 cb_fn(cb_arg, -ENOMEM); 1104 return; 1105 } 1106 destroy_ctx->cb_fn = cb_fn; 1107 destroy_ctx->cb_arg = cb_arg; 1108 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 1109 } 1110 1111 static bool 1112 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 1113 { 1114 uint64_t start_chunk, end_chunk; 1115 1116 start_chunk = offset / vol->logical_blocks_per_chunk; 1117 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 1118 1119 return (start_chunk != end_chunk); 1120 } 1121 1122 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 1123 static void _start_unmap_request_full_chunk(void *ctx); 1124 1125 static void 1126 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 1127 { 1128 struct spdk_reduce_vol_request *next_req; 1129 struct spdk_reduce_vol *vol = req->vol; 1130 1131 req->cb_fn(req->cb_arg, reduce_errno); 1132 RB_REMOVE(executing_req_tree, &vol->executing_requests, req); 1133 1134 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 1135 if (next_req->logical_map_index == req->logical_map_index) { 1136 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 1137 if (next_req->type == REDUCE_IO_READV) { 1138 _start_readv_request(next_req); 1139 } else if (next_req->type == REDUCE_IO_WRITEV) { 1140 _start_writev_request(next_req); 1141 } else { 1142 assert(next_req->type == REDUCE_IO_UNMAP); 1143 _start_unmap_request_full_chunk(next_req); 1144 } 1145 break; 1146 } 1147 } 1148 1149 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 1150 } 1151 1152 static void 1153 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 1154 { 1155 struct spdk_reduce_chunk_map *chunk; 1156 uint64_t index; 1157 bool success; 1158 uint32_t i; 1159 1160 chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index); 1161 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 1162 index = chunk->io_unit_index[i]; 1163 if (index == REDUCE_EMPTY_MAP_ENTRY) { 1164 break; 1165 } 1166 assert(spdk_bit_array_get(vol->allocated_backing_io_units, 1167 index) == true); 1168 spdk_bit_array_clear(vol->allocated_backing_io_units, index); 1169 success = queue_enqueue(&vol->free_backing_blocks_queue, index); 1170 if (!success && index < vol->find_block_offset) { 1171 vol->find_block_offset = index; 1172 } 1173 chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 1174 } 1175 success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index); 1176 if (!success && chunk_map_index < vol->find_chunk_offset) { 1177 vol->find_chunk_offset = chunk_map_index; 1178 } 1179 spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index); 1180 } 1181 1182 static void 1183 _write_write_done(void *_req, int reduce_errno) 1184 { 1185 struct spdk_reduce_vol_request *req = _req; 1186 struct spdk_reduce_vol *vol = req->vol; 1187 uint64_t old_chunk_map_index; 1188 1189 if (reduce_errno != 0) { 1190 req->reduce_errno = reduce_errno; 1191 } 1192 1193 assert(req->num_backing_ops > 0); 1194 if (--req->num_backing_ops > 0) { 1195 return; 1196 } 1197 1198 if (req->reduce_errno != 0) { 1199 _reduce_vol_reset_chunk(vol, req->chunk_map_index); 1200 _reduce_vol_complete_req(req, req->reduce_errno); 1201 return; 1202 } 1203 1204 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1205 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 1206 _reduce_vol_reset_chunk(vol, old_chunk_map_index); 1207 } 1208 
1209 /* 1210 * We don't need to persist the clearing of the old chunk map here. The old chunk map 1211 * becomes invalid after we update the logical map, since the old chunk map will no 1212 * longer have a reference to it in the logical map. 1213 */ 1214 1215 /* Persist the new chunk map. This must be persisted before we update the logical map. */ 1216 _reduce_persist(vol, req->chunk, 1217 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk)); 1218 1219 vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; 1220 1221 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); 1222 1223 _reduce_vol_complete_req(req, 0); 1224 } 1225 1226 static struct spdk_reduce_backing_io * 1227 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index) 1228 { 1229 struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev; 1230 struct spdk_reduce_backing_io *backing_io; 1231 1232 backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io + 1233 (sizeof(*backing_io) + backing_dev->user_ctx_size) * index); 1234 1235 return backing_io; 1236 1237 } 1238 1239 struct reduce_merged_io_desc { 1240 uint64_t io_unit_index; 1241 uint32_t num_io_units; 1242 }; 1243 1244 static void 1245 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1246 reduce_request_fn next_fn, bool is_write) 1247 { 1248 struct iovec *iov; 1249 struct spdk_reduce_backing_io *backing_io; 1250 uint8_t *buf; 1251 uint32_t i; 1252 1253 if (req->chunk_is_compressed) { 1254 iov = req->comp_buf_iov; 1255 buf = req->comp_buf; 1256 } else { 1257 iov = req->decomp_buf_iov; 1258 buf = req->decomp_buf; 1259 } 1260 1261 req->num_backing_ops = req->num_io_units; 1262 req->backing_cb_args.cb_fn = next_fn; 1263 req->backing_cb_args.cb_arg = req; 1264 for (i = 0; i < req->num_io_units; i++) { 1265 backing_io = _reduce_vol_req_get_backing_io(req, i); 1266 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; 1267 iov[i].iov_len = vol->params.backing_io_unit_size; 1268 backing_io->dev = vol->backing_dev; 1269 backing_io->iov = &iov[i]; 1270 backing_io->iovcnt = 1; 1271 backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit; 1272 backing_io->lba_count = vol->backing_lba_per_io_unit; 1273 backing_io->backing_cb_args = &req->backing_cb_args; 1274 if (is_write) { 1275 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1276 } else { 1277 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1278 } 1279 vol->backing_dev->submit_backing_io(backing_io); 1280 } 1281 } 1282 1283 static void 1284 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1285 reduce_request_fn next_fn, bool is_write) 1286 { 1287 struct iovec *iov; 1288 struct spdk_reduce_backing_io *backing_io; 1289 struct reduce_merged_io_desc merged_io_desc[4]; 1290 uint8_t *buf; 1291 bool merge = false; 1292 uint32_t num_io = 0; 1293 uint32_t io_unit_counts = 0; 1294 uint32_t merged_io_idx = 0; 1295 uint32_t i; 1296 1297 /* The merged_io_desc value is defined here to contain four elements, 1298 * and the chunk size must be four times the maximum of the io unit. 1299 * if chunk size is too big, don't merge IO. 
1300 */ 1301 if (vol->backing_io_units_per_chunk > 4) { 1302 _issue_backing_ops_without_merge(req, vol, next_fn, is_write); 1303 return; 1304 } 1305 1306 if (req->chunk_is_compressed) { 1307 iov = req->comp_buf_iov; 1308 buf = req->comp_buf; 1309 } else { 1310 iov = req->decomp_buf_iov; 1311 buf = req->decomp_buf; 1312 } 1313 1314 for (i = 0; i < req->num_io_units; i++) { 1315 if (!merge) { 1316 merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i]; 1317 merged_io_desc[merged_io_idx].num_io_units = 1; 1318 num_io++; 1319 } 1320 1321 if (i + 1 == req->num_io_units) { 1322 break; 1323 } 1324 1325 if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) { 1326 merged_io_desc[merged_io_idx].num_io_units += 1; 1327 merge = true; 1328 continue; 1329 } 1330 merge = false; 1331 merged_io_idx++; 1332 } 1333 1334 req->num_backing_ops = num_io; 1335 req->backing_cb_args.cb_fn = next_fn; 1336 req->backing_cb_args.cb_arg = req; 1337 for (i = 0; i < num_io; i++) { 1338 backing_io = _reduce_vol_req_get_backing_io(req, i); 1339 iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size; 1340 iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units; 1341 backing_io->dev = vol->backing_dev; 1342 backing_io->iov = &iov[i]; 1343 backing_io->iovcnt = 1; 1344 backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit; 1345 backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units; 1346 backing_io->backing_cb_args = &req->backing_cb_args; 1347 if (is_write) { 1348 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1349 } else { 1350 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1351 } 1352 vol->backing_dev->submit_backing_io(backing_io); 1353 1354 /* Collects the number of processed I/O. */ 1355 io_unit_counts += merged_io_desc[i].num_io_units; 1356 } 1357 } 1358 1359 static void 1360 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, 1361 uint32_t compressed_size) 1362 { 1363 struct spdk_reduce_vol *vol = req->vol; 1364 uint32_t i; 1365 uint64_t chunk_offset, remainder, free_index, total_len = 0; 1366 uint8_t *buf; 1367 bool success; 1368 int j; 1369 1370 success = queue_dequeue(&vol->free_chunks_queue, &free_index); 1371 if (success) { 1372 req->chunk_map_index = free_index; 1373 } else { 1374 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 1375 vol->find_chunk_offset); 1376 vol->find_chunk_offset = req->chunk_map_index + 1; 1377 } 1378 1379 /* TODO: fail if no chunk map found - but really this should not happen if we 1380 * size the number of requests similarly to number of extra chunk maps 1381 */ 1382 assert(req->chunk_map_index != UINT32_MAX); 1383 spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); 1384 1385 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1386 req->num_io_units = spdk_divide_round_up(compressed_size, 1387 vol->params.backing_io_unit_size); 1388 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1389 req->chunk->compressed_size = 1390 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; 1391 1392 /* if the chunk is uncompressed we need to copy the data from the host buffers. 
*/ 1393 if (req->chunk_is_compressed == false) { 1394 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1395 buf = req->decomp_buf; 1396 total_len = chunk_offset * vol->params.logical_block_size; 1397 1398 /* zero any offset into chunk */ 1399 if (req->rmw == false && chunk_offset) { 1400 memset(buf, 0, total_len); 1401 } 1402 buf += total_len; 1403 1404 /* copy the data */ 1405 for (j = 0; j < req->iovcnt; j++) { 1406 memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); 1407 buf += req->iov[j].iov_len; 1408 total_len += req->iov[j].iov_len; 1409 } 1410 1411 /* zero any remainder */ 1412 remainder = vol->params.chunk_size - total_len; 1413 total_len += remainder; 1414 if (req->rmw == false && remainder) { 1415 memset(buf, 0, remainder); 1416 } 1417 assert(total_len == vol->params.chunk_size); 1418 } 1419 1420 for (i = 0; i < req->num_io_units; i++) { 1421 success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index); 1422 if (success) { 1423 req->chunk->io_unit_index[i] = free_index; 1424 } else { 1425 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 1426 vol->find_block_offset); 1427 vol->find_block_offset = req->chunk->io_unit_index[i] + 1; 1428 } 1429 /* TODO: fail if no backing block found - but really this should also not 1430 * happen (see comment above). 1431 */ 1432 assert(req->chunk->io_unit_index[i] != UINT32_MAX); 1433 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1434 } 1435 1436 _issue_backing_ops(req, vol, next_fn, true /* write */); 1437 } 1438 1439 static void 1440 _write_compress_done(void *_req, int reduce_errno) 1441 { 1442 struct spdk_reduce_vol_request *req = _req; 1443 1444 /* Negative reduce_errno indicates failure for compression operations. 1445 * Just write the uncompressed data instead. Force this to happen 1446 * by just passing the full chunk size to _reduce_vol_write_chunk. 1447 * When it sees the data couldn't be compressed, it will just write 1448 * the uncompressed buffer to disk. 
1449 */ 1450 if (reduce_errno < 0) { 1451 req->backing_cb_args.output_size = req->vol->params.chunk_size; 1452 } 1453 1454 _reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size); 1455 } 1456 1457 static void 1458 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1459 { 1460 struct spdk_reduce_vol *vol = req->vol; 1461 1462 req->backing_cb_args.cb_fn = next_fn; 1463 req->backing_cb_args.cb_arg = req; 1464 req->comp_buf_iov[0].iov_base = req->comp_buf; 1465 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1466 vol->backing_dev->compress(vol->backing_dev, 1467 req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1, 1468 &req->backing_cb_args); 1469 } 1470 1471 static void 1472 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1473 { 1474 struct spdk_reduce_vol *vol = req->vol; 1475 1476 req->backing_cb_args.cb_fn = next_fn; 1477 req->backing_cb_args.cb_arg = req; 1478 req->comp_buf_iov[0].iov_base = req->comp_buf; 1479 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1480 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1481 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1482 vol->backing_dev->decompress(vol->backing_dev, 1483 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1484 &req->backing_cb_args); 1485 } 1486 1487 static void 1488 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1489 { 1490 struct spdk_reduce_vol *vol = req->vol; 1491 uint64_t chunk_offset, remainder = 0; 1492 uint64_t ttl_len = 0; 1493 size_t iov_len; 1494 int i; 1495 1496 req->decomp_iovcnt = 0; 1497 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1498 1499 /* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer 1500 * if at least one of the conditions below is true: 1501 * 1. User's buffer is fragmented 1502 * 2. Length of the user's buffer is less than the chunk 1503 * 3. 
User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1504 iov_len = req->iov[0].iov_len; 1505 req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 || 1506 req->iov[0].iov_len < vol->params.chunk_size || 1507 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len)); 1508 if (req->copy_after_decompress) { 1509 req->decomp_iov[0].iov_base = req->decomp_buf; 1510 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1511 req->decomp_iovcnt = 1; 1512 goto decompress; 1513 } 1514 1515 if (chunk_offset) { 1516 /* first iov point to our scratch buffer for any offset into the chunk */ 1517 req->decomp_iov[0].iov_base = req->decomp_buf; 1518 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1519 ttl_len += req->decomp_iov[0].iov_len; 1520 req->decomp_iovcnt = 1; 1521 } 1522 1523 /* now the user data iov, direct to the user buffer */ 1524 for (i = 0; i < req->iovcnt; i++) { 1525 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1526 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1527 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1528 } 1529 req->decomp_iovcnt += req->iovcnt; 1530 1531 /* send the rest of the chunk to our scratch buffer */ 1532 remainder = vol->params.chunk_size - ttl_len; 1533 if (remainder) { 1534 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1535 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1536 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1537 req->decomp_iovcnt++; 1538 } 1539 assert(ttl_len == vol->params.chunk_size); 1540 1541 decompress: 1542 assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1)); 1543 req->backing_cb_args.cb_fn = next_fn; 1544 req->backing_cb_args.cb_arg = req; 1545 req->comp_buf_iov[0].iov_base = req->comp_buf; 1546 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1547 vol->backing_dev->decompress(vol->backing_dev, 1548 req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt, 1549 &req->backing_cb_args); 1550 } 1551 1552 static inline void 1553 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings) 1554 { 1555 struct spdk_reduce_vol *vol = req->vol; 1556 char *padding_buffer = zero_paddings ? 
g_zero_buf : req->decomp_buf; 1557 uint64_t chunk_offset, ttl_len = 0; 1558 uint64_t remainder = 0; 1559 char *copy_offset = NULL; 1560 uint32_t lbsize = vol->params.logical_block_size; 1561 int i; 1562 1563 req->decomp_iov[0].iov_base = req->decomp_buf; 1564 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1565 req->decomp_iovcnt = 1; 1566 copy_offset = req->decomp_iov[0].iov_base; 1567 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1568 1569 if (chunk_offset) { 1570 ttl_len += chunk_offset * lbsize; 1571 /* copy_offset already points to padding buffer if zero_paddings=false */ 1572 if (zero_paddings) { 1573 memcpy(copy_offset, padding_buffer, ttl_len); 1574 } 1575 copy_offset += ttl_len; 1576 } 1577 1578 /* now the user data iov, direct from the user buffer */ 1579 for (i = 0; i < req->iovcnt; i++) { 1580 memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len); 1581 copy_offset += req->iov[i].iov_len; 1582 ttl_len += req->iov[i].iov_len; 1583 } 1584 1585 remainder = vol->params.chunk_size - ttl_len; 1586 if (remainder) { 1587 /* copy_offset already points to padding buffer if zero_paddings=false */ 1588 if (zero_paddings) { 1589 memcpy(copy_offset, padding_buffer + ttl_len, remainder); 1590 } 1591 ttl_len += remainder; 1592 } 1593 1594 assert(ttl_len == req->vol->params.chunk_size); 1595 } 1596 1597 /* This function can be called when we are compressing a new data or in case of read-modify-write 1598 * In the first case possible paddings should be filled with zeroes, in the second case the paddings 1599 * should point to already read and decompressed buffer */ 1600 static inline void 1601 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings) 1602 { 1603 struct spdk_reduce_vol *vol = req->vol; 1604 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1605 uint64_t chunk_offset, ttl_len = 0; 1606 uint64_t remainder = 0; 1607 uint32_t lbsize = vol->params.logical_block_size; 1608 size_t iov_len; 1609 int i; 1610 1611 /* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf 1612 * if at least one of the conditions below is true: 1613 * 1. User's buffer is fragmented 1614 * 2. Length of the user's buffer is less than the chunk 1615 * 3. 
User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1616 iov_len = req->iov[0].iov_len; 1617 if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 || 1618 req->iov[0].iov_len < vol->params.chunk_size || 1619 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) { 1620 _prepare_compress_chunk_copy_user_buffers(req, zero_paddings); 1621 return; 1622 } 1623 1624 req->decomp_iovcnt = 0; 1625 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1626 1627 if (chunk_offset != 0) { 1628 ttl_len += chunk_offset * lbsize; 1629 req->decomp_iov[0].iov_base = padding_buffer; 1630 req->decomp_iov[0].iov_len = ttl_len; 1631 req->decomp_iovcnt = 1; 1632 } 1633 1634 /* now the user data iov, direct from the user buffer */ 1635 for (i = 0; i < req->iovcnt; i++) { 1636 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1637 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1638 ttl_len += req->iov[i].iov_len; 1639 } 1640 req->decomp_iovcnt += req->iovcnt; 1641 1642 remainder = vol->params.chunk_size - ttl_len; 1643 if (remainder) { 1644 req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len; 1645 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1646 req->decomp_iovcnt++; 1647 ttl_len += remainder; 1648 } 1649 assert(ttl_len == req->vol->params.chunk_size); 1650 } 1651 1652 static void 1653 _write_decompress_done(void *_req, int reduce_errno) 1654 { 1655 struct spdk_reduce_vol_request *req = _req; 1656 1657 /* Negative reduce_errno indicates failure for compression operations. */ 1658 if (reduce_errno < 0) { 1659 _reduce_vol_complete_req(req, reduce_errno); 1660 return; 1661 } 1662 1663 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1664 * represents the output_size. 1665 */ 1666 if (req->backing_cb_args.output_size != req->vol->params.chunk_size) { 1667 _reduce_vol_complete_req(req, -EIO); 1668 return; 1669 } 1670 1671 _prepare_compress_chunk(req, false); 1672 _reduce_vol_compress_chunk(req, _write_compress_done); 1673 } 1674 1675 static void 1676 _write_read_done(void *_req, int reduce_errno) 1677 { 1678 struct spdk_reduce_vol_request *req = _req; 1679 1680 if (reduce_errno != 0) { 1681 req->reduce_errno = reduce_errno; 1682 } 1683 1684 assert(req->num_backing_ops > 0); 1685 if (--req->num_backing_ops > 0) { 1686 return; 1687 } 1688 1689 if (req->reduce_errno != 0) { 1690 _reduce_vol_complete_req(req, req->reduce_errno); 1691 return; 1692 } 1693 1694 if (req->chunk_is_compressed) { 1695 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); 1696 } else { 1697 req->backing_cb_args.output_size = req->chunk->compressed_size; 1698 1699 _write_decompress_done(req, 0); 1700 } 1701 } 1702 1703 static void 1704 _read_decompress_done(void *_req, int reduce_errno) 1705 { 1706 struct spdk_reduce_vol_request *req = _req; 1707 struct spdk_reduce_vol *vol = req->vol; 1708 1709 /* Negative reduce_errno indicates failure for compression operations. */ 1710 if (reduce_errno < 0) { 1711 _reduce_vol_complete_req(req, reduce_errno); 1712 return; 1713 } 1714 1715 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1716 * represents the output_size. 
static void
_write_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for decompression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates that the output_size field in backing_cb_args
	 * holds the size of the decompressed data.
	 */
	if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	_prepare_compress_chunk(req, false);
	_reduce_vol_compress_chunk(req, _write_compress_done);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
	} else {
		req->backing_cb_args.output_size = req->chunk->compressed_size;

		_write_decompress_done(req, 0);
	}
}

static void
_read_decompress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;

	/* Negative reduce_errno indicates failure for decompression operations. */
	if (reduce_errno < 0) {
		_reduce_vol_complete_req(req, reduce_errno);
		return;
	}

	/* Positive reduce_errno indicates that the output_size field in backing_cb_args
	 * holds the size of the decompressed data.
	 */
	if (req->backing_cb_args.output_size != vol->params.chunk_size) {
		_reduce_vol_complete_req(req, -EIO);
		return;
	}

	if (req->copy_after_decompress) {
		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
		int i;

		for (i = 0; i < req->iovcnt; i++) {
			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
			decomp_buffer += req->iov[i].iov_len;
			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
		}
	}

	_reduce_vol_complete_req(req, 0);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	if (req->chunk_is_compressed) {
		_reduce_vol_decompress_chunk(req, _read_decompress_done);
	} else {
		/* If the chunk had been compressed, the decompression operation would have
		 * delivered the data to the host buffers; since it was not, we need to memcpy here.
		 */
		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
		for (i = 0; i < req->iovcnt; i++) {
			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
			buf += req->iov[i].iov_len;
		}

		req->backing_cb_args.output_size = req->chunk->compressed_size;

		_read_decompress_done(req, 0);
	}
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	if (iovcnt > REDUCE_MAX_IOVECS) {
		return false;
	}

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request req;

	req.logical_map_index = logical_map_index;

	return (NULL != RB_FIND(executing_req_tree, &vol->executing_requests, &req));
}

static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
	_reduce_vol_read_chunk(req, _read_read_done);
}
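/*
 * Sketch of the read path wired up above: _start_readv_request() inserts the request in
 * the executing tree and calls _reduce_vol_read_chunk(), which issues the backing-device
 * reads; _read_read_done() runs once per completed backing I/O, and the final completion
 * either decompresses the chunk toward the caller's iovs (_read_decompress_done) or
 * memcpys the uncompressed data out of the scratch buffer.
 */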
void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}
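/*
 * The write path below distinguishes two cases.  Using the same hypothetical 16 KiB
 * chunk with 4 KiB logical blocks: a full-chunk write, or any write to a chunk with no
 * logical-map entry yet, is compressed directly with zero padding.  A smaller write to
 * an already-populated chunk (length * logical_block_size < chunk_size) becomes a
 * read-modify-write: the existing chunk is read and decompressed first, then the new
 * data is merged over it before recompression.
 */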
static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;

	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
			/* Read old chunk, then overwrite with data from this write
			 * operation.
			 */
			req->rmw = true;
			_reduce_vol_read_chunk(req, _write_read_done);
			return;
		}
	}

	req->rmw = false;

	_prepare_compress_chunk(req, true);
	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

static void
_start_unmap_request_full_chunk(void *ctx)
{
	struct spdk_reduce_vol_request *req = ctx;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_map_index;

	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);

	chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		_reduce_vol_reset_chunk(vol, chunk_map_index);
		req->chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
		_reduce_persist(vol, req->chunk,
				_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
		vol->pm_logical_map[req->logical_map_index] = REDUCE_EMPTY_MAP_ENTRY;
		_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_unmap_full_chunk(struct spdk_reduce_vol *vol,
			     uint64_t offset, uint64_t length,
			     spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_UNMAP;
	req->vol = vol;
	req->iov = NULL;
	req->iovcnt = 0;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	if (!overlapped) {
		_start_unmap_request_full_chunk(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}
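/*
 * Partial-chunk unmaps are not tracked in the logical map.  The helpers below turn them
 * into an ordinary spdk_reduce_vol_writev() of zeroes sourced from g_zero_buf, so the
 * affected blocks read back as zero while the chunk itself stays allocated.
 */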
struct unmap_partial_chunk_ctx {
	struct spdk_reduce_vol *vol;
	struct iovec iov;
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
};

static void
_reduce_unmap_partial_chunk_complete(void *_ctx, int reduce_errno)
{
	struct unmap_partial_chunk_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, reduce_errno);
	free(ctx);
}

static void
_reduce_vol_unmap_partial_chunk(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length,
				spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct unmap_partial_chunk_ctx *ctx;

	ctx = calloc(1, sizeof(struct unmap_partial_chunk_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->vol = vol;
	ctx->iov.iov_base = g_zero_buf;
	ctx->iov.iov_len = length * vol->params.logical_block_size;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_reduce_vol_writev(vol, &ctx->iov, 1, offset, length, _reduce_unmap_partial_chunk_complete,
			       ctx);
}

void
spdk_reduce_vol_unmap(struct spdk_reduce_vol *vol,
		      uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (length < vol->logical_blocks_per_chunk) {
		_reduce_vol_unmap_partial_chunk(vol, offset, length, cb_fn, cb_arg);
	} else if (length == vol->logical_blocks_per_chunk) {
		_reduce_vol_unmap_full_chunk(vol, offset, length, cb_fn, cb_arg);
	} else {
		cb_fn(cb_arg, -EINVAL);
	}
}
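/*
 * A sketch of how a caller might drive the unmap entry point above, assuming a
 * hypothetical volume with 4 KiB logical blocks and a 16 KiB chunk
 * (logical_blocks_per_chunk == 4); offset and length are in logical blocks:
 *
 *   spdk_reduce_vol_unmap(vol, 0, 4, cb_fn, cb_arg);  // whole chunk: logical map entry cleared
 *   spdk_reduce_vol_unmap(vol, 4, 2, cb_fn, cb_arg);  // partial chunk: rewritten as zeroes
 *   spdk_reduce_vol_unmap(vol, 8, 8, cb_fn, cb_arg);  // larger than a chunk: completes with -EINVAL
 */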
const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

const char *
spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
{
	return vol->pm_file.path;
}

void
spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT(reduce)