1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2018 Intel Corporation. 3 * All rights reserved. 4 * Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 5 */ 6 7 #include "spdk/stdinc.h" 8 9 #include "queue_internal.h" 10 11 #include "spdk/reduce.h" 12 #include "spdk/env.h" 13 #include "spdk/string.h" 14 #include "spdk/bit_array.h" 15 #include "spdk/util.h" 16 #include "spdk/log.h" 17 #include "spdk/memory.h" 18 #include "spdk/tree.h" 19 20 #include "libpmem.h" 21 22 /* Always round up the size of the PM region to the nearest cacheline. */ 23 #define REDUCE_PM_SIZE_ALIGNMENT 64 24 25 /* Offset into the backing device where the persistent memory file's path is stored. */ 26 #define REDUCE_BACKING_DEV_PATH_OFFSET 4096 27 28 #define REDUCE_EMPTY_MAP_ENTRY -1ULL 29 30 #define REDUCE_NUM_VOL_REQUESTS 256 31 32 /* Structure written to offset 0 of both the pm file and the backing device. */ 33 struct spdk_reduce_vol_superblock { 34 uint8_t signature[8]; 35 struct spdk_reduce_vol_params params; 36 uint8_t reserved[4040]; 37 }; 38 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); 39 40 #define SPDK_REDUCE_SIGNATURE "SPDKREDU" 41 /* null terminator counts one */ 42 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == 43 SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect"); 44 45 #define REDUCE_PATH_MAX 4096 46 47 #define REDUCE_ZERO_BUF_SIZE 0x100000 48 49 /** 50 * Describes a persistent memory file used to hold metadata associated with a 51 * compressed volume. 52 */ 53 struct spdk_reduce_pm_file { 54 char path[REDUCE_PATH_MAX]; 55 void *pm_buf; 56 int pm_is_pmem; 57 uint64_t size; 58 }; 59 60 #define REDUCE_IO_READV 1 61 #define REDUCE_IO_WRITEV 2 62 63 struct spdk_reduce_chunk_map { 64 uint32_t compressed_size; 65 uint32_t reserved; 66 uint64_t io_unit_index[0]; 67 }; 68 69 struct spdk_reduce_vol_request { 70 /** 71 * Scratch buffer used for uncompressed chunk. This is used for: 72 * 1) source buffer for compression operations 73 * 2) destination buffer for decompression operations 74 * 3) data buffer when writing uncompressed chunk to disk 75 * 4) data buffer when reading uncompressed chunk from disk 76 */ 77 uint8_t *decomp_buf; 78 struct iovec *decomp_buf_iov; 79 80 /** 81 * These are used to construct the iovecs that are sent to 82 * the decomp engine, they point to a mix of the scratch buffer 83 * and user buffer 84 */ 85 struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2]; 86 int decomp_iovcnt; 87 88 /** 89 * Scratch buffer used for compressed chunk. This is used for: 90 * 1) destination buffer for compression operations 91 * 2) source buffer for decompression operations 92 * 3) data buffer when writing compressed chunk to disk 93 * 4) data buffer when reading compressed chunk from disk 94 */ 95 uint8_t *comp_buf; 96 struct iovec *comp_buf_iov; 97 struct iovec *iov; 98 bool rmw; 99 struct spdk_reduce_vol *vol; 100 int type; 101 int reduce_errno; 102 int iovcnt; 103 int num_backing_ops; 104 uint32_t num_io_units; 105 struct spdk_reduce_backing_io *backing_io; 106 bool chunk_is_compressed; 107 bool copy_after_decompress; 108 uint64_t offset; 109 uint64_t logical_map_index; 110 uint64_t length; 111 uint64_t chunk_map_index; 112 struct spdk_reduce_chunk_map *chunk; 113 spdk_reduce_vol_op_complete cb_fn; 114 void *cb_arg; 115 TAILQ_ENTRY(spdk_reduce_vol_request) tailq; 116 RB_ENTRY(spdk_reduce_vol_request) rbnode; 117 struct spdk_reduce_vol_cb_args backing_cb_args; 118 }; 119 120 struct spdk_reduce_vol { 121 struct spdk_reduce_vol_params params; 122 uint32_t backing_io_units_per_chunk; 123 uint32_t backing_lba_per_io_unit; 124 uint32_t logical_blocks_per_chunk; 125 struct spdk_reduce_pm_file pm_file; 126 struct spdk_reduce_backing_dev *backing_dev; 127 struct spdk_reduce_vol_superblock *backing_super; 128 struct spdk_reduce_vol_superblock *pm_super; 129 uint64_t *pm_logical_map; 130 uint64_t *pm_chunk_maps; 131 132 struct spdk_bit_array *allocated_chunk_maps; 133 /* The starting position when looking for a block from allocated_chunk_maps */ 134 uint64_t find_chunk_offset; 135 /* Cache free chunks to speed up lookup of free chunk. */ 136 struct reduce_queue free_chunks_queue; 137 struct spdk_bit_array *allocated_backing_io_units; 138 /* The starting position when looking for a block from allocated_backing_io_units */ 139 uint64_t find_block_offset; 140 /* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */ 141 struct reduce_queue free_backing_blocks_queue; 142 143 struct spdk_reduce_vol_request *request_mem; 144 TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; 145 RB_HEAD(executing_req_tree, spdk_reduce_vol_request) executing_requests; 146 TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; 147 148 /* Single contiguous buffer used for all request buffers for this volume. */ 149 uint8_t *buf_mem; 150 struct iovec *buf_iov_mem; 151 /* Single contiguous buffer used for backing io buffers for this volume. */ 152 uint8_t *buf_backing_io_mem; 153 }; 154 155 static void _start_readv_request(struct spdk_reduce_vol_request *req); 156 static void _start_writev_request(struct spdk_reduce_vol_request *req); 157 static uint8_t *g_zero_buf; 158 static int g_vol_count = 0; 159 160 /* 161 * Allocate extra metadata chunks and corresponding backing io units to account for 162 * outstanding IO in worst case scenario where logical map is completely allocated 163 * and no data can be compressed. We need extra chunks in this case to handle 164 * in-flight writes since reduce never writes data in place. 165 */ 166 #define REDUCE_NUM_EXTRA_CHUNKS 128 167 168 static void 169 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) 170 { 171 if (vol->pm_file.pm_is_pmem) { 172 pmem_persist(addr, len); 173 } else { 174 pmem_msync(addr, len); 175 } 176 } 177 178 static uint64_t 179 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) 180 { 181 uint64_t chunks_in_logical_map, logical_map_size; 182 183 chunks_in_logical_map = vol_size / chunk_size; 184 logical_map_size = chunks_in_logical_map * sizeof(uint64_t); 185 186 /* Round up to next cacheline. */ 187 return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * 188 REDUCE_PM_SIZE_ALIGNMENT; 189 } 190 191 static uint64_t 192 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size) 193 { 194 uint64_t num_chunks; 195 196 num_chunks = vol_size / chunk_size; 197 num_chunks += REDUCE_NUM_EXTRA_CHUNKS; 198 199 return num_chunks; 200 } 201 202 static inline uint32_t 203 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk) 204 { 205 return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk; 206 } 207 208 static uint64_t 209 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size) 210 { 211 uint64_t io_units_per_chunk, num_chunks, total_chunks_size; 212 213 num_chunks = _get_total_chunks(vol_size, chunk_size); 214 io_units_per_chunk = chunk_size / backing_io_unit_size; 215 216 total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk); 217 218 return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * 219 REDUCE_PM_SIZE_ALIGNMENT; 220 } 221 222 static struct spdk_reduce_chunk_map * 223 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 224 { 225 uintptr_t chunk_map_addr; 226 227 assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size)); 228 229 chunk_map_addr = (uintptr_t)vol->pm_chunk_maps; 230 chunk_map_addr += chunk_map_index * 231 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 232 233 return (struct spdk_reduce_chunk_map *)chunk_map_addr; 234 } 235 236 static int 237 _validate_vol_params(struct spdk_reduce_vol_params *params) 238 { 239 if (params->vol_size > 0) { 240 /** 241 * User does not pass in the vol size - it gets calculated by libreduce from 242 * values in this structure plus the size of the backing device. 243 */ 244 return -EINVAL; 245 } 246 247 if (params->chunk_size == 0 || params->backing_io_unit_size == 0 || 248 params->logical_block_size == 0) { 249 return -EINVAL; 250 } 251 252 /* Chunk size must be an even multiple of the backing io unit size. */ 253 if ((params->chunk_size % params->backing_io_unit_size) != 0) { 254 return -EINVAL; 255 } 256 257 /* Chunk size must be an even multiple of the logical block size. */ 258 if ((params->chunk_size % params->logical_block_size) != 0) { 259 return -1; 260 } 261 262 return 0; 263 } 264 265 static uint64_t 266 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size) 267 { 268 uint64_t num_chunks; 269 270 num_chunks = backing_dev_size / chunk_size; 271 if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) { 272 return 0; 273 } 274 275 num_chunks -= REDUCE_NUM_EXTRA_CHUNKS; 276 return num_chunks * chunk_size; 277 } 278 279 static uint64_t 280 _get_pm_file_size(struct spdk_reduce_vol_params *params) 281 { 282 uint64_t total_pm_size; 283 284 total_pm_size = sizeof(struct spdk_reduce_vol_superblock); 285 total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size); 286 total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size, 287 params->backing_io_unit_size); 288 return total_pm_size; 289 } 290 291 const struct spdk_uuid * 292 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol) 293 { 294 return &vol->params.uuid; 295 } 296 297 static void 298 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol) 299 { 300 uint64_t logical_map_size; 301 302 /* Superblock is at the beginning of the pm file. */ 303 vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; 304 305 /* Logical map immediately follows the super block. */ 306 vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); 307 308 /* Chunks maps follow the logical map. */ 309 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size); 310 vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size); 311 } 312 313 /* We need 2 iovs during load - one for the superblock, another for the path */ 314 #define LOAD_IOV_COUNT 2 315 316 struct reduce_init_load_ctx { 317 struct spdk_reduce_vol *vol; 318 struct spdk_reduce_vol_cb_args backing_cb_args; 319 spdk_reduce_vol_op_with_handle_complete cb_fn; 320 void *cb_arg; 321 struct iovec iov[LOAD_IOV_COUNT]; 322 void *path; 323 struct spdk_reduce_backing_io *backing_io; 324 }; 325 326 static inline bool 327 _addr_crosses_huge_page(const void *addr, size_t *size) 328 { 329 size_t _size; 330 uint64_t rc; 331 332 assert(size); 333 334 _size = *size; 335 rc = spdk_vtophys(addr, size); 336 337 return rc == SPDK_VTOPHYS_ERROR || _size != *size; 338 } 339 340 static inline int 341 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size) 342 { 343 uint8_t *addr; 344 size_t size_tmp = buffer_size; 345 346 addr = *_addr; 347 348 /* Verify that addr + buffer_size doesn't cross huge page boundary */ 349 if (_addr_crosses_huge_page(addr, &size_tmp)) { 350 /* Memory start is aligned on 2MiB, so buffer should be located at the end of the page. 351 * Skip remaining bytes and continue from the beginning of the next page */ 352 addr += size_tmp; 353 } 354 355 if (addr + buffer_size > addr_range) { 356 SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range); 357 return -ERANGE; 358 } 359 360 *vol_buffer = addr; 361 *_addr = addr + buffer_size; 362 363 return 0; 364 } 365 366 static int 367 _allocate_vol_requests(struct spdk_reduce_vol *vol) 368 { 369 struct spdk_reduce_vol_request *req; 370 struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev; 371 uint32_t reqs_in_2mb_page, huge_pages_needed; 372 uint8_t *buffer, *buffer_end; 373 int i = 0; 374 int rc = 0; 375 376 /* It is needed to allocate comp and decomp buffers so that they do not cross physical 377 * page boundaries. Assume that the system uses default 2MiB pages and chunk_size is not 378 * necessarily power of 2 379 * Allocate 2x since we need buffers for both read/write and compress/decompress 380 * intermediate buffers. */ 381 reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2); 382 if (!reqs_in_2mb_page) { 383 return -EINVAL; 384 } 385 huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page); 386 387 vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL); 388 if (vol->buf_mem == NULL) { 389 return -ENOMEM; 390 } 391 392 vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); 393 if (vol->request_mem == NULL) { 394 spdk_free(vol->buf_mem); 395 vol->buf_mem = NULL; 396 return -ENOMEM; 397 } 398 399 /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate 400 * buffers. 401 */ 402 vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, 403 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); 404 if (vol->buf_iov_mem == NULL) { 405 free(vol->request_mem); 406 spdk_free(vol->buf_mem); 407 vol->request_mem = NULL; 408 vol->buf_mem = NULL; 409 return -ENOMEM; 410 } 411 412 vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) + 413 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk); 414 if (vol->buf_backing_io_mem == NULL) { 415 free(vol->request_mem); 416 free(vol->buf_iov_mem); 417 spdk_free(vol->buf_mem); 418 vol->request_mem = NULL; 419 vol->buf_iov_mem = NULL; 420 vol->buf_mem = NULL; 421 return -ENOMEM; 422 } 423 424 buffer = vol->buf_mem; 425 buffer_end = buffer + VALUE_2MB * huge_pages_needed; 426 427 for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { 428 req = &vol->request_mem[i]; 429 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 430 req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i * 431 (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) * 432 vol->backing_io_units_per_chunk); 433 434 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; 435 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; 436 437 rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size); 438 if (rc) { 439 SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 440 vol->buf_mem, buffer_end); 441 break; 442 } 443 rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size); 444 if (rc) { 445 SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer, 446 vol->buf_mem, buffer_end); 447 break; 448 } 449 } 450 451 if (rc) { 452 free(vol->buf_backing_io_mem); 453 free(vol->buf_iov_mem); 454 free(vol->request_mem); 455 spdk_free(vol->buf_mem); 456 vol->buf_mem = NULL; 457 vol->buf_backing_io_mem = NULL; 458 vol->buf_iov_mem = NULL; 459 vol->request_mem = NULL; 460 } 461 462 return rc; 463 } 464 465 static void 466 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) 467 { 468 if (ctx != NULL) { 469 spdk_free(ctx->path); 470 free(ctx->backing_io); 471 free(ctx); 472 } 473 474 if (vol != NULL) { 475 if (vol->pm_file.pm_buf != NULL) { 476 pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); 477 } 478 479 spdk_free(vol->backing_super); 480 spdk_bit_array_free(&vol->allocated_chunk_maps); 481 spdk_bit_array_free(&vol->allocated_backing_io_units); 482 free(vol->request_mem); 483 free(vol->buf_backing_io_mem); 484 free(vol->buf_iov_mem); 485 spdk_free(vol->buf_mem); 486 free(vol); 487 } 488 } 489 490 static int 491 _alloc_zero_buff(void) 492 { 493 int rc = 0; 494 495 /* The zero buffer is shared between all volumes and just used 496 * for reads so allocate one global instance here if not already 497 * allocated when another vol init'd or loaded. 498 */ 499 if (g_vol_count++ == 0) { 500 g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE, 501 64, NULL, SPDK_ENV_LCORE_ID_ANY, 502 SPDK_MALLOC_DMA); 503 if (g_zero_buf == NULL) { 504 g_vol_count--; 505 rc = -ENOMEM; 506 } 507 } 508 return rc; 509 } 510 511 static void 512 _init_write_super_cpl(void *cb_arg, int reduce_errno) 513 { 514 struct reduce_init_load_ctx *init_ctx = cb_arg; 515 int rc; 516 517 rc = _allocate_vol_requests(init_ctx->vol); 518 if (rc != 0) { 519 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 520 _init_load_cleanup(init_ctx->vol, init_ctx); 521 return; 522 } 523 524 rc = _alloc_zero_buff(); 525 if (rc != 0) { 526 init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); 527 _init_load_cleanup(init_ctx->vol, init_ctx); 528 return; 529 } 530 531 init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno); 532 /* Only clean up the ctx - the vol has been passed to the application 533 * for use now that initialization was successful. 534 */ 535 _init_load_cleanup(NULL, init_ctx); 536 } 537 538 static void 539 _init_write_path_cpl(void *cb_arg, int reduce_errno) 540 { 541 struct reduce_init_load_ctx *init_ctx = cb_arg; 542 struct spdk_reduce_vol *vol = init_ctx->vol; 543 struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io; 544 545 init_ctx->iov[0].iov_base = vol->backing_super; 546 init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 547 init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; 548 init_ctx->backing_cb_args.cb_arg = init_ctx; 549 550 backing_io->dev = vol->backing_dev; 551 backing_io->iov = init_ctx->iov; 552 backing_io->iovcnt = 1; 553 backing_io->lba = 0; 554 backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen; 555 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 556 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 557 558 vol->backing_dev->submit_backing_io(backing_io); 559 } 560 561 static int 562 _allocate_bit_arrays(struct spdk_reduce_vol *vol) 563 { 564 uint64_t total_chunks, total_backing_io_units; 565 uint32_t i, num_metadata_io_units; 566 567 total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 568 vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); 569 vol->find_chunk_offset = 0; 570 total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); 571 vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); 572 vol->find_block_offset = 0; 573 574 if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { 575 return -ENOMEM; 576 } 577 578 /* Set backing io unit bits associated with metadata. */ 579 num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 580 vol->params.backing_io_unit_size; 581 for (i = 0; i < num_metadata_io_units; i++) { 582 spdk_bit_array_set(vol->allocated_backing_io_units, i); 583 } 584 585 return 0; 586 } 587 588 static int 589 overlap_cmp(struct spdk_reduce_vol_request *req1, struct spdk_reduce_vol_request *req2) 590 { 591 return (req1->logical_map_index < req2->logical_map_index ? -1 : req1->logical_map_index > 592 req2->logical_map_index); 593 } 594 RB_GENERATE_STATIC(executing_req_tree, spdk_reduce_vol_request, rbnode, overlap_cmp); 595 596 597 void 598 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, 599 struct spdk_reduce_backing_dev *backing_dev, 600 const char *pm_file_dir, 601 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 602 { 603 struct spdk_reduce_vol *vol; 604 struct reduce_init_load_ctx *init_ctx; 605 struct spdk_reduce_backing_io *backing_io; 606 uint64_t backing_dev_size; 607 size_t mapped_len; 608 int dir_len, max_dir_len, rc; 609 610 /* We need to append a path separator and the UUID to the supplied 611 * path. 612 */ 613 max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1; 614 dir_len = strnlen(pm_file_dir, max_dir_len); 615 /* Strip trailing slash if the user provided one - we will add it back 616 * later when appending the filename. 617 */ 618 if (pm_file_dir[dir_len - 1] == '/') { 619 dir_len--; 620 } 621 if (dir_len == max_dir_len) { 622 SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir); 623 cb_fn(cb_arg, NULL, -EINVAL); 624 return; 625 } 626 627 rc = _validate_vol_params(params); 628 if (rc != 0) { 629 SPDK_ERRLOG("invalid vol params\n"); 630 cb_fn(cb_arg, NULL, rc); 631 return; 632 } 633 634 backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen; 635 params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size); 636 if (params->vol_size == 0) { 637 SPDK_ERRLOG("backing device is too small\n"); 638 cb_fn(cb_arg, NULL, -EINVAL); 639 return; 640 } 641 642 if (backing_dev->submit_backing_io == NULL) { 643 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 644 cb_fn(cb_arg, NULL, -EINVAL); 645 return; 646 } 647 648 vol = calloc(1, sizeof(*vol)); 649 if (vol == NULL) { 650 cb_fn(cb_arg, NULL, -ENOMEM); 651 return; 652 } 653 654 TAILQ_INIT(&vol->free_requests); 655 RB_INIT(&vol->executing_requests); 656 TAILQ_INIT(&vol->queued_requests); 657 queue_init(&vol->free_chunks_queue); 658 queue_init(&vol->free_backing_blocks_queue); 659 660 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL, 661 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 662 if (vol->backing_super == NULL) { 663 cb_fn(cb_arg, NULL, -ENOMEM); 664 _init_load_cleanup(vol, NULL); 665 return; 666 } 667 668 init_ctx = calloc(1, sizeof(*init_ctx)); 669 if (init_ctx == NULL) { 670 cb_fn(cb_arg, NULL, -ENOMEM); 671 _init_load_cleanup(vol, NULL); 672 return; 673 } 674 675 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 676 if (backing_io == NULL) { 677 cb_fn(cb_arg, NULL, -ENOMEM); 678 _init_load_cleanup(vol, init_ctx); 679 return; 680 } 681 init_ctx->backing_io = backing_io; 682 683 init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL, 684 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 685 if (init_ctx->path == NULL) { 686 cb_fn(cb_arg, NULL, -ENOMEM); 687 _init_load_cleanup(vol, init_ctx); 688 return; 689 } 690 691 if (spdk_uuid_is_null(¶ms->uuid)) { 692 spdk_uuid_generate(¶ms->uuid); 693 } 694 695 memcpy(vol->pm_file.path, pm_file_dir, dir_len); 696 vol->pm_file.path[dir_len] = '/'; 697 spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN, 698 ¶ms->uuid); 699 vol->pm_file.size = _get_pm_file_size(params); 700 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size, 701 PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600, 702 &mapped_len, &vol->pm_file.pm_is_pmem); 703 if (vol->pm_file.pm_buf == NULL) { 704 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", 705 vol->pm_file.path, strerror(errno)); 706 cb_fn(cb_arg, NULL, -errno); 707 _init_load_cleanup(vol, init_ctx); 708 return; 709 } 710 711 if (vol->pm_file.size != mapped_len) { 712 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 713 vol->pm_file.size, mapped_len); 714 cb_fn(cb_arg, NULL, -ENOMEM); 715 _init_load_cleanup(vol, init_ctx); 716 return; 717 } 718 719 vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size; 720 vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size; 721 vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen; 722 memcpy(&vol->params, params, sizeof(*params)); 723 724 vol->backing_dev = backing_dev; 725 726 rc = _allocate_bit_arrays(vol); 727 if (rc != 0) { 728 cb_fn(cb_arg, NULL, rc); 729 _init_load_cleanup(vol, init_ctx); 730 return; 731 } 732 733 memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE, 734 sizeof(vol->backing_super->signature)); 735 memcpy(&vol->backing_super->params, params, sizeof(*params)); 736 737 _initialize_vol_pm_pointers(vol); 738 739 memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super)); 740 /* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY. 741 * Note that this writes 0xFF to not just the logical map but the chunk maps as well. 742 */ 743 memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super)); 744 _reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size); 745 746 init_ctx->vol = vol; 747 init_ctx->cb_fn = cb_fn; 748 init_ctx->cb_arg = cb_arg; 749 750 memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX); 751 init_ctx->iov[0].iov_base = init_ctx->path; 752 init_ctx->iov[0].iov_len = REDUCE_PATH_MAX; 753 init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl; 754 init_ctx->backing_cb_args.cb_arg = init_ctx; 755 /* Write path to offset 4K on backing device - just after where the super 756 * block will be written. We wait until this is committed before writing the 757 * super block to guarantee we don't get the super block written without the 758 * the path if the system crashed in the middle of a write operation. 759 */ 760 backing_io->dev = vol->backing_dev; 761 backing_io->iov = init_ctx->iov; 762 backing_io->iovcnt = 1; 763 backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen; 764 backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen; 765 backing_io->backing_cb_args = &init_ctx->backing_cb_args; 766 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 767 768 vol->backing_dev->submit_backing_io(backing_io); 769 } 770 771 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno); 772 773 static void 774 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno) 775 { 776 struct reduce_init_load_ctx *load_ctx = cb_arg; 777 struct spdk_reduce_vol *vol = load_ctx->vol; 778 uint64_t backing_dev_size; 779 uint64_t i, num_chunks, logical_map_index; 780 struct spdk_reduce_chunk_map *chunk; 781 size_t mapped_len; 782 uint32_t j; 783 int rc; 784 785 if (reduce_errno != 0) { 786 rc = reduce_errno; 787 goto error; 788 } 789 790 rc = _alloc_zero_buff(); 791 if (rc) { 792 goto error; 793 } 794 795 if (memcmp(vol->backing_super->signature, 796 SPDK_REDUCE_SIGNATURE, 797 sizeof(vol->backing_super->signature)) != 0) { 798 /* This backing device isn't a libreduce backing device. */ 799 rc = -EILSEQ; 800 goto error; 801 } 802 803 /* If the cb_fn is destroy_load_cb, it means we are wanting to destroy this compress bdev. 804 * So don't bother getting the volume ready to use - invoke the callback immediately 805 * so destroy_load_cb can delete the metadata off of the block device and delete the 806 * persistent memory file if it exists. 807 */ 808 memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); 809 if (load_ctx->cb_fn == (*destroy_load_cb)) { 810 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 811 _init_load_cleanup(NULL, load_ctx); 812 return; 813 } 814 815 memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); 816 vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; 817 vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; 818 vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; 819 820 rc = _allocate_bit_arrays(vol); 821 if (rc != 0) { 822 goto error; 823 } 824 825 backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; 826 if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { 827 SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", 828 backing_dev_size); 829 rc = -EILSEQ; 830 goto error; 831 } 832 833 vol->pm_file.size = _get_pm_file_size(&vol->params); 834 vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, 835 &vol->pm_file.pm_is_pmem); 836 if (vol->pm_file.pm_buf == NULL) { 837 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); 838 rc = -errno; 839 goto error; 840 } 841 842 if (vol->pm_file.size != mapped_len) { 843 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", 844 vol->pm_file.size, mapped_len); 845 rc = -ENOMEM; 846 goto error; 847 } 848 849 rc = _allocate_vol_requests(vol); 850 if (rc != 0) { 851 goto error; 852 } 853 854 _initialize_vol_pm_pointers(vol); 855 856 num_chunks = vol->params.vol_size / vol->params.chunk_size; 857 for (i = 0; i < num_chunks; i++) { 858 logical_map_index = vol->pm_logical_map[i]; 859 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { 860 continue; 861 } 862 spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); 863 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); 864 for (j = 0; j < vol->backing_io_units_per_chunk; j++) { 865 if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { 866 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); 867 } 868 } 869 } 870 871 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); 872 /* Only clean up the ctx - the vol has been passed to the application 873 * for use now that volume load was successful. 874 */ 875 _init_load_cleanup(NULL, load_ctx); 876 return; 877 878 error: 879 load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); 880 _init_load_cleanup(vol, load_ctx); 881 } 882 883 void 884 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, 885 spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) 886 { 887 struct spdk_reduce_vol *vol; 888 struct reduce_init_load_ctx *load_ctx; 889 struct spdk_reduce_backing_io *backing_io; 890 891 if (backing_dev->submit_backing_io == NULL) { 892 SPDK_ERRLOG("backing_dev function pointer not specified\n"); 893 cb_fn(cb_arg, NULL, -EINVAL); 894 return; 895 } 896 897 vol = calloc(1, sizeof(*vol)); 898 if (vol == NULL) { 899 cb_fn(cb_arg, NULL, -ENOMEM); 900 return; 901 } 902 903 TAILQ_INIT(&vol->free_requests); 904 RB_INIT(&vol->executing_requests); 905 TAILQ_INIT(&vol->queued_requests); 906 queue_init(&vol->free_chunks_queue); 907 queue_init(&vol->free_backing_blocks_queue); 908 909 vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, 910 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 911 if (vol->backing_super == NULL) { 912 _init_load_cleanup(vol, NULL); 913 cb_fn(cb_arg, NULL, -ENOMEM); 914 return; 915 } 916 917 vol->backing_dev = backing_dev; 918 919 load_ctx = calloc(1, sizeof(*load_ctx)); 920 if (load_ctx == NULL) { 921 _init_load_cleanup(vol, NULL); 922 cb_fn(cb_arg, NULL, -ENOMEM); 923 return; 924 } 925 926 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 927 if (backing_io == NULL) { 928 _init_load_cleanup(vol, load_ctx); 929 cb_fn(cb_arg, NULL, -ENOMEM); 930 return; 931 } 932 933 load_ctx->backing_io = backing_io; 934 935 load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, 936 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 937 if (load_ctx->path == NULL) { 938 _init_load_cleanup(vol, load_ctx); 939 cb_fn(cb_arg, NULL, -ENOMEM); 940 return; 941 } 942 943 load_ctx->vol = vol; 944 load_ctx->cb_fn = cb_fn; 945 load_ctx->cb_arg = cb_arg; 946 947 load_ctx->iov[0].iov_base = vol->backing_super; 948 load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); 949 load_ctx->iov[1].iov_base = load_ctx->path; 950 load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; 951 backing_io->dev = vol->backing_dev; 952 backing_io->iov = load_ctx->iov; 953 backing_io->iovcnt = LOAD_IOV_COUNT; 954 backing_io->lba = 0; 955 backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / 956 vol->backing_dev->blocklen; 957 backing_io->backing_cb_args = &load_ctx->backing_cb_args; 958 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 959 960 load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; 961 load_ctx->backing_cb_args.cb_arg = load_ctx; 962 vol->backing_dev->submit_backing_io(backing_io); 963 } 964 965 void 966 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, 967 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 968 { 969 if (vol == NULL) { 970 /* This indicates a programming error. */ 971 assert(false); 972 cb_fn(cb_arg, -EINVAL); 973 return; 974 } 975 976 if (--g_vol_count == 0) { 977 spdk_free(g_zero_buf); 978 } 979 assert(g_vol_count >= 0); 980 _init_load_cleanup(vol, NULL); 981 cb_fn(cb_arg, 0); 982 } 983 984 struct reduce_destroy_ctx { 985 spdk_reduce_vol_op_complete cb_fn; 986 void *cb_arg; 987 struct spdk_reduce_vol *vol; 988 struct spdk_reduce_vol_superblock *super; 989 struct iovec iov; 990 struct spdk_reduce_vol_cb_args backing_cb_args; 991 int reduce_errno; 992 char pm_path[REDUCE_PATH_MAX]; 993 struct spdk_reduce_backing_io *backing_io; 994 }; 995 996 static void 997 destroy_unload_cpl(void *cb_arg, int reduce_errno) 998 { 999 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1000 1001 if (destroy_ctx->reduce_errno == 0) { 1002 if (unlink(destroy_ctx->pm_path)) { 1003 SPDK_ERRLOG("%s could not be unlinked: %s\n", 1004 destroy_ctx->pm_path, strerror(errno)); 1005 } 1006 } 1007 1008 /* Even if the unload somehow failed, we still pass the destroy_ctx 1009 * reduce_errno since that indicates whether or not the volume was 1010 * actually destroyed. 1011 */ 1012 destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); 1013 spdk_free(destroy_ctx->super); 1014 free(destroy_ctx->backing_io); 1015 free(destroy_ctx); 1016 } 1017 1018 static void 1019 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno) 1020 { 1021 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1022 struct spdk_reduce_vol *vol = destroy_ctx->vol; 1023 1024 destroy_ctx->reduce_errno = reduce_errno; 1025 spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); 1026 } 1027 1028 static void 1029 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) 1030 { 1031 struct reduce_destroy_ctx *destroy_ctx = cb_arg; 1032 struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io; 1033 1034 if (reduce_errno != 0) { 1035 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); 1036 spdk_free(destroy_ctx->super); 1037 free(destroy_ctx); 1038 return; 1039 } 1040 1041 destroy_ctx->vol = vol; 1042 memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); 1043 destroy_ctx->iov.iov_base = destroy_ctx->super; 1044 destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); 1045 destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; 1046 destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; 1047 1048 backing_io->dev = vol->backing_dev; 1049 backing_io->iov = &destroy_ctx->iov; 1050 backing_io->iovcnt = 1; 1051 backing_io->lba = 0; 1052 backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen; 1053 backing_io->backing_cb_args = &destroy_ctx->backing_cb_args; 1054 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1055 1056 vol->backing_dev->submit_backing_io(backing_io); 1057 } 1058 1059 void 1060 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, 1061 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1062 { 1063 struct reduce_destroy_ctx *destroy_ctx; 1064 struct spdk_reduce_backing_io *backing_io; 1065 1066 destroy_ctx = calloc(1, sizeof(*destroy_ctx)); 1067 if (destroy_ctx == NULL) { 1068 cb_fn(cb_arg, -ENOMEM); 1069 return; 1070 } 1071 1072 backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size); 1073 if (backing_io == NULL) { 1074 free(destroy_ctx); 1075 cb_fn(cb_arg, -ENOMEM); 1076 return; 1077 } 1078 1079 destroy_ctx->backing_io = backing_io; 1080 1081 destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, 1082 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); 1083 if (destroy_ctx->super == NULL) { 1084 free(destroy_ctx); 1085 free(backing_io); 1086 cb_fn(cb_arg, -ENOMEM); 1087 return; 1088 } 1089 destroy_ctx->cb_fn = cb_fn; 1090 destroy_ctx->cb_arg = cb_arg; 1091 spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); 1092 } 1093 1094 static bool 1095 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) 1096 { 1097 uint64_t start_chunk, end_chunk; 1098 1099 start_chunk = offset / vol->logical_blocks_per_chunk; 1100 end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; 1101 1102 return (start_chunk != end_chunk); 1103 } 1104 1105 typedef void (*reduce_request_fn)(void *_req, int reduce_errno); 1106 1107 static void 1108 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) 1109 { 1110 struct spdk_reduce_vol_request *next_req; 1111 struct spdk_reduce_vol *vol = req->vol; 1112 1113 req->cb_fn(req->cb_arg, reduce_errno); 1114 RB_REMOVE(executing_req_tree, &vol->executing_requests, req); 1115 1116 TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { 1117 if (next_req->logical_map_index == req->logical_map_index) { 1118 TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); 1119 if (next_req->type == REDUCE_IO_READV) { 1120 _start_readv_request(next_req); 1121 } else { 1122 assert(next_req->type == REDUCE_IO_WRITEV); 1123 _start_writev_request(next_req); 1124 } 1125 break; 1126 } 1127 } 1128 1129 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); 1130 } 1131 1132 static void 1133 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) 1134 { 1135 struct spdk_reduce_chunk_map *chunk; 1136 uint64_t index; 1137 bool success; 1138 uint32_t i; 1139 1140 chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index); 1141 for (i = 0; i < vol->backing_io_units_per_chunk; i++) { 1142 index = chunk->io_unit_index[i]; 1143 if (index == REDUCE_EMPTY_MAP_ENTRY) { 1144 break; 1145 } 1146 assert(spdk_bit_array_get(vol->allocated_backing_io_units, 1147 index) == true); 1148 spdk_bit_array_clear(vol->allocated_backing_io_units, index); 1149 success = queue_enqueue(&vol->free_backing_blocks_queue, index); 1150 if (!success && index < vol->find_block_offset) { 1151 vol->find_block_offset = index; 1152 } 1153 chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; 1154 } 1155 success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index); 1156 if (!success && chunk_map_index < vol->find_chunk_offset) { 1157 vol->find_chunk_offset = chunk_map_index; 1158 } 1159 spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index); 1160 } 1161 1162 static void 1163 _write_write_done(void *_req, int reduce_errno) 1164 { 1165 struct spdk_reduce_vol_request *req = _req; 1166 struct spdk_reduce_vol *vol = req->vol; 1167 uint64_t old_chunk_map_index; 1168 1169 if (reduce_errno != 0) { 1170 req->reduce_errno = reduce_errno; 1171 } 1172 1173 assert(req->num_backing_ops > 0); 1174 if (--req->num_backing_ops > 0) { 1175 return; 1176 } 1177 1178 if (req->reduce_errno != 0) { 1179 _reduce_vol_reset_chunk(vol, req->chunk_map_index); 1180 _reduce_vol_complete_req(req, req->reduce_errno); 1181 return; 1182 } 1183 1184 old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1185 if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { 1186 _reduce_vol_reset_chunk(vol, old_chunk_map_index); 1187 } 1188 1189 /* 1190 * We don't need to persist the clearing of the old chunk map here. The old chunk map 1191 * becomes invalid after we update the logical map, since the old chunk map will no 1192 * longer have a reference to it in the logical map. 1193 */ 1194 1195 /* Persist the new chunk map. This must be persisted before we update the logical map. */ 1196 _reduce_persist(vol, req->chunk, 1197 _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk)); 1198 1199 vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; 1200 1201 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); 1202 1203 _reduce_vol_complete_req(req, 0); 1204 } 1205 1206 static struct spdk_reduce_backing_io * 1207 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index) 1208 { 1209 struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev; 1210 struct spdk_reduce_backing_io *backing_io; 1211 1212 backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io + 1213 (sizeof(*backing_io) + backing_dev->user_ctx_size) * index); 1214 1215 return backing_io; 1216 1217 } 1218 1219 struct reduce_merged_io_desc { 1220 uint64_t io_unit_index; 1221 uint32_t num_io_units; 1222 }; 1223 1224 static void 1225 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1226 reduce_request_fn next_fn, bool is_write) 1227 { 1228 struct iovec *iov; 1229 struct spdk_reduce_backing_io *backing_io; 1230 uint8_t *buf; 1231 uint32_t i; 1232 1233 if (req->chunk_is_compressed) { 1234 iov = req->comp_buf_iov; 1235 buf = req->comp_buf; 1236 } else { 1237 iov = req->decomp_buf_iov; 1238 buf = req->decomp_buf; 1239 } 1240 1241 req->num_backing_ops = req->num_io_units; 1242 req->backing_cb_args.cb_fn = next_fn; 1243 req->backing_cb_args.cb_arg = req; 1244 for (i = 0; i < req->num_io_units; i++) { 1245 backing_io = _reduce_vol_req_get_backing_io(req, i); 1246 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; 1247 iov[i].iov_len = vol->params.backing_io_unit_size; 1248 backing_io->dev = vol->backing_dev; 1249 backing_io->iov = &iov[i]; 1250 backing_io->iovcnt = 1; 1251 backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit; 1252 backing_io->lba_count = vol->backing_lba_per_io_unit; 1253 backing_io->backing_cb_args = &req->backing_cb_args; 1254 if (is_write) { 1255 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1256 } else { 1257 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1258 } 1259 vol->backing_dev->submit_backing_io(backing_io); 1260 } 1261 } 1262 1263 static void 1264 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, 1265 reduce_request_fn next_fn, bool is_write) 1266 { 1267 struct iovec *iov; 1268 struct spdk_reduce_backing_io *backing_io; 1269 struct reduce_merged_io_desc merged_io_desc[4]; 1270 uint8_t *buf; 1271 bool merge = false; 1272 uint32_t num_io = 0; 1273 uint32_t io_unit_counts = 0; 1274 uint32_t merged_io_idx = 0; 1275 uint32_t i; 1276 1277 /* The merged_io_desc value is defined here to contain four elements, 1278 * and the chunk size must be four times the maximum of the io unit. 1279 * if chunk size is too big, don't merge IO. 1280 */ 1281 if (vol->backing_io_units_per_chunk > 4) { 1282 _issue_backing_ops_without_merge(req, vol, next_fn, is_write); 1283 return; 1284 } 1285 1286 if (req->chunk_is_compressed) { 1287 iov = req->comp_buf_iov; 1288 buf = req->comp_buf; 1289 } else { 1290 iov = req->decomp_buf_iov; 1291 buf = req->decomp_buf; 1292 } 1293 1294 for (i = 0; i < req->num_io_units; i++) { 1295 if (!merge) { 1296 merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i]; 1297 merged_io_desc[merged_io_idx].num_io_units = 1; 1298 num_io++; 1299 } 1300 1301 if (i + 1 == req->num_io_units) { 1302 break; 1303 } 1304 1305 if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) { 1306 merged_io_desc[merged_io_idx].num_io_units += 1; 1307 merge = true; 1308 continue; 1309 } 1310 merge = false; 1311 merged_io_idx++; 1312 } 1313 1314 req->num_backing_ops = num_io; 1315 req->backing_cb_args.cb_fn = next_fn; 1316 req->backing_cb_args.cb_arg = req; 1317 for (i = 0; i < num_io; i++) { 1318 backing_io = _reduce_vol_req_get_backing_io(req, i); 1319 iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size; 1320 iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units; 1321 backing_io->dev = vol->backing_dev; 1322 backing_io->iov = &iov[i]; 1323 backing_io->iovcnt = 1; 1324 backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit; 1325 backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units; 1326 backing_io->backing_cb_args = &req->backing_cb_args; 1327 if (is_write) { 1328 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE; 1329 } else { 1330 backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ; 1331 } 1332 vol->backing_dev->submit_backing_io(backing_io); 1333 1334 /* Collects the number of processed I/O. */ 1335 io_unit_counts += merged_io_desc[i].num_io_units; 1336 } 1337 } 1338 1339 static void 1340 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, 1341 uint32_t compressed_size) 1342 { 1343 struct spdk_reduce_vol *vol = req->vol; 1344 uint32_t i; 1345 uint64_t chunk_offset, remainder, free_index, total_len = 0; 1346 uint8_t *buf; 1347 bool success; 1348 int j; 1349 1350 success = queue_dequeue(&vol->free_chunks_queue, &free_index); 1351 if (success) { 1352 req->chunk_map_index = free_index; 1353 } else { 1354 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 1355 vol->find_chunk_offset); 1356 vol->find_chunk_offset = req->chunk_map_index + 1; 1357 } 1358 1359 /* TODO: fail if no chunk map found - but really this should not happen if we 1360 * size the number of requests similarly to number of extra chunk maps 1361 */ 1362 assert(req->chunk_map_index != UINT32_MAX); 1363 spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); 1364 1365 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1366 req->num_io_units = spdk_divide_round_up(compressed_size, 1367 vol->params.backing_io_unit_size); 1368 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1369 req->chunk->compressed_size = 1370 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; 1371 1372 /* if the chunk is uncompressed we need to copy the data from the host buffers. */ 1373 if (req->chunk_is_compressed == false) { 1374 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1375 buf = req->decomp_buf; 1376 total_len = chunk_offset * vol->params.logical_block_size; 1377 1378 /* zero any offset into chunk */ 1379 if (req->rmw == false && chunk_offset) { 1380 memset(buf, 0, total_len); 1381 } 1382 buf += total_len; 1383 1384 /* copy the data */ 1385 for (j = 0; j < req->iovcnt; j++) { 1386 memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); 1387 buf += req->iov[j].iov_len; 1388 total_len += req->iov[j].iov_len; 1389 } 1390 1391 /* zero any remainder */ 1392 remainder = vol->params.chunk_size - total_len; 1393 total_len += remainder; 1394 if (req->rmw == false && remainder) { 1395 memset(buf, 0, remainder); 1396 } 1397 assert(total_len == vol->params.chunk_size); 1398 } 1399 1400 for (i = 0; i < req->num_io_units; i++) { 1401 success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index); 1402 if (success) { 1403 req->chunk->io_unit_index[i] = free_index; 1404 } else { 1405 req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 1406 vol->find_block_offset); 1407 vol->find_block_offset = req->chunk->io_unit_index[i] + 1; 1408 } 1409 /* TODO: fail if no backing block found - but really this should also not 1410 * happen (see comment above). 1411 */ 1412 assert(req->chunk->io_unit_index[i] != UINT32_MAX); 1413 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); 1414 } 1415 1416 _issue_backing_ops(req, vol, next_fn, true /* write */); 1417 } 1418 1419 static void 1420 _write_compress_done(void *_req, int reduce_errno) 1421 { 1422 struct spdk_reduce_vol_request *req = _req; 1423 1424 /* Negative reduce_errno indicates failure for compression operations. 1425 * Just write the uncompressed data instead. Force this to happen 1426 * by just passing the full chunk size to _reduce_vol_write_chunk. 1427 * When it sees the data couldn't be compressed, it will just write 1428 * the uncompressed buffer to disk. 1429 */ 1430 if (reduce_errno < 0) { 1431 req->backing_cb_args.output_size = req->vol->params.chunk_size; 1432 } 1433 1434 _reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size); 1435 } 1436 1437 static void 1438 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1439 { 1440 struct spdk_reduce_vol *vol = req->vol; 1441 1442 req->backing_cb_args.cb_fn = next_fn; 1443 req->backing_cb_args.cb_arg = req; 1444 req->comp_buf_iov[0].iov_base = req->comp_buf; 1445 req->comp_buf_iov[0].iov_len = vol->params.chunk_size; 1446 vol->backing_dev->compress(vol->backing_dev, 1447 req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1, 1448 &req->backing_cb_args); 1449 } 1450 1451 static void 1452 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1453 { 1454 struct spdk_reduce_vol *vol = req->vol; 1455 1456 req->backing_cb_args.cb_fn = next_fn; 1457 req->backing_cb_args.cb_arg = req; 1458 req->comp_buf_iov[0].iov_base = req->comp_buf; 1459 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1460 req->decomp_buf_iov[0].iov_base = req->decomp_buf; 1461 req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; 1462 vol->backing_dev->decompress(vol->backing_dev, 1463 req->comp_buf_iov, 1, req->decomp_buf_iov, 1, 1464 &req->backing_cb_args); 1465 } 1466 1467 static void 1468 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1469 { 1470 struct spdk_reduce_vol *vol = req->vol; 1471 uint64_t chunk_offset, remainder = 0; 1472 uint64_t ttl_len = 0; 1473 size_t iov_len; 1474 int i; 1475 1476 req->decomp_iovcnt = 0; 1477 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1478 1479 /* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer 1480 * if at least one of the conditions below is true: 1481 * 1. User's buffer is fragmented 1482 * 2. Length of the user's buffer is less than the chunk 1483 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1484 iov_len = req->iov[0].iov_len; 1485 req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 || 1486 req->iov[0].iov_len < vol->params.chunk_size || 1487 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len)); 1488 if (req->copy_after_decompress) { 1489 req->decomp_iov[0].iov_base = req->decomp_buf; 1490 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1491 req->decomp_iovcnt = 1; 1492 goto decompress; 1493 } 1494 1495 if (chunk_offset) { 1496 /* first iov point to our scratch buffer for any offset into the chunk */ 1497 req->decomp_iov[0].iov_base = req->decomp_buf; 1498 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; 1499 ttl_len += req->decomp_iov[0].iov_len; 1500 req->decomp_iovcnt = 1; 1501 } 1502 1503 /* now the user data iov, direct to the user buffer */ 1504 for (i = 0; i < req->iovcnt; i++) { 1505 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1506 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1507 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; 1508 } 1509 req->decomp_iovcnt += req->iovcnt; 1510 1511 /* send the rest of the chunk to our scratch buffer */ 1512 remainder = vol->params.chunk_size - ttl_len; 1513 if (remainder) { 1514 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; 1515 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1516 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; 1517 req->decomp_iovcnt++; 1518 } 1519 assert(ttl_len == vol->params.chunk_size); 1520 1521 decompress: 1522 assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1)); 1523 req->backing_cb_args.cb_fn = next_fn; 1524 req->backing_cb_args.cb_arg = req; 1525 req->comp_buf_iov[0].iov_base = req->comp_buf; 1526 req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; 1527 vol->backing_dev->decompress(vol->backing_dev, 1528 req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt, 1529 &req->backing_cb_args); 1530 } 1531 1532 static inline void 1533 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings) 1534 { 1535 struct spdk_reduce_vol *vol = req->vol; 1536 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1537 uint64_t chunk_offset, ttl_len = 0; 1538 uint64_t remainder = 0; 1539 char *copy_offset = NULL; 1540 uint32_t lbsize = vol->params.logical_block_size; 1541 int i; 1542 1543 req->decomp_iov[0].iov_base = req->decomp_buf; 1544 req->decomp_iov[0].iov_len = vol->params.chunk_size; 1545 req->decomp_iovcnt = 1; 1546 copy_offset = req->decomp_iov[0].iov_base; 1547 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1548 1549 if (chunk_offset) { 1550 ttl_len += chunk_offset * lbsize; 1551 /* copy_offset already points to padding buffer if zero_paddings=false */ 1552 if (zero_paddings) { 1553 memcpy(copy_offset, padding_buffer, ttl_len); 1554 } 1555 copy_offset += ttl_len; 1556 } 1557 1558 /* now the user data iov, direct from the user buffer */ 1559 for (i = 0; i < req->iovcnt; i++) { 1560 memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len); 1561 copy_offset += req->iov[i].iov_len; 1562 ttl_len += req->iov[i].iov_len; 1563 } 1564 1565 remainder = vol->params.chunk_size - ttl_len; 1566 if (remainder) { 1567 /* copy_offset already points to padding buffer if zero_paddings=false */ 1568 if (zero_paddings) { 1569 memcpy(copy_offset, padding_buffer + ttl_len, remainder); 1570 } 1571 ttl_len += remainder; 1572 } 1573 1574 assert(ttl_len == req->vol->params.chunk_size); 1575 } 1576 1577 /* This function can be called when we are compressing a new data or in case of read-modify-write 1578 * In the first case possible paddings should be filled with zeroes, in the second case the paddings 1579 * should point to already read and decompressed buffer */ 1580 static inline void 1581 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings) 1582 { 1583 struct spdk_reduce_vol *vol = req->vol; 1584 char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf; 1585 uint64_t chunk_offset, ttl_len = 0; 1586 uint64_t remainder = 0; 1587 uint32_t lbsize = vol->params.logical_block_size; 1588 size_t iov_len; 1589 int i; 1590 1591 /* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf 1592 * if at least one of the conditions below is true: 1593 * 1. User's buffer is fragmented 1594 * 2. Length of the user's buffer is less than the chunk 1595 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */ 1596 iov_len = req->iov[0].iov_len; 1597 if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 || 1598 req->iov[0].iov_len < vol->params.chunk_size || 1599 _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) { 1600 _prepare_compress_chunk_copy_user_buffers(req, zero_paddings); 1601 return; 1602 } 1603 1604 req->decomp_iovcnt = 0; 1605 chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1606 1607 if (chunk_offset != 0) { 1608 ttl_len += chunk_offset * lbsize; 1609 req->decomp_iov[0].iov_base = padding_buffer; 1610 req->decomp_iov[0].iov_len = ttl_len; 1611 req->decomp_iovcnt = 1; 1612 } 1613 1614 /* now the user data iov, direct from the user buffer */ 1615 for (i = 0; i < req->iovcnt; i++) { 1616 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; 1617 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; 1618 ttl_len += req->iov[i].iov_len; 1619 } 1620 req->decomp_iovcnt += req->iovcnt; 1621 1622 remainder = vol->params.chunk_size - ttl_len; 1623 if (remainder) { 1624 req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len; 1625 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; 1626 req->decomp_iovcnt++; 1627 ttl_len += remainder; 1628 } 1629 assert(ttl_len == req->vol->params.chunk_size); 1630 } 1631 1632 static void 1633 _write_decompress_done(void *_req, int reduce_errno) 1634 { 1635 struct spdk_reduce_vol_request *req = _req; 1636 1637 /* Negative reduce_errno indicates failure for compression operations. */ 1638 if (reduce_errno < 0) { 1639 _reduce_vol_complete_req(req, reduce_errno); 1640 return; 1641 } 1642 1643 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1644 * represents the output_size. 1645 */ 1646 if (req->backing_cb_args.output_size != req->vol->params.chunk_size) { 1647 _reduce_vol_complete_req(req, -EIO); 1648 return; 1649 } 1650 1651 _prepare_compress_chunk(req, false); 1652 _reduce_vol_compress_chunk(req, _write_compress_done); 1653 } 1654 1655 static void 1656 _write_read_done(void *_req, int reduce_errno) 1657 { 1658 struct spdk_reduce_vol_request *req = _req; 1659 1660 if (reduce_errno != 0) { 1661 req->reduce_errno = reduce_errno; 1662 } 1663 1664 assert(req->num_backing_ops > 0); 1665 if (--req->num_backing_ops > 0) { 1666 return; 1667 } 1668 1669 if (req->reduce_errno != 0) { 1670 _reduce_vol_complete_req(req, req->reduce_errno); 1671 return; 1672 } 1673 1674 if (req->chunk_is_compressed) { 1675 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); 1676 } else { 1677 req->backing_cb_args.output_size = req->chunk->compressed_size; 1678 1679 _write_decompress_done(req, 0); 1680 } 1681 } 1682 1683 static void 1684 _read_decompress_done(void *_req, int reduce_errno) 1685 { 1686 struct spdk_reduce_vol_request *req = _req; 1687 struct spdk_reduce_vol *vol = req->vol; 1688 1689 /* Negative reduce_errno indicates failure for compression operations. */ 1690 if (reduce_errno < 0) { 1691 _reduce_vol_complete_req(req, reduce_errno); 1692 return; 1693 } 1694 1695 /* Positive reduce_errno indicates that the output size field in the backing_cb_args 1696 * represents the output_size. 1697 */ 1698 if (req->backing_cb_args.output_size != vol->params.chunk_size) { 1699 _reduce_vol_complete_req(req, -EIO); 1700 return; 1701 } 1702 1703 if (req->copy_after_decompress) { 1704 uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk; 1705 char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size; 1706 int i; 1707 1708 for (i = 0; i < req->iovcnt; i++) { 1709 memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len); 1710 decomp_buffer += req->iov[i].iov_len; 1711 assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size); 1712 } 1713 } 1714 1715 _reduce_vol_complete_req(req, 0); 1716 } 1717 1718 static void 1719 _read_read_done(void *_req, int reduce_errno) 1720 { 1721 struct spdk_reduce_vol_request *req = _req; 1722 uint64_t chunk_offset; 1723 uint8_t *buf; 1724 int i; 1725 1726 if (reduce_errno != 0) { 1727 req->reduce_errno = reduce_errno; 1728 } 1729 1730 assert(req->num_backing_ops > 0); 1731 if (--req->num_backing_ops > 0) { 1732 return; 1733 } 1734 1735 if (req->reduce_errno != 0) { 1736 _reduce_vol_complete_req(req, req->reduce_errno); 1737 return; 1738 } 1739 1740 if (req->chunk_is_compressed) { 1741 _reduce_vol_decompress_chunk(req, _read_decompress_done); 1742 } else { 1743 1744 /* If the chunk was compressed, the data would have been sent to the 1745 * host buffers by the decompression operation, if not we need to memcpy here. 1746 */ 1747 chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; 1748 buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; 1749 for (i = 0; i < req->iovcnt; i++) { 1750 memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); 1751 buf += req->iov[i].iov_len; 1752 } 1753 1754 req->backing_cb_args.output_size = req->chunk->compressed_size; 1755 1756 _read_decompress_done(req, 0); 1757 } 1758 } 1759 1760 static void 1761 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) 1762 { 1763 struct spdk_reduce_vol *vol = req->vol; 1764 1765 req->chunk_map_index = vol->pm_logical_map[req->logical_map_index]; 1766 assert(req->chunk_map_index != UINT32_MAX); 1767 1768 req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); 1769 req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size, 1770 vol->params.backing_io_unit_size); 1771 req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); 1772 1773 _issue_backing_ops(req, vol, next_fn, false /* read */); 1774 } 1775 1776 static bool 1777 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt, 1778 uint64_t length) 1779 { 1780 uint64_t size = 0; 1781 int i; 1782 1783 if (iovcnt > REDUCE_MAX_IOVECS) { 1784 return false; 1785 } 1786 1787 for (i = 0; i < iovcnt; i++) { 1788 size += iov[i].iov_len; 1789 } 1790 1791 return size == (length * vol->params.logical_block_size); 1792 } 1793 1794 static bool 1795 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index) 1796 { 1797 struct spdk_reduce_vol_request req; 1798 1799 req.logical_map_index = logical_map_index; 1800 1801 return (NULL != RB_FIND(executing_req_tree, &vol->executing_requests, &req)); 1802 } 1803 1804 static void 1805 _start_readv_request(struct spdk_reduce_vol_request *req) 1806 { 1807 RB_INSERT(executing_req_tree, &req->vol->executing_requests, req); 1808 _reduce_vol_read_chunk(req, _read_read_done); 1809 } 1810 1811 void 1812 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, 1813 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1814 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1815 { 1816 struct spdk_reduce_vol_request *req; 1817 uint64_t logical_map_index; 1818 bool overlapped; 1819 int i; 1820 1821 if (length == 0) { 1822 cb_fn(cb_arg, 0); 1823 return; 1824 } 1825 1826 if (_request_spans_chunk_boundary(vol, offset, length)) { 1827 cb_fn(cb_arg, -EINVAL); 1828 return; 1829 } 1830 1831 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1832 cb_fn(cb_arg, -EINVAL); 1833 return; 1834 } 1835 1836 logical_map_index = offset / vol->logical_blocks_per_chunk; 1837 overlapped = _check_overlap(vol, logical_map_index); 1838 1839 if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) { 1840 /* 1841 * This chunk hasn't been allocated. So treat the data as all 1842 * zeroes for this chunk - do the memset and immediately complete 1843 * the operation. 1844 */ 1845 for (i = 0; i < iovcnt; i++) { 1846 memset(iov[i].iov_base, 0, iov[i].iov_len); 1847 } 1848 cb_fn(cb_arg, 0); 1849 return; 1850 } 1851 1852 req = TAILQ_FIRST(&vol->free_requests); 1853 if (req == NULL) { 1854 cb_fn(cb_arg, -ENOMEM); 1855 return; 1856 } 1857 1858 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1859 req->type = REDUCE_IO_READV; 1860 req->vol = vol; 1861 req->iov = iov; 1862 req->iovcnt = iovcnt; 1863 req->offset = offset; 1864 req->logical_map_index = logical_map_index; 1865 req->length = length; 1866 req->copy_after_decompress = false; 1867 req->cb_fn = cb_fn; 1868 req->cb_arg = cb_arg; 1869 1870 if (!overlapped) { 1871 _start_readv_request(req); 1872 } else { 1873 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1874 } 1875 } 1876 1877 static void 1878 _start_writev_request(struct spdk_reduce_vol_request *req) 1879 { 1880 struct spdk_reduce_vol *vol = req->vol; 1881 1882 RB_INSERT(executing_req_tree, &req->vol->executing_requests, req); 1883 if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) { 1884 if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) { 1885 /* Read old chunk, then overwrite with data from this write 1886 * operation. 1887 */ 1888 req->rmw = true; 1889 _reduce_vol_read_chunk(req, _write_read_done); 1890 return; 1891 } 1892 } 1893 1894 req->rmw = false; 1895 1896 _prepare_compress_chunk(req, true); 1897 _reduce_vol_compress_chunk(req, _write_compress_done); 1898 } 1899 1900 void 1901 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, 1902 struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, 1903 spdk_reduce_vol_op_complete cb_fn, void *cb_arg) 1904 { 1905 struct spdk_reduce_vol_request *req; 1906 uint64_t logical_map_index; 1907 bool overlapped; 1908 1909 if (length == 0) { 1910 cb_fn(cb_arg, 0); 1911 return; 1912 } 1913 1914 if (_request_spans_chunk_boundary(vol, offset, length)) { 1915 cb_fn(cb_arg, -EINVAL); 1916 return; 1917 } 1918 1919 if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { 1920 cb_fn(cb_arg, -EINVAL); 1921 return; 1922 } 1923 1924 logical_map_index = offset / vol->logical_blocks_per_chunk; 1925 overlapped = _check_overlap(vol, logical_map_index); 1926 1927 req = TAILQ_FIRST(&vol->free_requests); 1928 if (req == NULL) { 1929 cb_fn(cb_arg, -ENOMEM); 1930 return; 1931 } 1932 1933 TAILQ_REMOVE(&vol->free_requests, req, tailq); 1934 req->type = REDUCE_IO_WRITEV; 1935 req->vol = vol; 1936 req->iov = iov; 1937 req->iovcnt = iovcnt; 1938 req->offset = offset; 1939 req->logical_map_index = logical_map_index; 1940 req->length = length; 1941 req->copy_after_decompress = false; 1942 req->cb_fn = cb_fn; 1943 req->cb_arg = cb_arg; 1944 1945 if (!overlapped) { 1946 _start_writev_request(req); 1947 } else { 1948 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); 1949 } 1950 } 1951 1952 const struct spdk_reduce_vol_params * 1953 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol) 1954 { 1955 return &vol->params; 1956 } 1957 1958 const char * 1959 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol) 1960 { 1961 return vol->pm_file.path; 1962 } 1963 1964 void 1965 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol) 1966 { 1967 uint64_t logical_map_size, num_chunks, ttl_chunk_sz; 1968 uint32_t struct_size; 1969 uint64_t chunk_map_size; 1970 1971 SPDK_NOTICELOG("vol info:\n"); 1972 SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size); 1973 SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size); 1974 SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size); 1975 SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size); 1976 num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); 1977 SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks); 1978 SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n", 1979 vol->params.vol_size / vol->params.chunk_size); 1980 ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1981 vol->params.backing_io_unit_size); 1982 SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz); 1983 struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); 1984 SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size); 1985 1986 SPDK_NOTICELOG("pmem info:\n"); 1987 SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size); 1988 SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf); 1989 SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super); 1990 SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map); 1991 logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, 1992 vol->params.chunk_size); 1993 SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size); 1994 SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps); 1995 chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, 1996 vol->params.backing_io_unit_size); 1997 SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size); 1998 } 1999 2000 SPDK_LOG_REGISTER_COMPONENT(reduce) 2001