xref: /spdk/lib/reduce/reduce.c (revision 310fc0b5d56fa43b80af869270fcf2758df9c92d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/env.h"
38 #include "spdk/string.h"
39 #include "spdk/bit_array.h"
40 #include "spdk/util.h"
41 #include "spdk_internal/log.h"
42 
43 #include "libpmem.h"
44 
45 /* Always round up the size of the PM region to the nearest cacheline. */
46 #define REDUCE_PM_SIZE_ALIGNMENT	64
47 
48 /* Offset into the backing device where the persistent memory file's path is stored. */
49 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
50 
51 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
52 
53 #define REDUCE_NUM_VOL_REQUESTS	256
54 
55 /* Structure written to offset 0 of both the pm file and the backing device. */
56 struct spdk_reduce_vol_superblock {
57 	uint8_t				signature[8];
58 	struct spdk_reduce_vol_params	params;
59 	uint8_t				reserved[4048];
60 };
61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
62 
63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
64 /* -1 excludes the null terminator, which is not part of the on-disk signature. */
65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
66 		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
67 
68 #define REDUCE_PATH_MAX 4096
69 
70 /**
71  * Describes a persistent memory file used to hold metadata associated with a
72  *  compressed volume.
73  */
74 struct spdk_reduce_pm_file {
75 	char			path[REDUCE_PATH_MAX];
76 	void			*pm_buf;
77 	int			pm_is_pmem;
78 	uint64_t		size;
79 };
80 
81 #define REDUCE_IO_READV		1
82 #define REDUCE_IO_WRITEV	2
83 
84 struct spdk_reduce_chunk_map {
85 	uint32_t		compressed_size;
86 	uint32_t		reserved;
87 	uint64_t		io_unit_index[0];
88 };
89 
90 struct spdk_reduce_vol_request {
91 	/**
92 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
93 	 *   1) source buffer for compression operations
94 	 *   2) destination buffer for decompression operations
95 	 *   3) data buffer when writing uncompressed chunk to disk
96 	 *   4) data buffer when reading uncompressed chunk from disk
97 	 */
98 	uint8_t					*decomp_buf;
99 	struct iovec				*decomp_buf_iov;
100 	/**
101 	 *  Scratch buffer used for compressed chunk.  This is used for:
102 	 *   1) destination buffer for compression operations
103 	 *   2) source buffer for decompression operations
104 	 *   3) data buffer when writing compressed chunk to disk
105 	 *   4) data buffer when reading compressed chunk from disk
106 	 */
107 	uint8_t					*comp_buf;
108 	struct iovec				*comp_buf_iov;
109 	struct iovec				*iov;
110 	struct spdk_reduce_vol			*vol;
111 	int					type;
112 	int					reduce_errno;
113 	int					iovcnt;
114 	int					num_backing_ops;
115 	uint32_t				num_io_units;
116 	bool					chunk_is_compressed;
117 	uint64_t				offset;
118 	uint64_t				logical_map_index;
119 	uint64_t				length;
120 	uint64_t				chunk_map_index;
121 	struct spdk_reduce_chunk_map		*chunk;
122 	spdk_reduce_vol_op_complete		cb_fn;
123 	void					*cb_arg;
124 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
125 	struct spdk_reduce_vol_cb_args		backing_cb_args;
126 };
127 
128 struct spdk_reduce_vol {
129 	struct spdk_reduce_vol_params		params;
130 	uint32_t				backing_io_units_per_chunk;
131 	uint32_t				backing_lba_per_io_unit;
132 	uint32_t				logical_blocks_per_chunk;
133 	struct spdk_reduce_pm_file		pm_file;
134 	struct spdk_reduce_backing_dev		*backing_dev;
135 	struct spdk_reduce_vol_superblock	*backing_super;
136 	struct spdk_reduce_vol_superblock	*pm_super;
137 	uint64_t				*pm_logical_map;
138 	uint64_t				*pm_chunk_maps;
139 
140 	struct spdk_bit_array			*allocated_chunk_maps;
141 	struct spdk_bit_array			*allocated_backing_io_units;
142 
143 	struct spdk_reduce_vol_request		*request_mem;
144 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
145 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
146 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
147 
148 	/* Single contiguous buffer used for all request buffers for this volume. */
149 	uint8_t					*buf_mem;
150 	struct iovec				*buf_iov_mem;
151 };
152 
153 static void _start_readv_request(struct spdk_reduce_vol_request *req);
154 static void _start_writev_request(struct spdk_reduce_vol_request *req);
155 
156 /*
157  * Allocate extra metadata chunks and corresponding backing io units to account for
158  *  outstanding IO in the worst case scenario where the logical map is completely allocated
159  *  and no data can be compressed.  We need extra chunks in this case to handle
160  *  in-flight writes since reduce never writes data in place.
161  */
162 #define REDUCE_NUM_EXTRA_CHUNKS 128
163 
164 static void
165 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
166 {
167 	if (vol->pm_file.pm_is_pmem) {
168 		pmem_persist(addr, len);
169 	} else {
170 		pmem_msync(addr, len);
171 	}
172 }
173 
174 static uint64_t
175 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
176 {
177 	uint64_t chunks_in_logical_map, logical_map_size;
178 
179 	chunks_in_logical_map = vol_size / chunk_size;
180 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
181 
182 	/* Round up to next cacheline. */
183 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
184 	       REDUCE_PM_SIZE_ALIGNMENT;
185 }
186 
187 static uint64_t
188 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
189 {
190 	uint64_t num_chunks;
191 
192 	num_chunks = vol_size / chunk_size;
193 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
194 
195 	return num_chunks;
196 }
197 
198 static uint64_t
199 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
200 {
201 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
202 
203 	num_chunks = _get_total_chunks(vol_size, chunk_size);
204 	io_units_per_chunk = chunk_size / backing_io_unit_size;
205 	total_chunks_size = num_chunks * (sizeof(struct spdk_reduce_chunk_map) + io_units_per_chunk * sizeof(uint64_t));
206 
207 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
208 	       REDUCE_PM_SIZE_ALIGNMENT;
209 }
210 
211 static inline uint32_t
212 _reduce_vol_get_chunk_struct_size(struct spdk_reduce_vol *vol)
213 {
214 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * vol->backing_io_units_per_chunk;
215 }
216 
217 static struct spdk_reduce_chunk_map *
218 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
219 {
220 	uintptr_t chunk_map_addr;
221 
222 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
223 
224 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
225 	chunk_map_addr += chunk_map_index * _reduce_vol_get_chunk_struct_size(vol);
226 
227 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
228 }
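
/*
 * Illustrative arithmetic (hypothetical geometry, not a value taken from this
 * file): with 4 backing io units per chunk, each chunk map occupies
 * sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t) = 8 + 32 = 40
 * bytes, so chunk map N starts 40 * N bytes into vol->pm_chunk_maps.
 */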
229 
230 static int
231 _validate_vol_params(struct spdk_reduce_vol_params *params)
232 {
233 	if (params->vol_size > 0) {
234 		/**
235 		 * The caller must not pass in a vol size - libreduce calculates it from
236 		 *  the other values in this structure plus the size of the backing device.
237 		 */
238 		return -EINVAL;
239 	}
240 
241 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
242 	    params->logical_block_size == 0) {
243 		return -EINVAL;
244 	}
245 
246 	/* Chunk size must be an even multiple of the backing io unit size. */
247 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
248 		return -EINVAL;
249 	}
250 
251 	/* Chunk size must be an even multiple of the logical block size. */
252 	if ((params->chunk_size % params->logical_block_size) != 0) {
253 		return -EINVAL;
254 	}
255 
256 	return 0;
257 }
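
/*
 * Illustrative sketch (hypothetical values, not defaults from this file): a
 * parameter set that passes the checks above.  vol_size is left at zero
 * because spdk_reduce_vol_init() derives it from the backing device size.
 *
 *	struct spdk_reduce_vol_params params = {0};
 *
 *	params.logical_block_size = 512;
 *	params.backing_io_unit_size = 4096;
 *	params.chunk_size = 16 * 1024;	// multiple of both sizes above
 *	assert(_validate_vol_params(&params) == 0);
 */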
258 
259 static uint64_t
260 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
261 {
262 	uint64_t num_chunks;
263 
264 	num_chunks = backing_dev_size / chunk_size;
265 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
266 		return 0;
267 	}
268 
269 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
270 	return num_chunks * chunk_size;
271 }
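
/*
 * Worked example (hypothetical sizes): for a 1 GiB backing device and a 16 KiB
 * chunk size, 1073741824 / 16384 = 65536 chunks fit on the device.  Reserving
 * REDUCE_NUM_EXTRA_CHUNKS (128) of them leaves 65408 usable chunks, so the
 * volume size is 65408 * 16384 = 1071644672 bytes (1 GiB minus 2 MiB).
 */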
272 
273 static uint64_t
274 _get_pm_file_size(struct spdk_reduce_vol_params *params)
275 {
276 	uint64_t total_pm_size;
277 
278 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
279 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
280 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
281 			 params->backing_io_unit_size);
282 	return total_pm_size;
283 }
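
/*
 * Worked example (continuing the hypothetical 1 GiB device, 16 KiB chunk,
 * 4 KiB backing io unit case above): the pm file holds a 4096-byte superblock,
 * a logical map of 65408 entries * 8 bytes = 523264 bytes, and 65536 chunk
 * maps (including the 128 extra chunks) at 40 bytes each = 2621440 bytes.  The
 * logical map and chunk map regions are each rounded up to
 * REDUCE_PM_SIZE_ALIGNMENT, giving roughly 3 MiB total.
 */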
284 
285 const struct spdk_uuid *
286 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
287 {
288 	return &vol->params.uuid;
289 }
290 
291 static void
292 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
293 {
294 	uint64_t logical_map_size;
295 
296 	/* Superblock is at the beginning of the pm file. */
297 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
298 
299 	/* Logical map immediately follows the super block. */
300 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
301 
302 	/* Chunk maps follow the logical map. */
303 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
304 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
305 }
306 
307 /* We need 2 iovs during load - one for the superblock, another for the path */
308 #define LOAD_IOV_COUNT	2
309 
310 struct reduce_init_load_ctx {
311 	struct spdk_reduce_vol			*vol;
312 	struct spdk_reduce_vol_cb_args		backing_cb_args;
313 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
314 	void					*cb_arg;
315 	struct iovec				iov[LOAD_IOV_COUNT];
316 	void					*path;
317 };
318 
319 static int
320 _allocate_vol_requests(struct spdk_reduce_vol *vol)
321 {
322 	struct spdk_reduce_vol_request *req;
323 	int i;
324 
325 	/* Allocate 2x since each request needs one chunk-sized buffer for uncompressed
326 	 *  data and one for compressed data.
327 	 */
328 	vol->buf_mem = spdk_dma_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
329 	if (vol->buf_mem == NULL) {
330 		return -ENOMEM;
331 	}
332 
333 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
334 	if (vol->request_mem == NULL) {
335 		spdk_dma_free(vol->buf_mem);
336 		vol->buf_mem = NULL;
337 		return -ENOMEM;
338 	}
339 
340 	/* Allocate 2x since each request needs an iovec array for each of the two
341 	 *  buffers above.
342 	 */
343 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
344 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
345 	if (vol->buf_iov_mem == NULL) {
346 		free(vol->request_mem);
347 		spdk_dma_free(vol->buf_mem);
348 		vol->request_mem = NULL;
349 		vol->buf_mem = NULL;
350 		return -ENOMEM;
351 	}
352 
353 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
354 		req = &vol->request_mem[i];
355 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
356 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
357 		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
358 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
359 		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
360 	}
361 
362 	return 0;
363 }
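
/*
 * Buffer layout sketch for the loop above: request i gets its uncompressed
 * scratch buffer at buf_mem + (2 * i) * chunk_size and its compressed scratch
 * buffer at buf_mem + (2 * i + 1) * chunk_size, with matching iovec arrays of
 * backing_io_units_per_chunk entries each carved out of buf_iov_mem.
 */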
364 
365 static void
366 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
367 {
368 	if (ctx != NULL) {
369 		spdk_dma_free(ctx->path);
370 		free(ctx);
371 	}
372 
373 	if (vol != NULL) {
374 		if (vol->pm_file.pm_buf != NULL) {
375 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
376 		}
377 
378 		spdk_dma_free(vol->backing_super);
379 		spdk_bit_array_free(&vol->allocated_chunk_maps);
380 		spdk_bit_array_free(&vol->allocated_backing_io_units);
381 		free(vol->request_mem);
382 		free(vol->buf_iov_mem);
383 		spdk_dma_free(vol->buf_mem);
384 		free(vol);
385 	}
386 }
387 
388 static void
389 _init_write_super_cpl(void *cb_arg, int reduce_errno)
390 {
391 	struct reduce_init_load_ctx *init_ctx = cb_arg;
392 	int rc;
393 
394 	rc = _allocate_vol_requests(init_ctx->vol);
395 	if (rc != 0) {
396 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
397 		_init_load_cleanup(init_ctx->vol, init_ctx);
398 		return;
399 	}
400 
401 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
402 	/* Only clean up the ctx - the vol has been passed to the application
403 	 *  for use now that initialization was successful.
404 	 */
405 	_init_load_cleanup(NULL, init_ctx);
406 }
407 
408 static void
409 _init_write_path_cpl(void *cb_arg, int reduce_errno)
410 {
411 	struct reduce_init_load_ctx *init_ctx = cb_arg;
412 	struct spdk_reduce_vol *vol = init_ctx->vol;
413 
414 	init_ctx->iov[0].iov_base = vol->backing_super;
415 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
416 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
417 	init_ctx->backing_cb_args.cb_arg = init_ctx;
418 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
419 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
420 				 &init_ctx->backing_cb_args);
421 }
422 
423 static int
424 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
425 {
426 	uint64_t total_chunks, total_backing_io_units;
427 	uint32_t i, num_metadata_io_units;
428 
429 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
430 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
431 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
432 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
433 
434 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
435 		return -ENOMEM;
436 	}
437 
438 	/* Set backing io unit bits associated with metadata. */
439 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
440 				vol->backing_dev->blocklen;
441 	for (i = 0; i < num_metadata_io_units; i++) {
442 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
443 	}
444 
445 	return 0;
446 }
447 
448 void
449 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
450 		     struct spdk_reduce_backing_dev *backing_dev,
451 		     const char *pm_file_dir,
452 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
453 {
454 	struct spdk_reduce_vol *vol;
455 	struct reduce_init_load_ctx *init_ctx;
456 	uint64_t backing_dev_size;
457 	size_t mapped_len;
458 	int dir_len, max_dir_len, rc;
459 
460 	/* We need to append a path separator and the UUID to the supplied
461 	 * path.
462 	 */
463 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
464 	dir_len = strnlen(pm_file_dir, max_dir_len);
465 	/* Strip trailing slash if the user provided one - we will add it back
466 	 * later when appending the filename.
467 	 */
468 	if (pm_file_dir[dir_len - 1] == '/') {
469 		dir_len--;
470 	}
471 	if (dir_len == max_dir_len) {
472 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
473 		cb_fn(cb_arg, NULL, -EINVAL);
474 		return;
475 	}
476 
477 	rc = _validate_vol_params(params);
478 	if (rc != 0) {
479 		SPDK_ERRLOG("invalid vol params\n");
480 		cb_fn(cb_arg, NULL, rc);
481 		return;
482 	}
483 
484 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
485 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
486 	if (params->vol_size == 0) {
487 		SPDK_ERRLOG("backing device is too small\n");
488 		cb_fn(cb_arg, NULL, -EINVAL);
489 		return;
490 	}
491 
492 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
493 	    backing_dev->unmap == NULL) {
494 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
495 		cb_fn(cb_arg, NULL, -EINVAL);
496 		return;
497 	}
498 
499 	vol = calloc(1, sizeof(*vol));
500 	if (vol == NULL) {
501 		cb_fn(cb_arg, NULL, -ENOMEM);
502 		return;
503 	}
504 
505 	TAILQ_INIT(&vol->free_requests);
506 	TAILQ_INIT(&vol->executing_requests);
507 	TAILQ_INIT(&vol->queued_requests);
508 
509 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
510 	if (vol->backing_super == NULL) {
511 		cb_fn(cb_arg, NULL, -ENOMEM);
512 		_init_load_cleanup(vol, NULL);
513 		return;
514 	}
515 
516 	init_ctx = calloc(1, sizeof(*init_ctx));
517 	if (init_ctx == NULL) {
518 		cb_fn(cb_arg, NULL, -ENOMEM);
519 		_init_load_cleanup(vol, NULL);
520 		return;
521 	}
522 
523 	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
524 	if (init_ctx->path == NULL) {
525 		cb_fn(cb_arg, NULL, -ENOMEM);
526 		_init_load_cleanup(vol, init_ctx);
527 		return;
528 	}
529 
530 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
531 		spdk_uuid_generate(&params->uuid);
532 	}
533 
534 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
535 	vol->pm_file.path[dir_len] = '/';
536 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
537 			    &params->uuid);
538 	vol->pm_file.size = _get_pm_file_size(params);
539 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
540 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
541 					    &mapped_len, &vol->pm_file.pm_is_pmem);
542 	if (vol->pm_file.pm_buf == NULL) {
543 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
544 			    vol->pm_file.path, strerror(errno));
545 		cb_fn(cb_arg, NULL, -errno);
546 		_init_load_cleanup(vol, init_ctx);
547 		return;
548 	}
549 
550 	if (vol->pm_file.size != mapped_len) {
551 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
552 			    vol->pm_file.size, mapped_len);
553 		cb_fn(cb_arg, NULL, -ENOMEM);
554 		_init_load_cleanup(vol, init_ctx);
555 		return;
556 	}
557 
558 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
559 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
560 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
561 	memcpy(&vol->params, params, sizeof(*params));
562 
563 	vol->backing_dev = backing_dev;
564 
565 	rc = _allocate_bit_arrays(vol);
566 	if (rc != 0) {
567 		cb_fn(cb_arg, NULL, rc);
568 		_init_load_cleanup(vol, init_ctx);
569 		return;
570 	}
571 
572 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
573 	       sizeof(vol->backing_super->signature));
574 	memcpy(&vol->backing_super->params, params, sizeof(*params));
575 
576 	_initialize_vol_pm_pointers(vol);
577 
578 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
579 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
580 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
581 	 */
582 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
583 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
584 
585 	init_ctx->vol = vol;
586 	init_ctx->cb_fn = cb_fn;
587 	init_ctx->cb_arg = cb_arg;
588 
589 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
590 	init_ctx->iov[0].iov_base = init_ctx->path;
591 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
592 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
593 	init_ctx->backing_cb_args.cb_arg = init_ctx;
594 	/* Write path to offset 4K on backing device - just after where the super
595 	 *  block will be written.  We wait until this is committed before writing the
596 	 *  super block to guarantee we don't get the super block written without the
597 	 *  path if the system crashes in the middle of the write operation.
598 	 */
599 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
600 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
601 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
602 				 &init_ctx->backing_cb_args);
603 }
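
/*
 * Illustrative usage sketch (hypothetical caller code, not part of this file):
 * the caller supplies a backing device with readv/writev/unmap (and
 * compress/decompress) callbacks and receives the volume handle in the
 * completion callback.
 *
 *	static void
 *	init_done(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
 *	{
 *		if (reduce_errno == 0) {
 *			// vol is ready for spdk_reduce_vol_readv()/writev()
 *		}
 *	}
 *
 *	spdk_reduce_vol_init(&params, &my_backing_dev, "/mnt/pmem", init_done, NULL);
 */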
604 
605 static void
606 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
607 {
608 	struct reduce_init_load_ctx *load_ctx = cb_arg;
609 	struct spdk_reduce_vol *vol = load_ctx->vol;
610 	uint64_t backing_dev_size;
611 	uint64_t i, num_chunks, logical_map_index;
612 	struct spdk_reduce_chunk_map *chunk;
613 	size_t mapped_len;
614 	uint32_t j;
615 	int rc;
616 
617 	if (memcmp(vol->backing_super->signature,
618 		   SPDK_REDUCE_SIGNATURE,
619 		   sizeof(vol->backing_super->signature)) != 0) {
620 		/* This backing device isn't a libreduce backing device. */
621 		rc = -EILSEQ;
622 		goto error;
623 	}
624 
625 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
626 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
627 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
628 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
629 
630 	rc = _allocate_bit_arrays(vol);
631 	if (rc != 0) {
632 		goto error;
633 	}
634 
635 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
636 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
637 		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
638 			    backing_dev_size);
639 		rc = -EILSEQ;
640 		goto error;
641 	}
642 
643 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
644 	vol->pm_file.size = _get_pm_file_size(&vol->params);
645 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
646 					    &vol->pm_file.pm_is_pmem);
647 	if (vol->pm_file.pm_buf == NULL) {
648 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
649 		rc = -errno;
650 		goto error;
651 	}
652 
653 	if (vol->pm_file.size != mapped_len) {
654 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
655 			    vol->pm_file.size, mapped_len);
656 		rc = -ENOMEM;
657 		goto error;
658 	}
659 
660 	rc = _allocate_vol_requests(vol);
661 	if (rc != 0) {
662 		goto error;
663 	}
664 
665 	_initialize_vol_pm_pointers(vol);
666 
667 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
668 	for (i = 0; i < num_chunks; i++) {
669 		logical_map_index = vol->pm_logical_map[i];
670 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
671 			continue;
672 		}
673 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
674 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
675 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
676 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
677 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
678 			}
679 		}
680 	}
681 
682 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
683 	/* Only clean up the ctx - the vol has been passed to the application
684 	 *  for use now that volume load was successful.
685 	 */
686 	_init_load_cleanup(NULL, load_ctx);
687 	return;
688 
689 error:
690 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
691 	_init_load_cleanup(vol, load_ctx);
692 }
693 
694 void
695 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
696 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
697 {
698 	struct spdk_reduce_vol *vol;
699 	struct reduce_init_load_ctx *load_ctx;
700 
701 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
702 	    backing_dev->unmap == NULL) {
703 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
704 		cb_fn(cb_arg, NULL, -EINVAL);
705 		return;
706 	}
707 
708 	vol = calloc(1, sizeof(*vol));
709 	if (vol == NULL) {
710 		cb_fn(cb_arg, NULL, -ENOMEM);
711 		return;
712 	}
713 
714 	TAILQ_INIT(&vol->free_requests);
715 	TAILQ_INIT(&vol->executing_requests);
716 	TAILQ_INIT(&vol->queued_requests);
717 
718 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
719 	if (vol->backing_super == NULL) {
720 		_init_load_cleanup(vol, NULL);
721 		cb_fn(cb_arg, NULL, -ENOMEM);
722 		return;
723 	}
724 
725 	vol->backing_dev = backing_dev;
726 
727 	load_ctx = calloc(1, sizeof(*load_ctx));
728 	if (load_ctx == NULL) {
729 		_init_load_cleanup(vol, NULL);
730 		cb_fn(cb_arg, NULL, -ENOMEM);
731 		return;
732 	}
733 
734 	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
735 	if (load_ctx->path == NULL) {
736 		_init_load_cleanup(vol, load_ctx);
737 		cb_fn(cb_arg, NULL, -ENOMEM);
738 		return;
739 	}
740 
741 	load_ctx->vol = vol;
742 	load_ctx->cb_fn = cb_fn;
743 	load_ctx->cb_arg = cb_arg;
744 
745 	load_ctx->iov[0].iov_base = vol->backing_super;
746 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
747 	load_ctx->iov[1].iov_base = load_ctx->path;
748 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
749 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
750 	load_ctx->backing_cb_args.cb_arg = load_ctx;
751 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
752 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
753 				vol->backing_dev->blocklen,
754 				&load_ctx->backing_cb_args);
755 }
756 
757 void
758 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
759 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
760 {
761 	if (vol == NULL) {
762 		/* This indicates a programming error. */
763 		assert(false);
764 		cb_fn(cb_arg, -EINVAL);
765 		return;
766 	}
767 
768 	_init_load_cleanup(vol, NULL);
769 	cb_fn(cb_arg, 0);
770 }
771 
772 struct reduce_destroy_ctx {
773 	spdk_reduce_vol_op_complete		cb_fn;
774 	void					*cb_arg;
775 	struct spdk_reduce_vol			*vol;
776 	struct spdk_reduce_vol_superblock	*super;
777 	struct iovec				iov;
778 	struct spdk_reduce_vol_cb_args		backing_cb_args;
779 	int					reduce_errno;
780 	char					pm_path[REDUCE_PATH_MAX];
781 };
782 
783 static void
784 destroy_unload_cpl(void *cb_arg, int reduce_errno)
785 {
786 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
787 
788 	if (destroy_ctx->reduce_errno == 0) {
789 		if (unlink(destroy_ctx->pm_path)) {
790 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
791 				    destroy_ctx->pm_path, strerror(errno));
792 		}
793 	}
794 
795 	/* Even if the unload somehow failed, we still pass the destroy_ctx
796 	 * reduce_errno since that indicates whether or not the volume was
797 	 * actually destroyed.
798 	 */
799 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
800 	spdk_dma_free(destroy_ctx->super);
801 	free(destroy_ctx);
802 }
803 
804 static void
805 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
806 {
807 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
808 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
809 
810 	destroy_ctx->reduce_errno = reduce_errno;
811 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
812 }
813 
814 static void
815 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
816 {
817 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
818 
819 	if (reduce_errno != 0) {
820 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
821 		spdk_dma_free(destroy_ctx->super);
822 		free(destroy_ctx);
823 		return;
824 	}
825 
826 	destroy_ctx->vol = vol;
827 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
828 	destroy_ctx->iov.iov_base = destroy_ctx->super;
829 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
830 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
831 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
832 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
833 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
834 				 &destroy_ctx->backing_cb_args);
835 }
836 
837 void
838 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
839 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
840 {
841 	struct reduce_destroy_ctx *destroy_ctx;
842 
843 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
844 	if (destroy_ctx == NULL) {
845 		cb_fn(cb_arg, -ENOMEM);
846 		return;
847 	}
848 
849 	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
850 	if (destroy_ctx->super == NULL) {
851 		free(destroy_ctx);
852 		cb_fn(cb_arg, -ENOMEM);
853 		return;
854 	}
855 	destroy_ctx->cb_fn = cb_fn;
856 	destroy_ctx->cb_arg = cb_arg;
857 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
858 }
859 
860 static bool
861 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
862 {
863 	uint64_t start_chunk, end_chunk;
864 
865 	start_chunk = offset / vol->logical_blocks_per_chunk;
866 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
867 
868 	return (start_chunk != end_chunk);
869 }
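
/*
 * Example: with 32 logical blocks per chunk, a request at offset 30 with
 * length 4 touches logical blocks 30-33, which fall in chunks 0 and 1, so
 * readv/writev below reject it with -EINVAL.
 */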
870 
871 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
872 
873 static void
874 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
875 {
876 	struct spdk_reduce_vol_request *next_req;
877 	struct spdk_reduce_vol *vol = req->vol;
878 
879 	req->cb_fn(req->cb_arg, reduce_errno);
880 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
881 
882 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
883 		if (next_req->logical_map_index == req->logical_map_index) {
884 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
885 			if (next_req->type == REDUCE_IO_READV) {
886 				_start_readv_request(next_req);
887 			} else {
888 				assert(next_req->type == REDUCE_IO_WRITEV);
889 				_start_writev_request(next_req);
890 			}
891 			break;
892 		}
893 	}
894 
895 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
896 }
897 
898 static void
899 _write_write_done(void *_req, int reduce_errno)
900 {
901 	struct spdk_reduce_vol_request *req = _req;
902 	struct spdk_reduce_vol *vol = req->vol;
903 	uint64_t old_chunk_map_index;
904 	struct spdk_reduce_chunk_map *old_chunk;
905 	uint32_t i;
906 
907 	if (reduce_errno != 0) {
908 		req->reduce_errno = reduce_errno;
909 	}
910 
911 	assert(req->num_backing_ops > 0);
912 	if (--req->num_backing_ops > 0) {
913 		return;
914 	}
915 
916 	if (req->reduce_errno != 0) {
917 		_reduce_vol_complete_req(req, req->reduce_errno);
918 		return;
919 	}
920 
921 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
922 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
923 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
924 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
925 			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
926 				break;
927 			}
928 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
929 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
930 			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
931 		}
932 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
933 	}
934 
935 	/*
936 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
937 	 * becomes invalid after we update the logical map, since the logical map will no
938 	 * longer hold a reference to it.
939 	 */
940 
941 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
942 	_reduce_persist(vol, req->chunk, _reduce_vol_get_chunk_struct_size(vol));
943 
944 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
945 
946 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
947 
948 	_reduce_vol_complete_req(req, 0);
949 }
950 
951 static void
952 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
953 		   reduce_request_fn next_fn, bool is_write)
954 {
955 	struct iovec *iov;
956 	uint8_t *buf;
957 	uint32_t i;
958 
959 	if (req->chunk_is_compressed) {
960 		iov = req->comp_buf_iov;
961 		buf = req->comp_buf;
962 	} else {
963 		iov = req->decomp_buf_iov;
964 		buf = req->decomp_buf;
965 	}
966 
967 	req->num_backing_ops = req->num_io_units;
968 	req->backing_cb_args.cb_fn = next_fn;
969 	req->backing_cb_args.cb_arg = req;
970 	for (i = 0; i < req->num_io_units; i++) {
971 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
972 		iov[i].iov_len = vol->params.backing_io_unit_size;
973 		if (is_write) {
974 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
975 						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
976 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
977 		} else {
978 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
979 						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
980 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
981 		}
982 	}
983 }
984 
985 static void
986 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
987 			uint32_t compressed_size)
988 {
989 	struct spdk_reduce_vol *vol = req->vol;
990 	uint32_t i;
991 
992 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
993 
994 	/* TODO: fail if no chunk map found - but really this should not happen if we
995 	 * size the number of requests similarly to number of extra chunk maps
996 	 */
997 	assert(req->chunk_map_index != UINT32_MAX);
998 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
999 
1000 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1001 	req->num_io_units = spdk_divide_round_up(compressed_size,
1002 			    vol->params.backing_io_unit_size);
1003 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1004 	req->chunk->compressed_size =
1005 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1006 
1007 	for (i = 0; i < req->num_io_units; i++) {
1008 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1009 		/* TODO: fail if no backing block found - but really this should also not
1010 		 * happen (see comment above).
1011 		 */
1012 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1013 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1014 	}
1015 	while (i < vol->backing_io_units_per_chunk) {
1016 		req->chunk->io_unit_index[i++] = REDUCE_EMPTY_MAP_ENTRY;
1017 	}
1018 
1019 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1020 }
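
/*
 * Sizing example (hypothetical 16 KiB chunk, 4 KiB backing io unit): if a
 * chunk compresses to 5000 bytes, num_io_units = 2 of the 4 io units per
 * chunk, so the chunk is stored compressed with compressed_size = 5000 and
 * the two unused io_unit_index slots set to REDUCE_EMPTY_MAP_ENTRY.
 */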
1021 
1022 static void
1023 _write_compress_done(void *_req, int reduce_errno)
1024 {
1025 	struct spdk_reduce_vol_request *req = _req;
1026 
1027 	/* Negative reduce_errno indicates failure for compression operations.
1028 	 * Just write the uncompressed data instead.  Force this to happen
1029 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1030 	 * When it sees the data couldn't be compressed, it will just write
1031 	 * the uncompressed buffer to disk.
1032 	 */
1033 	if (reduce_errno < 0) {
1034 		reduce_errno = req->vol->params.chunk_size;
1035 	}
1036 
1037 	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
1038 	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
1039 }
1040 
1041 static void
1042 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1043 {
1044 	struct spdk_reduce_vol *vol = req->vol;
1045 
1046 	req->backing_cb_args.cb_fn = next_fn;
1047 	req->backing_cb_args.cb_arg = req;
1048 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1049 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1050 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1051 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1052 	vol->backing_dev->compress(vol->backing_dev,
1053 				   req->decomp_buf_iov, 1, req->comp_buf_iov, 1,
1054 				   &req->backing_cb_args);
1055 }
1056 
1057 static void
1058 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1059 {
1060 	struct spdk_reduce_vol *vol = req->vol;
1061 
1062 	req->backing_cb_args.cb_fn = next_fn;
1063 	req->backing_cb_args.cb_arg = req;
1064 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1065 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1066 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1067 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1068 	vol->backing_dev->decompress(vol->backing_dev,
1069 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1070 				     &req->backing_cb_args);
1071 }
1072 
1073 static void
1074 _write_decompress_done(void *_req, int reduce_errno)
1075 {
1076 	struct spdk_reduce_vol_request *req = _req;
1077 	struct spdk_reduce_vol *vol = req->vol;
1078 	uint64_t chunk_offset;
1079 	uint8_t *buf;
1080 	int i;
1081 
1082 	/* Negative reduce_errno indicates failure for decompression operations. */
1083 	if (reduce_errno < 0) {
1084 		_reduce_vol_complete_req(req, reduce_errno);
1085 		return;
1086 	}
1087 
1088 	/* Positive reduce_errno indicates number of bytes in decompressed
1089 	 *  buffer.  This should equal the chunk size - otherwise that's another
1090 	 *  type of failure.
1091 	 */
1092 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1093 		_reduce_vol_complete_req(req, -EIO);
1094 		return;
1095 	}
1096 
1097 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1098 	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1099 	for (i = 0; i < req->iovcnt; i++) {
1100 		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
1101 		buf += req->iov[i].iov_len;
1102 	}
1103 
1104 	_reduce_vol_compress_chunk(req, _write_compress_done);
1105 }
1106 
1107 static void
1108 _write_read_done(void *_req, int reduce_errno)
1109 {
1110 	struct spdk_reduce_vol_request *req = _req;
1111 
1112 	if (reduce_errno != 0) {
1113 		req->reduce_errno = reduce_errno;
1114 	}
1115 
1116 	assert(req->num_backing_ops > 0);
1117 	if (--req->num_backing_ops > 0) {
1118 		return;
1119 	}
1120 
1121 	if (req->reduce_errno != 0) {
1122 		_reduce_vol_complete_req(req, req->reduce_errno);
1123 		return;
1124 	}
1125 
1126 	if (req->chunk_is_compressed) {
1127 		_reduce_vol_decompress_chunk(req, _write_decompress_done);
1128 	} else {
1129 		_write_decompress_done(req, req->chunk->compressed_size);
1130 	}
1131 }
1132 
1133 static void
1134 _read_decompress_done(void *_req, int reduce_errno)
1135 {
1136 	struct spdk_reduce_vol_request *req = _req;
1137 	struct spdk_reduce_vol *vol = req->vol;
1138 	uint64_t chunk_offset;
1139 	uint8_t *buf;
1140 	int i;
1141 
1142 	/* Negative reduce_errno indicates failure for decompression operations. */
1143 	if (reduce_errno < 0) {
1144 		_reduce_vol_complete_req(req, reduce_errno);
1145 		return;
1146 	}
1147 
1148 	/* Positive reduce_errno indicates number of bytes in decompressed
1149 	 *  buffer.  This should equal the chunk size - otherwise that's another
1150 	 *  type of failure.
1151 	 */
1152 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1153 		_reduce_vol_complete_req(req, -EIO);
1154 		return;
1155 	}
1156 
1157 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1158 	buf = req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1159 	for (i = 0; i < req->iovcnt; i++) {
1160 		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1161 		buf += req->iov[i].iov_len;
1162 	}
1163 	_reduce_vol_complete_req(req, 0);
1164 }
1165 
1166 static void
1167 _read_read_done(void *_req, int reduce_errno)
1168 {
1169 	struct spdk_reduce_vol_request *req = _req;
1170 
1171 	if (reduce_errno != 0) {
1172 		req->reduce_errno = reduce_errno;
1173 	}
1174 
1175 	assert(req->num_backing_ops > 0);
1176 	if (--req->num_backing_ops > 0) {
1177 		return;
1178 	}
1179 
1180 	if (req->reduce_errno != 0) {
1181 		_reduce_vol_complete_req(req, req->reduce_errno);
1182 		return;
1183 	}
1184 
1185 	if (req->chunk_is_compressed) {
1186 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1187 	} else {
1188 		_read_decompress_done(req, req->chunk->compressed_size);
1189 	}
1190 }
1191 
1192 static void
1193 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1194 {
1195 	struct spdk_reduce_vol *vol = req->vol;
1196 
1197 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1198 	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
1199 
1200 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1201 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1202 			    vol->params.backing_io_unit_size);
1203 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1204 
1205 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1206 }
1207 
1208 static bool
1209 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1210 		    uint64_t length)
1211 {
1212 	uint64_t size = 0;
1213 	int i;
1214 
1215 	for (i = 0; i < iovcnt; i++) {
1216 		size += iov[i].iov_len;
1217 	}
1218 
1219 	return size == (length * vol->params.logical_block_size);
1220 }
1221 
1222 static bool
1223 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1224 {
1225 	struct spdk_reduce_vol_request *req;
1226 
1227 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1228 		if (logical_map_index == req->logical_map_index) {
1229 			return true;
1230 		}
1231 	}
1232 
1233 	return false;
1234 }
1235 
1236 static void
1237 _start_readv_request(struct spdk_reduce_vol_request *req)
1238 {
1239 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1240 	_reduce_vol_read_chunk(req, _read_read_done);
1241 }
1242 
1243 void
1244 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1245 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1246 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1247 {
1248 	struct spdk_reduce_vol_request *req;
1249 	uint64_t logical_map_index;
1250 	bool overlapped;
1251 	int i;
1252 
1253 	if (length == 0) {
1254 		cb_fn(cb_arg, 0);
1255 		return;
1256 	}
1257 
1258 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1259 		cb_fn(cb_arg, -EINVAL);
1260 		return;
1261 	}
1262 
1263 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1264 		cb_fn(cb_arg, -EINVAL);
1265 		return;
1266 	}
1267 
1268 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1269 	overlapped = _check_overlap(vol, logical_map_index);
1270 
1271 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1272 		/*
1273 		 * This chunk hasn't been allocated.  So treat the data as all
1274 		 * zeroes for this chunk - do the memset and immediately complete
1275 		 * the operation.
1276 		 */
1277 		for (i = 0; i < iovcnt; i++) {
1278 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1279 		}
1280 		cb_fn(cb_arg, 0);
1281 		return;
1282 	}
1283 
1284 	req = TAILQ_FIRST(&vol->free_requests);
1285 	if (req == NULL) {
1286 		cb_fn(cb_arg, -ENOMEM);
1287 		return;
1288 	}
1289 
1290 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1291 	req->type = REDUCE_IO_READV;
1292 	req->vol = vol;
1293 	req->iov = iov;
1294 	req->iovcnt = iovcnt;
1295 	req->offset = offset;
1296 	req->logical_map_index = logical_map_index;
1297 	req->length = length;
1298 	req->cb_fn = cb_fn;
1299 	req->cb_arg = cb_arg;
1300 
1301 	if (!overlapped) {
1302 		_start_readv_request(req);
1303 	} else {
1304 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1305 	}
1306 }
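
/*
 * Illustrative usage sketch (hypothetical caller code, assuming a 512-byte
 * logical block size): read 8 logical blocks starting at logical block 0 into
 * a single buffer.  offset and length are in logical blocks and must not span
 * a chunk boundary.
 *
 *	static void
 *	read_done(void *cb_arg, int reduce_errno)
 *	{
 *		// reduce_errno == 0 on success
 *	}
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = 8 * 512 };
 *
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, read_done, NULL);
 */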
1307 
1308 static void
1309 _start_writev_request(struct spdk_reduce_vol_request *req)
1310 {
1311 	struct spdk_reduce_vol *vol = req->vol;
1312 	uint64_t chunk_offset;
1313 	uint32_t lbsize, lb_per_chunk;
1314 	int i;
1315 	uint8_t *buf;
1316 
1317 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1318 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1319 		/* Read old chunk, then overwrite with data from this write operation.
1320 		 * TODO: bypass reading old chunk if this write operation overwrites
1321 		 * the entire chunk.
1322 		 */
1323 		_reduce_vol_read_chunk(req, _write_read_done);
1324 		return;
1325 	}
1326 
1327 	buf = req->decomp_buf;
1328 	lbsize = vol->params.logical_block_size;
1329 	lb_per_chunk = vol->logical_blocks_per_chunk;
1330 	/* Note: we must zero out parts of req->decomp_buf not specified by this write operation. */
1331 	chunk_offset = req->offset % lb_per_chunk;
1332 	if (chunk_offset != 0) {
1333 		memset(buf, 0, chunk_offset * lbsize);
1334 		buf += chunk_offset * lbsize;
1335 	}
1336 	for (i = 0; i < req->iovcnt; i++) {
1337 		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
1338 		buf += req->iov[i].iov_len;
1339 	}
1340 	chunk_offset += req->length;
1341 	if (chunk_offset != lb_per_chunk) {
1342 		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
1343 	}
1344 	_reduce_vol_compress_chunk(req, _write_compress_done);
1345 }
1346 
1347 void
1348 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1349 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1350 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1351 {
1352 	struct spdk_reduce_vol_request *req;
1353 	uint64_t logical_map_index;
1354 	bool overlapped;
1355 
1356 	if (length == 0) {
1357 		cb_fn(cb_arg, 0);
1358 		return;
1359 	}
1360 
1361 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1362 		cb_fn(cb_arg, -EINVAL);
1363 		return;
1364 	}
1365 
1366 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1367 		cb_fn(cb_arg, -EINVAL);
1368 		return;
1369 	}
1370 
1371 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1372 	overlapped = _check_overlap(vol, logical_map_index);
1373 
1374 	req = TAILQ_FIRST(&vol->free_requests);
1375 	if (req == NULL) {
1376 		cb_fn(cb_arg, -ENOMEM);
1377 		return;
1378 	}
1379 
1380 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1381 	req->type = REDUCE_IO_WRITEV;
1382 	req->vol = vol;
1383 	req->iov = iov;
1384 	req->iovcnt = iovcnt;
1385 	req->offset = offset;
1386 	req->logical_map_index = logical_map_index;
1387 	req->length = length;
1388 	req->cb_fn = cb_fn;
1389 	req->cb_arg = cb_arg;
1390 
1391 	if (!overlapped) {
1392 		_start_writev_request(req);
1393 	} else {
1394 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1395 	}
1396 }
1397 
1398 const struct spdk_reduce_vol_params *
1399 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1400 {
1401 	return &vol->params;
1402 }
1403 
1404 void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1405 {
1406 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1407 	uint32_t struct_size;
1408 	uint64_t chunk_map_size;
1409 
1410 	SPDK_NOTICELOG("vol info:\n");
1411 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1412 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1413 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1414 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1415 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1416 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1417 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1418 		       vol->params.vol_size / vol->params.chunk_size);
1419 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1420 			vol->params.backing_io_unit_size);
1421 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1422 	struct_size = _reduce_vol_get_chunk_struct_size(vol);
1423 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1424 
1425 	SPDK_NOTICELOG("pmem info:\n");
1426 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1427 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1428 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1429 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1430 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1431 			   vol->params.chunk_size);
1432 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1433 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1434 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1435 			 vol->params.backing_io_unit_size);
1436 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1437 }
1438 
1439 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
1440