1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "queue_internal.h" 10 11 #include "spdk/reduce.h" 12 #include "spdk/env.h" 13 #include "spdk/string.h" 14 #include "spdk/bit_array.h" 15 #include "spdk/util.h" 16 #include "spdk/log.h" 17 #include "spdk/memory.h" 18 #include "spdk/tree.h" 19 20 #include "libpmem.h" 21 22 /* Always round up the size of the PM region to the nearest cacheline. */ 23 #define REDUCE_PM_SIZE_ALIGNMENT 64 24 25 /* Offset into the backing device where the persistent memory file's path is stored. */ 26 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 27 28 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 29 30 #define REDUCE_NUM_VOL_REQUESTS 256 31 32 /* Structure written to offset 0 of both the pm file and the backing device. */ 33 struct spdk_reduce_vol_superblock { 34 uint8_t signature[8]; 35 struct spdk_reduce_vol_params params; 36 uint8_t reserved[4040]; 37 }; 38 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 39 40 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 41 /* null terminator counts one */ 42 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 43 SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect"); 44 45 #define REDUCE_PATH_MAX 4096 46 47 #define REDUCE_ZERO_BUF_SIZE 0x100000 48 49 /** 50 * Describes a persistent memory file used to hold metadata associated with a 51 * compressed volume. 52 */ 53 struct spdk_reduce_pm_file { 54 char path[REDUCE_PATH_MAX]; 55 void *pm_buf; 56 int pm_is_pmem; 57 uint64_t size; 58 }; 59 60 #define REDUCE_IO_READV 1 61 #define REDUCE_IO_WRITEV 2 62 #define REDUCE_IO_UNMAP 3 63 64 struct spdk_reduce_chunk_map { 65 uint32_t compressed_size; 66 uint32_t reserved; 67 uint64_t io_unit_index[0]; 68 }; 69 70 struct spdk_reduce_vol_request { 71 /** 72 * Scratch buffer used for uncompressed chunk. This is used for: 73 * 1) source buffer for compression operations 74 * 2) destination buffer for decompression operations 75 * 3) data buffer when writing uncompressed chunk to disk 76 * 4) data buffer when reading uncompressed chunk from disk 77 */ 78 uint8_t *decomp_buf; 79 struct iovec *decomp_buf_iov; 80 81 /** 82 * These are used to construct the iovecs that are sent to 83 * the decomp engine, they point to a mix of the scratch buffer 84 * and user buffer 85 */ 86 struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2]; 87 int decomp_iovcnt; 88 89 /** 90 * Scratch buffer used for compressed chunk. 
This is used for: 91 * 1) destination buffer for compression operations 92 * 2) source buffer for decompression operations 93 * 3) data buffer when writing compressed chunk to disk 94 * 4) data buffer when reading compressed chunk from disk 95 */ 96 uint8_t *comp_buf; 97 struct iovec *comp_buf_iov; 98 struct iovec *iov; 99 bool rmw; 100 struct spdk_reduce_vol *vol; 101 int type; 102 int reduce_errno; 103 int iovcnt; 104 int num_backing_ops; 105 uint32_t num_io_units; 106 struct spdk_reduce_backing_io *backing_io; 107 bool chunk_is_compressed; 108 bool copy_after_decompress; 109 uint64_t offset; 110 uint64_t logical_map_index; 111 uint64_t length; 112 uint64_t chunk_map_index; 113 struct spdk_reduce_chunk_map *chunk; 114 spdk_reduce_vol_op_complete cb_fn; 115 void *cb_arg; 116 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 117 RB_ENTRY(spdk_reduce_vol_request) rbnode; 118 struct spdk_reduce_vol_cb_args backing_cb_args; 119 }; 120 121 struct spdk_reduce_vol { 122 struct spdk_reduce_vol_params params; 123 struct spdk_reduce_vol_info info; 124 uint32_t backing_io_units_per_chunk; 125 uint32_t backing_lba_per_io_unit; 126 uint32_t logical_blocks_per_chunk; 127 struct spdk_reduce_pm_file pm_file; 128 struct spdk_reduce_backing_dev *backing_dev; 129 struct spdk_reduce_vol_superblock *backing_super; 130 struct spdk_reduce_vol_superblock *pm_super; 131 uint64_t *pm_logical_map; 132 uint64_t *pm_chunk_maps; 133 134 struct spdk_bit_array *allocated_chunk_maps; 135 /* The starting position when looking for a block from allocated_chunk_maps */ 136 uint64_t find_chunk_offset; 137 /* Cache free chunks to speed up lookup of free chunk. */ 138 struct reduce_queue free_chunks_queue; 139 struct spdk_bit_array *allocated_backing_io_units; 140 /* The starting position when looking for a block from allocated_backing_io_units */ 141 uint64_t find_block_offset; 142 /* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */ 143 struct reduce_queue free_backing_blocks_queue; 144 145 struct spdk_reduce_vol_request *request_mem; 146 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 147 RB_HEAD(executing_req_tree, spdk_reduce_vol_request) executing_requests; 148 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 149 150 /* Single contiguous buffer used for all request buffers for this volume. */ 151 uint8_t *buf_mem; 152 struct iovec *buf_iov_mem; 153 /* Single contiguous buffer used for backing io buffers for this volume. */ 154 uint8_t *buf_backing_io_mem; 155 }; 156 157 static void _start_readv_request(struct spdk_reduce_vol_request *req); 158 static void _start_writev_request(struct spdk_reduce_vol_request *req); 159 static uint8_t *g_zero_buf; 160 static int g_vol_count = 0; 161 162 /* 163 * Allocate extra metadata chunks and corresponding backing io units to account for 164 * outstanding IO in worst case scenario where logical map is completely allocated 165 * and no data can be compressed. We need extra chunks in this case to handle 166 * in-flight writes since reduce never writes data in place. 
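 * REDUCE_NUM_EXTRA_CHUNKS below provides that headroom; note that _get_vol_size()
 * subtracts these extra chunks when computing the usable volume size, so they never
 * show up as user-visible capacity.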
167 */ 168 #define REDUCE_NUM_EXTRA_CHUNKS 128 169 170 static void 171 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 172 { 173 if (vol->pm_file.pm_is_pmem) { 174 pmem_persist(addr, len); 175 } else { 176 pmem_msync(addr, len); 177 } 178 } 179 180 static uint64_t 181 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 182 { 183 uint64_t chunks_in_logical_map, logical_map_size; 184 185 chunks_in_logical_map = vol_size / chunk_size; 186 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 187 188 /* Round up to next cacheline. */ 189 return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * 190 REDUCE_PM_SIZE_ALIGNMENT; 191 } 192 193 static uint64_t 194 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size) 195 { 196 uint64_t num_chunks; 197 198 num_chunks = vol_size / chunk_size; 199 num_chunks += REDUCE_NUM_EXTRA_CHUNKS; 200 201 return num_chunks; 202 } 203 204 static inline uint32_t 205 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk) 206 { 207 return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk; 208 } 209 210 static uint64_t 211 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size) 212 { 213 uint64_t io_units_per_chunk, num_chunks, total_chunks_size; 214 215 num_chunks = _get_total_chunks(vol_size, chunk_size); 216 io_units_per_chunk = chunk_size / backing_io_unit_size; 217 218 total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk); 219 220 return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * 221 REDUCE_PM_SIZE_ALIGNMENT; 222 } 223 224 static struct spdk_reduce_chunk_map * 225 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 226 { 227 uintptr_t chunk_map_addr; 228 229 assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size)); 230 231 chunk_map_addr = (uintptr_t)vol->pm_chunk_maps; 232 chunk_map_addr += chunk_map_index * 233 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 234 235 return (struct spdk_reduce_chunk_map *)chunk_map_addr; 236 } 237 238 static int 239 _validate_vol_params(struct spdk_reduce_vol_params *params) 240 { 241 if (params->vol_size > 0) { 242 /** 243 * User does not pass in the vol size - it gets calculated by libreduce from 244 * values in this structure plus the size of the backing device. 245 */ 246 return -EINVAL; 247 } 248 249 if (params->chunk_size == 0 || params->backing_io_unit_size == 0 || 250 params->logical_block_size == 0) { 251 return -EINVAL; 252 } 253 254 /* Chunk size must be an even multiple of the backing io unit size. */ 255 if ((params->chunk_size % params->backing_io_unit_size) != 0) { 256 return -EINVAL; 257 } 258 259 /* Chunk size must be an even multiple of the logical block size. 
 */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT 2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_vol_cb_args backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete cb_fn;
	void *cb_arg;
	struct iovec iov[LOAD_IOV_COUNT];
	void *path;
	struct spdk_reduce_backing_io *backing_io;
};

static inline bool
_addr_crosses_huge_page(const void *addr, size_t *size)
{
	size_t _size;
	uint64_t rc;

	assert(size);

	_size = *size;
	rc = spdk_vtophys(addr, size);

	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
}

static inline int
_set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
{
	uint8_t *addr;
	size_t size_tmp = buffer_size;

	addr = *_addr;

	/* Verify that addr + buffer_size doesn't cross huge page boundary */
	if (_addr_crosses_huge_page(addr, &size_tmp)) {
		/* Memory start is aligned on 2MiB, so buffer should be located at the end of the page.
		 * Skip remaining bytes and continue from the beginning of the next page */
		addr += size_tmp;
	}

	if (addr + buffer_size > addr_range) {
		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
		return -ERANGE;
	}

	*vol_buffer = addr;
	*_addr = addr + buffer_size;

	return 0;
}

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
	uint32_t reqs_in_2mb_page, huge_pages_needed;
	uint8_t *buffer, *buffer_end;
	int i = 0;
	int rc = 0;

	/* Comp and decomp buffers must be allocated so that they do not cross physical
	 * page boundaries.
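	 * (Illustrative arithmetic: with 2 MiB huge pages and a 16 KiB chunk_size, each request
	 * needs 2 * 16 KiB = 32 KiB of scratch space, so reqs_in_2mb_page below works out to 64
	 * and the 256 requests fit in SPDK_CEIL_DIV(256, 64) = 4 huge pages.)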
Assume that the system uses default 2MiB pages and chunk_size is not 380 * necessarily power of 2 381 * Allocate 2x since we need buffers for both read/write and compress/decompress 382 * intermediate buffers. */ 383 reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2); 384 if (!reqs_in_2mb_page) { 385 return -EINVAL; 386 } 387 huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page); 388 389 vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL); 390 if (vol->buf_mem == NULL) { 391 return -ENOMEM; 392 } 393 394 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 395 if (vol->request_mem == NULL) { 396 spdk_free(vol->buf_mem); 397 vol->buf_mem = NULL; 398 return -ENOMEM; 399 } 400 401 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 402 * buffers. 403 */ 404 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 405 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 406 if (vol->buf_iov_mem == NULL) { 407 free(vol->request_mem); 408 spdk_free(vol->buf_mem); 409 vol->request_mem = NULL; 410 vol->buf_mem = NULL; 411 return -ENOMEM; 412 } 413 414 vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) + 415 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk); 416 if (vol->buf_backing_io_mem == NULL) { 417 free(vol->request_mem); 418 free(vol->buf_iov_mem); 419 spdk_free(vol->buf_mem); 420 vol->request_mem = NULL; 421 vol->buf_iov_mem = NULL; 422 vol->buf_mem = NULL; 423 return -ENOMEM; 424 } 425 426 buffer = vol->buf_mem; 427 buffer_end = buffer + VALUE_2MB * huge_pages_needed; 428 429 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 430 req = &vol->request_mem[i]; 431 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 432 req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i * 433 (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) * 434 vol->backing_io_units_per_chunk); 435 436 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 437 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 438 439 rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size); 440 if (rc) { 441 SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 442 vol->buf_mem, buffer_end); 443 break; 444 } 445 rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size); 446 if (rc) { 447 SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 448 vol->buf_mem, buffer_end); 449 break; 450 } 451 } 452 453 if (rc) { 454 free(vol->buf_backing_io_mem); 455 free(vol->buf_iov_mem); 456 free(vol->request_mem); 457 spdk_free(vol->buf_mem); 458 vol->buf_mem = NULL; 459 vol->buf_backing_io_mem = NULL; 460 vol->buf_iov_mem = NULL; 461 vol->request_mem = NULL; 462 } 463 464 return rc; 465 } 466 467 const struct spdk_reduce_vol_info * 468 spdk_reduce_vol_get_info(const struct spdk_reduce_vol *vol) 469 { 470 return &vol->info; 471 } 472 473 static void 474 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 475 { 476 if (ctx != NULL) { 477 spdk_free(ctx->path); 478 free(ctx->backing_io); 479 free(ctx); 480 } 481 482 if (vol != NULL) { 483 if (vol->pm_file.pm_buf != NULL) { 484 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 485 } 486 487 spdk_free(vol->backing_super); 488 spdk_bit_array_free(&vol->allocated_chunk_maps); 
489 spdk_bit_array_free(&vol->allocated_backing_io_units); 490 free(vol->request_mem); 491 free(vol->buf_backing_io_mem); 492 free(vol->buf_iov_mem); 493 spdk_free(vol->buf_mem); 494 free(vol); 495 } 496 } 497 498 static int 499 _alloc_zero_buff(void) 500 { 501 int rc = 0; 502 503 /* The zero buffer is shared between all volumes and just used 504 * for reads so allocate one global instance here if not already 505 * allocated when another vol init'd or loaded. 506 */ 507 if (g_vol_count++ == 0) { 508 g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE, 509 64, NULL, SPDK_ENV_LCORE_ID_ANY, 510 SPDK_MALLOC_DMA); 511 if (g_zero_buf == NULL) { 512 g_vol_count--; 513 rc = -ENOMEM; 514 } 515 } 516 return rc; 517 } 518 519 static void 520 _init_write_super_cpl(void *cb_arg, int reduce_errno) 521 { 522 struct reduce_init_load_ctx *init_ctx = cb_arg; 523 int rc = 0; 524 525 if (reduce_errno != 0) { 526 rc = reduce_errno; 527 goto err; 528 } 529 530 rc = _allocate_vol_requests(init_ctx->vol); 531 if (rc != 0) { 532 goto err; 533 } 534 535 rc = _alloc_zero_buff(); 536 if (rc != 0) { 537 goto err; 538 } 539 540 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, rc); 541 /* Only clean up the ctx - the vol has been passed to the application 542 * for use now that initialization was successful. 543 */ 544 _init_load_cleanup(NULL, init_ctx); 545 546 return; 547 err: 548 if (unlink(init_ctx->path)) { 549 SPDK_ERRLOG("%s could not be unlinked: %s\n", 550 (char *)init_ctx->path, spdk_strerror(errno)); 551 } 552 553 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 554 _init_load_cleanup(init_ctx->vol, init_ctx); 555 } 556 557 static void 558 _init_write_path_cpl(void *cb_arg, int reduce_errno) 559 { 560 struct reduce_init_load_ctx *init_ctx = cb_arg; 561 struct spdk_reduce_vol *vol = init_ctx->vol; 562 struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io; 563 564 if (reduce_errno != 0) { 565 _init_write_super_cpl(cb_arg, reduce_errno); 566 return; 567 } 568 569 init_ctx->iov[0].iov_base = vol->backing_super; 570 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 571 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 572 init_ctx->backing_cb_args.cb_arg = init_ctx; 573 574 backing_io->dev = vol->backing_dev; 575 backing_io->iov = init_ctx->iov; 576 backing_io->iovcnt = 1; 577 backing_io->lba = 0; 578 backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen; 579 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 580 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 581 582 vol->backing_dev->submit_backing_io(backing_io); 583 } 584 585 static int 586 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 587 { 588 uint64_t total_chunks, total_backing_io_units; 589 uint32_t i, num_metadata_io_units; 590 591 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 592 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 593 vol->find_chunk_offset = 0; 594 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 595 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 596 vol->find_block_offset = 0; 597 598 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 599 return -ENOMEM; 600 } 601 602 /* Set backing io unit bits associated with metadata. 
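	 * These cover the superblock written at LBA 0 and the pm file path stored at byte
	 * offset REDUCE_BACKING_DEV_PATH_OFFSET, so data chunks can never be placed on top
	 * of the on-disk metadata.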
 */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->params.backing_io_unit_size;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
		vol->info.allocated_io_units++;
	}

	return 0;
}

static int
overlap_cmp(struct spdk_reduce_vol_request *req1, struct spdk_reduce_vol_request *req2)
{
	return (req1->logical_map_index < req2->logical_map_index ? -1 : req1->logical_map_index >
		req2->logical_map_index);
}
RB_GENERATE_STATIC(executing_req_tree, spdk_reduce_vol_request, rbnode, overlap_cmp);

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	struct spdk_reduce_backing_io *backing_io;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->submit_backing_io == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	RB_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);
	queue_init(&vol->free_chunks_queue);
	queue_init(&vol->free_backing_blocks_queue);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
	if (backing_io == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}
	init_ctx->backing_io = backing_io;

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_uuid_is_null(&params->uuid)) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent of filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 * block will be written.  We wait until this is committed before writing the
	 * super block to guarantee we don't get the super block written without the
	 * path if the system crashed in the middle of a write operation.
784 */ 785 backing_io->dev = vol->backing_dev; 786 backing_io->iov = init_ctx->iov; 787 backing_io->iovcnt = 1; 788 backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen; 789 backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen; 790 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 791 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 792 793 vol->backing_dev->submit_backing_io(backing_io); 794 } 795 796 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno); 797 798 static void 799 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno) 800 { 801 struct reduce_init_load_ctx *load_ctx = cb_arg; 802 struct spdk_reduce_vol *vol = load_ctx->vol; 803 uint64_t backing_dev_size; 804 uint64_t i, num_chunks, logical_map_index; 805 struct spdk_reduce_chunk_map *chunk; 806 size_t mapped_len; 807 uint32_t j; 808 int rc; 809 810 if (reduce_errno != 0) { 811 rc = reduce_errno; 812 goto error; 813 } 814 815 rc = _alloc_zero_buff(); 816 if (rc) { 817 goto error; 818 } 819 820 if (memcmp(vol->backing_super->signature, 821 SPDK_REDUCE_SIGNATURE, 822 sizeof(vol->backing_super->signature)) != 0) { 823 /* This backing device isn't a libreduce backing device. */ 824 rc = -EILSEQ; 825 goto error; 826 } 827 828 /* If the cb_fn is destroy_load_cb, it means we are wanting to destroy this compress bdev. 829 * So don't bother getting the volume ready to use - invoke the callback immediately 830 * so destroy_load_cb can delete the metadata off of the block device and delete the 831 * persistent memory file if it exists. 832 */ 833 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 834 if (load_ctx->cb_fn == (*destroy_load_cb)) { 835 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 836 _init_load_cleanup(NULL, load_ctx); 837 return; 838 } 839 840 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 841 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 842 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 843 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 844 845 rc = _allocate_bit_arrays(vol); 846 if (rc != 0) { 847 goto error; 848 } 849 850 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 851 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 852 SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", 853 backing_dev_size); 854 rc = -EILSEQ; 855 goto error; 856 } 857 858 vol->pm_file.size = _get_pm_file_size(&vol->params); 859 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 860 &vol->pm_file.pm_is_pmem); 861 if (vol->pm_file.pm_buf == NULL) { 862 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 863 rc = -errno; 864 goto error; 865 } 866 867 if (vol->pm_file.size != mapped_len) { 868 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 869 vol->pm_file.size, mapped_len); 870 rc = -ENOMEM; 871 goto error; 872 } 873 874 rc = _allocate_vol_requests(vol); 875 if (rc != 0) { 876 goto error; 877 } 878 879 _initialize_vol_pm_pointers(vol); 880 881 num_chunks = vol->params.vol_size / vol->params.chunk_size; 882 for (i = 0; i < num_chunks; i++) { 883 logical_map_index = vol->pm_logical_map[i]; 884 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 885 continue; 886 } 887 
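		/* Rebuild the in-memory allocation state: mark this chunk map as in use and
		 * mark every backing io unit it references as allocated.
		 */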
spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 888 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 889 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 890 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 891 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 892 vol->info.allocated_io_units++; 893 } 894 } 895 } 896 897 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 898 /* Only clean up the ctx - the vol has been passed to the application 899 * for use now that volume load was successful. 900 */ 901 _init_load_cleanup(NULL, load_ctx); 902 return; 903 904 error: 905 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 906 _init_load_cleanup(vol, load_ctx); 907 } 908 909 void 910 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 911 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 912 { 913 struct spdk_reduce_vol *vol; 914 struct reduce_init_load_ctx *load_ctx; 915 struct spdk_reduce_backing_io *backing_io; 916 917 if (backing_dev->submit_backing_io == NULL) { 918 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 919 cb_fn(cb_arg, NULL, -EINVAL); 920 return; 921 } 922 923 vol = calloc(1, sizeof(*vol)); 924 if (vol == NULL) { 925 cb_fn(cb_arg, NULL, -ENOMEM); 926 return; 927 } 928 929 TAILQ_INIT(&vol->free_requests); 930 RB_INIT(&vol->executing_requests); 931 TAILQ_INIT(&vol->queued_requests); 932 queue_init(&vol->free_chunks_queue); 933 queue_init(&vol->free_backing_blocks_queue); 934 935 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, 936 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 937 if (vol->backing_super == NULL) { 938 _init_load_cleanup(vol, NULL); 939 cb_fn(cb_arg, NULL, -ENOMEM); 940 return; 941 } 942 943 vol->backing_dev = backing_dev; 944 945 load_ctx = calloc(1, sizeof(*load_ctx)); 946 if (load_ctx == NULL) { 947 _init_load_cleanup(vol, NULL); 948 cb_fn(cb_arg, NULL, -ENOMEM); 949 return; 950 } 951 952 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 953 if (backing_io == NULL) { 954 _init_load_cleanup(vol, load_ctx); 955 cb_fn(cb_arg, NULL, -ENOMEM); 956 return; 957 } 958 959 load_ctx->backing_io = backing_io; 960 961 load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, 962 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 963 if (load_ctx->path == NULL) { 964 _init_load_cleanup(vol, load_ctx); 965 cb_fn(cb_arg, NULL, -ENOMEM); 966 return; 967 } 968 969 load_ctx->vol = vol; 970 load_ctx->cb_fn = cb_fn; 971 load_ctx->cb_arg = cb_arg; 972 973 load_ctx->iov[0].iov_base = vol->backing_super; 974 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 975 load_ctx->iov[1].iov_base = load_ctx->path; 976 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 977 backing_io->dev = vol->backing_dev; 978 backing_io->iov = load_ctx->iov; 979 backing_io->iovcnt = LOAD_IOV_COUNT; 980 backing_io->lba = 0; 981 backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 982 vol->backing_dev->blocklen; 983 backing_io->backing_cb_args = &load_ctx->backing_cb_args; 984 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 985 986 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 987 load_ctx->backing_cb_args.cb_arg = load_ctx; 988 vol->backing_dev->submit_backing_io(backing_io); 989 } 990 991 void 992 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 993 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 994 { 995 if (vol == NULL) { 996 /* This indicates a programming error. 
*/ 997 assert(false); 998 cb_fn(cb_arg, -EINVAL); 999 return; 1000 } 1001 1002 if (--g_vol_count == 0) { 1003 spdk_free(g_zero_buf); 1004 } 1005 assert(g_vol_count >= 0); 1006 _init_load_cleanup(vol, NULL); 1007 cb_fn(cb_arg, 0); 1008 } 1009 1010 struct reduce_destroy_ctx { 1011 spdk_reduce_vol_op_complete cb_fn; 1012 void *cb_arg; 1013 struct spdk_reduce_vol *vol; 1014 struct spdk_reduce_vol_superblock *super; 1015 struct iovec iov; 1016 struct spdk_reduce_vol_cb_args backing_cb_args; 1017 int reduce_errno; 1018 char pm_path[REDUCE_PATH_MAX]; 1019 struct spdk_reduce_backing_io *backing_io; 1020 }; 1021 1022 static void 1023 destroy_unload_cpl(void *cb_arg, int reduce_errno) 1024 { 1025 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1026 1027 if (destroy_ctx->reduce_errno == 0) { 1028 if (unlink(destroy_ctx->pm_path)) { 1029 SPDK_ERRLOG("%s could not be unlinked: %s\n", 1030 destroy_ctx->pm_path, strerror(errno)); 1031 } 1032 } 1033 1034 /* Even if the unload somehow failed, we still pass the destroy_ctx 1035 * reduce_errno since that indicates whether or not the volume was 1036 * actually destroyed. 1037 */ 1038 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 1039 spdk_free(destroy_ctx->super); 1040 free(destroy_ctx->backing_io); 1041 free(destroy_ctx); 1042 } 1043 1044 static void 1045 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 1046 { 1047 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1048 struct spdk_reduce_vol *vol = destroy_ctx->vol; 1049 1050 destroy_ctx->reduce_errno = reduce_errno; 1051 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 1052 } 1053 1054 static void 1055 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 1056 { 1057 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1058 struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io; 1059 1060 if (reduce_errno != 0) { 1061 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 1062 spdk_free(destroy_ctx->super); 1063 free(destroy_ctx); 1064 return; 1065 } 1066 1067 destroy_ctx->vol = vol; 1068 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 1069 destroy_ctx->iov.iov_base = destroy_ctx->super; 1070 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 1071 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 1072 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 1073 1074 backing_io->dev = vol->backing_dev; 1075 backing_io->iov = &destroy_ctx->iov; 1076 backing_io->iovcnt = 1; 1077 backing_io->lba = 0; 1078 backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen; 1079 backing_io->backing_cb_args = &destroy_ctx->backing_cb_args; 1080 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1081 1082 vol->backing_dev->submit_backing_io(backing_io); 1083 } 1084 1085 void 1086 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 1087 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1088 { 1089 struct reduce_destroy_ctx *destroy_ctx; 1090 struct spdk_reduce_backing_io *backing_io; 1091 1092 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 1093 if (destroy_ctx == NULL) { 1094 cb_fn(cb_arg, -ENOMEM); 1095 return; 1096 } 1097 1098 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 1099 if (backing_io == NULL) { 1100 free(destroy_ctx); 1101 cb_fn(cb_arg, -ENOMEM); 1102 return; 1103 } 1104 1105 destroy_ctx->backing_io = backing_io; 1106 1107 destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, 1108 SPDK_ENV_LCORE_ID_ANY, 
SPDK_MALLOC_DMA); 1109 if (destroy_ctx->super == NULL) { 1110 free(destroy_ctx); 1111 free(backing_io); 1112 cb_fn(cb_arg, -ENOMEM); 1113 return; 1114 } 1115 destroy_ctx->cb_fn = cb_fn; 1116 destroy_ctx->cb_arg = cb_arg; 1117 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 1118 } 1119 1120 static bool 1121 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 1122 { 1123 uint64_t start_chunk, end_chunk; 1124 1125 start_chunk = offset / vol->logical_blocks_per_chunk; 1126 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 1127 1128 return (start_chunk != end_chunk); 1129 } 1130 1131 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 1132 static void _start_unmap_request_full_chunk(void *ctx); 1133 1134 static void 1135 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 1136 { 1137 struct spdk_reduce_vol_request *next_req; 1138 struct spdk_reduce_vol *vol = req->vol; 1139 1140 req->cb_fn(req->cb_arg, reduce_errno); 1141 RB_REMOVE(executing_req_tree, &vol->executing_requests, req); 1142 1143 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 1144 if (next_req->logical_map_index == req->logical_map_index) { 1145 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 1146 if (next_req->type == REDUCE_IO_READV) { 1147 _start_readv_request(next_req); 1148 } else if (next_req->type == REDUCE_IO_WRITEV) { 1149 _start_writev_request(next_req); 1150 } else { 1151 assert(next_req->type == REDUCE_IO_UNMAP); 1152 _start_unmap_request_full_chunk(next_req); 1153 } 1154 break; 1155 } 1156 } 1157 1158 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 1159 } 1160 1161 static void 1162 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 1163 { 1164 struct spdk_reduce_chunk_map *chunk; 1165 uint64_t index; 1166 bool success; 1167 uint32_t i; 1168 1169 chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index); 1170 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 1171 index = chunk->io_unit_index[i]; 1172 if (index == REDUCE_EMPTY_MAP_ENTRY) { 1173 break; 1174 } 1175 assert(spdk_bit_array_get(vol->allocated_backing_io_units, 1176 index) == true); 1177 spdk_bit_array_clear(vol->allocated_backing_io_units, index); 1178 vol->info.allocated_io_units--; 1179 success = queue_enqueue(&vol->free_backing_blocks_queue, index); 1180 if (!success && index < vol->find_block_offset) { 1181 vol->find_block_offset = index; 1182 } 1183 chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 1184 } 1185 success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index); 1186 if (!success && chunk_map_index < vol->find_chunk_offset) { 1187 vol->find_chunk_offset = chunk_map_index; 1188 } 1189 spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index); 1190 } 1191 1192 static void 1193 _write_write_done(void *_req, int reduce_errno) 1194 { 1195 struct spdk_reduce_vol_request *req = _req; 1196 struct spdk_reduce_vol *vol = req->vol; 1197 uint64_t old_chunk_map_index; 1198 1199 if (reduce_errno != 0) { 1200 req->reduce_errno = reduce_errno; 1201 } 1202 1203 assert(req->num_backing_ops > 0); 1204 if (--req->num_backing_ops > 0) { 1205 return; 1206 } 1207 1208 if (req->reduce_errno != 0) { 1209 _reduce_vol_reset_chunk(vol, req->chunk_map_index); 1210 _reduce_vol_complete_req(req, req->reduce_errno); 1211 return; 1212 } 1213 1214 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1215 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 1216 _reduce_vol_reset_chunk(vol, 
old_chunk_map_index); 1217 } 1218 1219 /* 1220 * We don't need to persist the clearing of the old chunk map here. The old chunk map 1221 * becomes invalid after we update the logical map, since the old chunk map will no 1222 * longer have a reference to it in the logical map. 1223 */ 1224 1225 /* Persist the new chunk map. This must be persisted before we update the logical map. */ 1226 _reduce_persist(vol, req->chunk, 1227 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk)); 1228 1229 vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; 1230 1231 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); 1232 1233 _reduce_vol_complete_req(req, 0); 1234 } 1235 1236 static struct spdk_reduce_backing_io * 1237 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index) 1238 { 1239 struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev; 1240 struct spdk_reduce_backing_io *backing_io; 1241 1242 backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io + 1243 (sizeof(*backing_io) + backing_dev->user_ctx_size) * index); 1244 1245 return backing_io; 1246 1247 } 1248 1249 struct reduce_merged_io_desc { 1250 uint64_t io_unit_index; 1251 uint32_t num_io_units; 1252 }; 1253 1254 static void 1255 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1256 reduce_request_fn next_fn, bool is_write) 1257 { 1258 struct iovec *iov; 1259 struct spdk_reduce_backing_io *backing_io; 1260 uint8_t *buf; 1261 uint32_t i; 1262 1263 if (req->chunk_is_compressed) { 1264 iov = req->comp_buf_iov; 1265 buf = req->comp_buf; 1266 } else { 1267 iov = req->decomp_buf_iov; 1268 buf = req->decomp_buf; 1269 } 1270 1271 req->num_backing_ops = req->num_io_units; 1272 req->backing_cb_args.cb_fn = next_fn; 1273 req->backing_cb_args.cb_arg = req; 1274 for (i = 0; i < req->num_io_units; i++) { 1275 backing_io = _reduce_vol_req_get_backing_io(req, i); 1276 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; 1277 iov[i].iov_len = vol->params.backing_io_unit_size; 1278 backing_io->dev = vol->backing_dev; 1279 backing_io->iov = &iov[i]; 1280 backing_io->iovcnt = 1; 1281 backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit; 1282 backing_io->lba_count = vol->backing_lba_per_io_unit; 1283 backing_io->backing_cb_args = &req->backing_cb_args; 1284 if (is_write) { 1285 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1286 } else { 1287 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1288 } 1289 vol->backing_dev->submit_backing_io(backing_io); 1290 } 1291 } 1292 1293 static void 1294 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1295 reduce_request_fn next_fn, bool is_write) 1296 { 1297 struct iovec *iov; 1298 struct spdk_reduce_backing_io *backing_io; 1299 struct reduce_merged_io_desc merged_io_desc[4]; 1300 uint8_t *buf; 1301 bool merge = false; 1302 uint32_t num_io = 0; 1303 uint32_t io_unit_counts = 0; 1304 uint32_t merged_io_idx = 0; 1305 uint32_t i; 1306 1307 /* The merged_io_desc value is defined here to contain four elements, 1308 * and the chunk size must be four times the maximum of the io unit. 1309 * if chunk size is too big, don't merge IO. 
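	 * (merged_io_desc has four entries, so merging is only attempted when
	 * backing_io_units_per_chunk is at most four; larger chunks fall back to
	 * _issue_backing_ops_without_merge() and issue one backing IO per io unit.)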
1310 */ 1311 if (vol->backing_io_units_per_chunk > 4) { 1312 _issue_backing_ops_without_merge(req, vol, next_fn, is_write); 1313 return; 1314 } 1315 1316 if (req->chunk_is_compressed) { 1317 iov = req->comp_buf_iov; 1318 buf = req->comp_buf; 1319 } else { 1320 iov = req->decomp_buf_iov; 1321 buf = req->decomp_buf; 1322 } 1323 1324 for (i = 0; i < req->num_io_units; i++) { 1325 if (!merge) { 1326 merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i]; 1327 merged_io_desc[merged_io_idx].num_io_units = 1; 1328 num_io++; 1329 } 1330 1331 if (i + 1 == req->num_io_units) { 1332 break; 1333 } 1334 1335 if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) { 1336 merged_io_desc[merged_io_idx].num_io_units += 1; 1337 merge = true; 1338 continue; 1339 } 1340 merge = false; 1341 merged_io_idx++; 1342 } 1343 1344 req->num_backing_ops = num_io; 1345 req->backing_cb_args.cb_fn = next_fn; 1346 req->backing_cb_args.cb_arg = req; 1347 for (i = 0; i < num_io; i++) { 1348 backing_io = _reduce_vol_req_get_backing_io(req, i); 1349 iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size; 1350 iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units; 1351 backing_io->dev = vol->backing_dev; 1352 backing_io->iov = &iov[i]; 1353 backing_io->iovcnt = 1; 1354 backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit; 1355 backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units; 1356 backing_io->backing_cb_args = &req->backing_cb_args; 1357 if (is_write) { 1358 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1359 } else { 1360 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1361 } 1362 vol->backing_dev->submit_backing_io(backing_io); 1363 1364 /* Collects the number of processed I/O. */ 1365 io_unit_counts += merged_io_desc[i].num_io_units; 1366 } 1367 } 1368 1369 static void 1370 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, 1371 uint32_t compressed_size) 1372 { 1373 struct spdk_reduce_vol *vol = req->vol; 1374 uint32_t i; 1375 uint64_t chunk_offset, remainder, free_index, total_len = 0; 1376 uint8_t *buf; 1377 bool success; 1378 int j; 1379 1380 success = queue_dequeue(&vol->free_chunks_queue, &free_index); 1381 if (success) { 1382 req->chunk_map_index = free_index; 1383 } else { 1384 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 1385 vol->find_chunk_offset); 1386 vol->find_chunk_offset = req->chunk_map_index + 1; 1387 } 1388 1389 /* TODO: fail if no chunk map found - but really this should not happen if we 1390 * size the number of requests similarly to number of extra chunk maps 1391 */ 1392 assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY); 1393 spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); 1394 1395 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1396 req->num_io_units = spdk_divide_round_up(compressed_size, 1397 vol->params.backing_io_unit_size); 1398 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1399 req->chunk->compressed_size = 1400 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; 1401 1402 /* if the chunk is uncompressed we need to copy the data from the host buffers. 
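	 * into the decomp_buf scratch buffer, zero-filling any part of the chunk before or
	 * after the write range (skipped for read-modify-write, where that padding already
	 * holds the old chunk data).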
*/ 1403 if (req->chunk_is_compressed == false) { 1404 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1405 buf = req->decomp_buf; 1406 total_len = chunk_offset * vol->params.logical_block_size; 1407 1408 /* zero any offset into chunk */ 1409 if (req->rmw == false && chunk_offset) { 1410 memset(buf, 0, total_len); 1411 } 1412 buf += total_len; 1413 1414 /* copy the data */ 1415 for (j = 0; j < req->iovcnt; j++) { 1416 memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); 1417 buf += req->iov[j].iov_len; 1418 total_len += req->iov[j].iov_len; 1419 } 1420 1421 /* zero any remainder */ 1422 remainder = vol->params.chunk_size - total_len; 1423 total_len += remainder; 1424 if (req->rmw == false && remainder) { 1425 memset(buf, 0, remainder); 1426 } 1427 assert(total_len == vol->params.chunk_size); 1428 } 1429 1430 for (i = 0; i < req->num_io_units; i++) { 1431 success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index); 1432 if (success) { 1433 req->chunk->io_unit_index[i] = free_index; 1434 } else { 1435 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 1436 vol->find_block_offset); 1437 vol->find_block_offset = req->chunk->io_unit_index[i] + 1; 1438 } 1439 /* TODO: fail if no backing block found - but really this should also not 1440 * happen (see comment above). 1441 */ 1442 assert(req->chunk->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY); 1443 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1444 vol->info.allocated_io_units++; 1445 } 1446 1447 _issue_backing_ops(req, vol, next_fn, true /* write */); 1448 } 1449 1450 static void 1451 _write_compress_done(void *_req, int reduce_errno) 1452 { 1453 struct spdk_reduce_vol_request *req = _req; 1454 1455 /* Negative reduce_errno indicates failure for compression operations. 1456 * Just write the uncompressed data instead. Force this to happen 1457 * by just passing the full chunk size to _reduce_vol_write_chunk. 1458 * When it sees the data couldn't be compressed, it will just write 1459 * the uncompressed buffer to disk. 
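	 * (_reduce_vol_write_chunk() detects this because num_io_units then equals
	 * backing_io_units_per_chunk, which marks the chunk as uncompressed.)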
1460 */ 1461 if (reduce_errno < 0) { 1462 req->backing_cb_args.output_size = req->vol->params.chunk_size; 1463 } 1464 1465 _reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size); 1466 } 1467 1468 static void 1469 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1470 { 1471 struct spdk_reduce_vol *vol = req->vol; 1472 1473 req->backing_cb_args.cb_fn = next_fn; 1474 req->backing_cb_args.cb_arg = req; 1475 req->comp_buf_iov[0].iov_base = req->comp_buf; 1476 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1477 vol->backing_dev->compress(vol->backing_dev, 1478 req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1, 1479 &req->backing_cb_args); 1480 } 1481 1482 static void 1483 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1484 { 1485 struct spdk_reduce_vol *vol = req->vol; 1486 1487 req->backing_cb_args.cb_fn = next_fn; 1488 req->backing_cb_args.cb_arg = req; 1489 req->comp_buf_iov[0].iov_base = req->comp_buf; 1490 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1491 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1492 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1493 vol->backing_dev->decompress(vol->backing_dev, 1494 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1495 &req->backing_cb_args); 1496 } 1497 1498 static void 1499 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1500 { 1501 struct spdk_reduce_vol *vol = req->vol; 1502 uint64_t chunk_offset, remainder = 0; 1503 uint64_t ttl_len = 0; 1504 size_t iov_len; 1505 int i; 1506 1507 req->decomp_iovcnt = 0; 1508 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1509 1510 /* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer 1511 * if at least one of the conditions below is true: 1512 * 1. User's buffer is fragmented 1513 * 2. Length of the user's buffer is less than the chunk 1514 * 3. 
User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1515 iov_len = req->iov[0].iov_len; 1516 req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 || 1517 req->iov[0].iov_len < vol->params.chunk_size || 1518 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len)); 1519 if (req->copy_after_decompress) { 1520 req->decomp_iov[0].iov_base = req->decomp_buf; 1521 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1522 req->decomp_iovcnt = 1; 1523 goto decompress; 1524 } 1525 1526 if (chunk_offset) { 1527 /* first iov point to our scratch buffer for any offset into the chunk */ 1528 req->decomp_iov[0].iov_base = req->decomp_buf; 1529 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1530 ttl_len += req->decomp_iov[0].iov_len; 1531 req->decomp_iovcnt = 1; 1532 } 1533 1534 /* now the user data iov, direct to the user buffer */ 1535 for (i = 0; i < req->iovcnt; i++) { 1536 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1537 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1538 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1539 } 1540 req->decomp_iovcnt += req->iovcnt; 1541 1542 /* send the rest of the chunk to our scratch buffer */ 1543 remainder = vol->params.chunk_size - ttl_len; 1544 if (remainder) { 1545 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1546 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1547 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1548 req->decomp_iovcnt++; 1549 } 1550 assert(ttl_len == vol->params.chunk_size); 1551 1552 decompress: 1553 assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1)); 1554 req->backing_cb_args.cb_fn = next_fn; 1555 req->backing_cb_args.cb_arg = req; 1556 req->comp_buf_iov[0].iov_base = req->comp_buf; 1557 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1558 vol->backing_dev->decompress(vol->backing_dev, 1559 req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt, 1560 &req->backing_cb_args); 1561 } 1562 1563 static inline void 1564 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings) 1565 { 1566 struct spdk_reduce_vol *vol = req->vol; 1567 uint64_t chunk_offset, ttl_len = 0; 1568 uint64_t remainder = 0; 1569 char *copy_offset = NULL; 1570 uint32_t lbsize = vol->params.logical_block_size; 1571 int i; 1572 1573 req->decomp_iov[0].iov_base = req->decomp_buf; 1574 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1575 req->decomp_iovcnt = 1; 1576 copy_offset = req->decomp_iov[0].iov_base; 1577 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1578 1579 if (chunk_offset) { 1580 ttl_len += chunk_offset * lbsize; 1581 /* copy_offset already points to the correct buffer if zero_paddings=false */ 1582 if (zero_paddings) { 1583 memset(copy_offset, 0, ttl_len); 1584 } 1585 copy_offset += ttl_len; 1586 } 1587 1588 /* now the user data iov, direct from the user buffer */ 1589 for (i = 0; i < req->iovcnt; i++) { 1590 memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len); 1591 copy_offset += req->iov[i].iov_len; 1592 ttl_len += req->iov[i].iov_len; 1593 } 1594 1595 remainder = vol->params.chunk_size - ttl_len; 1596 if (remainder) { 1597 /* copy_offset already points to the correct buffer if zero_paddings=false */ 1598 if (zero_paddings) { 1599 memset(copy_offset, 0, remainder); 1600 } 1601 ttl_len += remainder; 1602 } 1603 1604 assert(ttl_len == 
req->vol->params.chunk_size); 1605 } 1606 1607 /* This function can be called when we are compressing a new data or in case of read-modify-write 1608 * In the first case possible paddings should be filled with zeroes, in the second case the paddings 1609 * should point to already read and decompressed buffer */ 1610 static inline void 1611 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings) 1612 { 1613 struct spdk_reduce_vol *vol = req->vol; 1614 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1615 uint64_t chunk_offset, ttl_len = 0; 1616 uint64_t remainder = 0; 1617 uint32_t lbsize = vol->params.logical_block_size; 1618 size_t iov_len; 1619 int i; 1620 1621 /* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf 1622 * if at least one of the conditions below is true: 1623 * 1. User's buffer is fragmented 1624 * 2. Length of the user's buffer is less than the chunk 1625 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1626 iov_len = req->iov[0].iov_len; 1627 if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 || 1628 req->iov[0].iov_len < vol->params.chunk_size || 1629 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) { 1630 _prepare_compress_chunk_copy_user_buffers(req, zero_paddings); 1631 return; 1632 } 1633 1634 req->decomp_iovcnt = 0; 1635 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1636 1637 if (chunk_offset != 0) { 1638 ttl_len += chunk_offset * lbsize; 1639 req->decomp_iov[0].iov_base = padding_buffer; 1640 req->decomp_iov[0].iov_len = ttl_len; 1641 req->decomp_iovcnt = 1; 1642 } 1643 1644 /* now the user data iov, direct from the user buffer */ 1645 for (i = 0; i < req->iovcnt; i++) { 1646 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1647 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1648 ttl_len += req->iov[i].iov_len; 1649 } 1650 req->decomp_iovcnt += req->iovcnt; 1651 1652 remainder = vol->params.chunk_size - ttl_len; 1653 if (remainder) { 1654 req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len; 1655 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1656 req->decomp_iovcnt++; 1657 ttl_len += remainder; 1658 } 1659 assert(ttl_len == req->vol->params.chunk_size); 1660 } 1661 1662 static void 1663 _write_decompress_done(void *_req, int reduce_errno) 1664 { 1665 struct spdk_reduce_vol_request *req = _req; 1666 1667 /* Negative reduce_errno indicates failure for compression operations. */ 1668 if (reduce_errno < 0) { 1669 _reduce_vol_complete_req(req, reduce_errno); 1670 return; 1671 } 1672 1673 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1674 * represents the output_size. 
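	 * A decompressed chunk must always be exactly chunk_size bytes; any other size means
	 * the chunk data and metadata disagree, so the read-modify-write is failed with -EIO.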
1675 */ 1676 if (req->backing_cb_args.output_size != req->vol->params.chunk_size) { 1677 _reduce_vol_complete_req(req, -EIO); 1678 return; 1679 } 1680 1681 _prepare_compress_chunk(req, false); 1682 _reduce_vol_compress_chunk(req, _write_compress_done); 1683 } 1684 1685 static void 1686 _write_read_done(void *_req, int reduce_errno) 1687 { 1688 struct spdk_reduce_vol_request *req = _req; 1689 1690 if (reduce_errno != 0) { 1691 req->reduce_errno = reduce_errno; 1692 } 1693 1694 assert(req->num_backing_ops > 0); 1695 if (--req->num_backing_ops > 0) { 1696 return; 1697 } 1698 1699 if (req->reduce_errno != 0) { 1700 _reduce_vol_complete_req(req, req->reduce_errno); 1701 return; 1702 } 1703 1704 if (req->chunk_is_compressed) { 1705 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); 1706 } else { 1707 req->backing_cb_args.output_size = req->chunk->compressed_size; 1708 1709 _write_decompress_done(req, 0); 1710 } 1711 } 1712 1713 static void 1714 _read_decompress_done(void *_req, int reduce_errno) 1715 { 1716 struct spdk_reduce_vol_request *req = _req; 1717 struct spdk_reduce_vol *vol = req->vol; 1718 1719 /* Negative reduce_errno indicates failure for compression operations. */ 1720 if (reduce_errno < 0) { 1721 _reduce_vol_complete_req(req, reduce_errno); 1722 return; 1723 } 1724 1725 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1726 * represents the output_size. 1727 */ 1728 if (req->backing_cb_args.output_size != vol->params.chunk_size) { 1729 _reduce_vol_complete_req(req, -EIO); 1730 return; 1731 } 1732 1733 if (req->copy_after_decompress) { 1734 uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1735 char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size; 1736 int i; 1737 1738 for (i = 0; i < req->iovcnt; i++) { 1739 memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len); 1740 decomp_buffer += req->iov[i].iov_len; 1741 assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size); 1742 } 1743 } 1744 1745 _reduce_vol_complete_req(req, 0); 1746 } 1747 1748 static void 1749 _read_read_done(void *_req, int reduce_errno) 1750 { 1751 struct spdk_reduce_vol_request *req = _req; 1752 1753 if (reduce_errno != 0) { 1754 req->reduce_errno = reduce_errno; 1755 } 1756 1757 assert(req->num_backing_ops > 0); 1758 if (--req->num_backing_ops > 0) { 1759 return; 1760 } 1761 1762 if (req->reduce_errno != 0) { 1763 _reduce_vol_complete_req(req, req->reduce_errno); 1764 return; 1765 } 1766 1767 if (req->chunk_is_compressed) { 1768 _reduce_vol_decompress_chunk(req, _read_decompress_done); 1769 } else { 1770 1771 /* If the chunk was compressed, the data would have been sent to the 1772 * host buffers by the decompression operation, if not we need to memcpy 1773 * from req->decomp_buf. 
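	 * (The uncompressed chunk was read directly into decomp_buf, so set
	 * copy_after_decompress and reuse the decompress completion path below.)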
static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);

	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	if (iovcnt > REDUCE_MAX_IOVECS) {
		return false;
	}

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

static bool
_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
{
	struct spdk_reduce_vol_request req;

	req.logical_map_index = logical_map_index;

	return (NULL != RB_FIND(executing_req_tree, &vol->executing_requests, &req));
}

static void
_start_readv_request(struct spdk_reduce_vol_request *req)
{
	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated, so treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_READV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;
	req->reduce_errno = 0;

	if (!overlapped) {
		_start_readv_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}
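/*
 * Minimal caller sketch (illustrative only; names other than the API below are
 * hypothetical). offset and length are expressed in logical blocks, the request
 * must not cross a chunk boundary, and the iovec lengths must add up to
 * length * logical_block_size:
 *
 *	static void
 *	example_read_done(void *cb_arg, int reduce_errno)
 *	{
 *		// reduce_errno is 0 on success, a negative errno value on failure
 *	}
 *
 *	// read 2 logical blocks starting at logical block 4
 *	struct iovec iov = {
 *		.iov_base = buf,
 *		.iov_len = 2 * vol->params.logical_block_size,
 *	};
 *	spdk_reduce_vol_readv(vol, &iov, 1, 4, 2, example_read_done, NULL);
 */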
static void
_start_writev_request(struct spdk_reduce_vol_request *req)
{
	struct spdk_reduce_vol *vol = req->vol;

	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
			/* Read the old chunk, then overwrite it with the data from this
			 * write operation.
			 */
			req->rmw = true;
			_reduce_vol_read_chunk(req, _write_read_done);
			return;
		}
	}

	req->rmw = false;

	_prepare_compress_chunk(req, true);
	_reduce_vol_compress_chunk(req, _write_compress_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_WRITEV;
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;
	req->reduce_errno = 0;

	if (!overlapped) {
		_start_writev_request(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}
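/*
 * Illustrative write-path summary (hypothetical sizes, not a required configuration):
 * with chunk_size = 16 KiB and logical_block_size = 4 KiB, logical_blocks_per_chunk = 4.
 *
 *	spdk_reduce_vol_writev(vol, &iov, 1, 0, 4, cb, arg);
 *	    full-chunk write: rmw = false, the data is compressed and written directly
 *
 *	spdk_reduce_vol_writev(vol, &iov, 1, 0, 2, cb, arg);
 *	    sub-chunk write to an already allocated chunk: rmw = true, the old chunk is
 *	    read and decompressed (_write_read_done/_write_decompress_done), merged with
 *	    the user data via _prepare_compress_chunk(req, false), then recompressed
 *
 * A sub-chunk write to an unallocated chunk skips the read and pads with zeroes
 * (_prepare_compress_chunk(req, true)).
 */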
static void
_start_unmap_request_full_chunk(void *ctx)
{
	struct spdk_reduce_vol_request *req = ctx;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_map_index;

	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);

	chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		_reduce_vol_reset_chunk(vol, chunk_map_index);
		vol->pm_logical_map[req->logical_map_index] = REDUCE_EMPTY_MAP_ENTRY;
		_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
	}

	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_unmap_full_chunk(struct spdk_reduce_vol *vol,
			     uint64_t offset, uint64_t length,
			     spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t logical_map_index;
	bool overlapped;

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	logical_map_index = offset / vol->logical_blocks_per_chunk;
	overlapped = _check_overlap(vol, logical_map_index);

	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated. Nothing needs to be done.
		 */
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->type = REDUCE_IO_UNMAP;
	req->vol = vol;
	req->iov = NULL;
	req->iovcnt = 0;
	req->offset = offset;
	req->logical_map_index = logical_map_index;
	req->length = length;
	req->copy_after_decompress = false;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;
	req->reduce_errno = 0;

	if (!overlapped) {
		_start_unmap_request_full_chunk(req);
	} else {
		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
	}
}

struct unmap_partial_chunk_ctx {
	struct spdk_reduce_vol *vol;
	struct iovec iov;
	spdk_reduce_vol_op_complete cb_fn;
	void *cb_arg;
};

static void
_reduce_unmap_partial_chunk_complete(void *_ctx, int reduce_errno)
{
	struct unmap_partial_chunk_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, reduce_errno);
	free(ctx);
}

static void
_reduce_vol_unmap_partial_chunk(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length,
				spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct unmap_partial_chunk_ctx *ctx;

	ctx = calloc(1, sizeof(struct unmap_partial_chunk_ctx));
	if (ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	ctx->vol = vol;
	ctx->iov.iov_base = g_zero_buf;
	ctx->iov.iov_len = length * vol->params.logical_block_size;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	spdk_reduce_vol_writev(vol, &ctx->iov, 1, offset, length, _reduce_unmap_partial_chunk_complete,
			       ctx);
}

void
spdk_reduce_vol_unmap(struct spdk_reduce_vol *vol,
		      uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (length < vol->logical_blocks_per_chunk) {
		_reduce_vol_unmap_partial_chunk(vol, offset, length, cb_fn, cb_arg);
	} else if (length == vol->logical_blocks_per_chunk) {
		_reduce_vol_unmap_full_chunk(vol, offset, length, cb_fn, cb_arg);
	} else {
		cb_fn(cb_arg, -EINVAL);
	}
}

const struct spdk_reduce_vol_params *
spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
{
	return &vol->params;
}

const char *
spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
{
	return vol->pm_file.path;
}
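/*
 * Illustrative unmap semantics (hypothetical sizes: chunk_size = 16 KiB,
 * logical_block_size = 4 KiB, so logical_blocks_per_chunk = 4). offset must not
 * make the request span a chunk boundary:
 *
 *	spdk_reduce_vol_unmap(vol, 0, 4, cb, arg);   // whole chunk: the chunk map is
 *	                                             // released and the logical map entry
 *	                                             // is reset to REDUCE_EMPTY_MAP_ENTRY
 *	spdk_reduce_vol_unmap(vol, 0, 2, cb, arg);   // partial chunk: implemented as a
 *	                                             // writev of zeroes from g_zero_buf
 *	spdk_reduce_vol_unmap(vol, 0, 8, cb, arg);   // longer than one chunk: -EINVAL
 */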
void
spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
	uint32_t struct_size;
	uint64_t chunk_map_size;

	SPDK_NOTICELOG("vol info:\n");
	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
		       vol->params.vol_size / vol->params.chunk_size);
	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);

	SPDK_NOTICELOG("pmem info:\n");
	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
			   vol->params.chunk_size);
	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
			 vol->params.backing_io_unit_size);
	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
}

SPDK_LOG_REGISTER_COMPONENT(reduce)