xref: /spdk/lib/reduce/reduce.c (revision 5fd9561f54daa8eff7f3bcb56c789655bca846b1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/reduce.h"
38 #include "spdk/env.h"
39 #include "spdk/string.h"
40 #include "spdk/bit_array.h"
41 #include "spdk/util.h"
42 #include "spdk/log.h"
43 
44 #include "libpmem.h"
45 
46 /* Always round up the size of the PM region to the nearest cacheline. */
47 #define REDUCE_PM_SIZE_ALIGNMENT	64
48 
49 /* Offset into the backing device where the persistent memory file's path is stored. */
50 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
51 
52 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
53 
54 #define REDUCE_NUM_VOL_REQUESTS	256
55 
56 /* Structure written to offset 0 of both the pm file and the backing device. */
57 struct spdk_reduce_vol_superblock {
58 	uint8_t				signature[8];
59 	struct spdk_reduce_vol_params	params;
60 	uint8_t				reserved[4048];
61 };
62 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
63 
64 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
65 /* subtract 1 because sizeof() includes the null terminator */
66 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
67 		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
68 
69 #define REDUCE_PATH_MAX 4096
70 
71 #define REDUCE_ZERO_BUF_SIZE 0x100000
72 
73 /**
74  * Describes a persistent memory file used to hold metadata associated with a
75  *  compressed volume.
76  */
77 struct spdk_reduce_pm_file {
78 	char			path[REDUCE_PATH_MAX];
79 	void			*pm_buf;
80 	int			pm_is_pmem;
81 	uint64_t		size;
82 };
83 
84 #define REDUCE_IO_READV		1
85 #define REDUCE_IO_WRITEV	2
86 
87 struct spdk_reduce_chunk_map {
88 	uint32_t		compressed_size;
89 	uint32_t		reserved;
90 	uint64_t		io_unit_index[0];
91 };
92 
93 struct spdk_reduce_vol_request {
94 	/**
95 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
96 	 *   1) source buffer for compression operations
97 	 *   2) destination buffer for decompression operations
98 	 *   3) data buffer when writing uncompressed chunk to disk
99 	 *   4) data buffer when reading uncompressed chunk from disk
100 	 */
101 	uint8_t					*decomp_buf;
102 	struct iovec				*decomp_buf_iov;
103 
104 	/**
105 	 * These are used to construct the iovecs that are sent to
106 	 *  the decomp engine; they point to a mix of the scratch buffer
107 	 *  and the user buffer
108 	 */
109 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
110 	int					decomp_iovcnt;
111 
112 	/**
113 	 *  Scratch buffer used for compressed chunk.  This is used for:
114 	 *   1) destination buffer for compression operations
115 	 *   2) source buffer for decompression operations
116 	 *   3) data buffer when writing compressed chunk to disk
117 	 *   4) data buffer when reading compressed chunk from disk
118 	 */
119 	uint8_t					*comp_buf;
120 	struct iovec				*comp_buf_iov;
121 	struct iovec				*iov;
122 	bool					rmw;
123 	struct spdk_reduce_vol			*vol;
124 	int					type;
125 	int					reduce_errno;
126 	int					iovcnt;
127 	int					num_backing_ops;
128 	uint32_t				num_io_units;
129 	bool					chunk_is_compressed;
130 	bool					copy_after_decompress;
131 	uint64_t				offset;
132 	uint64_t				logical_map_index;
133 	uint64_t				length;
134 	uint64_t				chunk_map_index;
135 	struct spdk_reduce_chunk_map		*chunk;
136 	spdk_reduce_vol_op_complete		cb_fn;
137 	void					*cb_arg;
138 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
139 	struct spdk_reduce_vol_cb_args		backing_cb_args;
140 };
141 
142 struct spdk_reduce_vol {
143 	struct spdk_reduce_vol_params		params;
144 	uint32_t				backing_io_units_per_chunk;
145 	uint32_t				backing_lba_per_io_unit;
146 	uint32_t				logical_blocks_per_chunk;
147 	struct spdk_reduce_pm_file		pm_file;
148 	struct spdk_reduce_backing_dev		*backing_dev;
149 	struct spdk_reduce_vol_superblock	*backing_super;
150 	struct spdk_reduce_vol_superblock	*pm_super;
151 	uint64_t				*pm_logical_map;
152 	uint64_t				*pm_chunk_maps;
153 
154 	struct spdk_bit_array			*allocated_chunk_maps;
155 	struct spdk_bit_array			*allocated_backing_io_units;
156 
157 	struct spdk_reduce_vol_request		*request_mem;
158 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
159 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
160 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
161 
162 	/* Single contiguous buffer used for all request buffers for this volume. */
163 	uint8_t					*buf_mem;
164 	struct iovec				*buf_iov_mem;
165 };
166 
167 static void _start_readv_request(struct spdk_reduce_vol_request *req);
168 static void _start_writev_request(struct spdk_reduce_vol_request *req);
169 static uint8_t *g_zero_buf;
170 static int g_vol_count = 0;
171 
172 /*
173  * Allocate extra metadata chunks and corresponding backing io units to account for
174  *  outstanding IO in the worst case scenario where the logical map is completely allocated
175  *  and no data can be compressed.  We need extra chunks in this case to handle
176  *  in-flight writes since reduce never writes data in place.
177  */
178 #define REDUCE_NUM_EXTRA_CHUNKS 128
179 
180 static void
181 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
182 {
183 	if (vol->pm_file.pm_is_pmem) {
184 		pmem_persist(addr, len);
185 	} else {
186 		pmem_msync(addr, len);
187 	}
188 }
189 
190 static uint64_t
191 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
192 {
193 	uint64_t chunks_in_logical_map, logical_map_size;
194 
195 	chunks_in_logical_map = vol_size / chunk_size;
196 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
197 
198 	/* Round up to next cacheline. */
199 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
200 	       REDUCE_PM_SIZE_ALIGNMENT;
201 }
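
/*
 * Illustrative sizing example (hypothetical values, not mandated by libreduce):
 *  a 1 GiB volume with 16 KiB chunks has 65536 chunks in the logical map, so
 *  logical_map_size = 65536 * sizeof(uint64_t) = 512 KiB, which is already a
 *  multiple of the 64-byte REDUCE_PM_SIZE_ALIGNMENT and is returned unchanged.
 */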
202 
203 static uint64_t
204 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
205 {
206 	uint64_t num_chunks;
207 
208 	num_chunks = vol_size / chunk_size;
209 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
210 
211 	return num_chunks;
212 }
213 
214 static inline uint32_t
215 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
216 {
217 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
218 }
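
/*
 * Illustrative example (hypothetical values): with a 16 KiB chunk and a 4 KiB
 *  backing io unit there are 4 io units per chunk, so each chunk map occupies
 *  sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t) = 8 + 32 = 40 bytes.
 */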
219 
220 static uint64_t
221 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
222 {
223 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
224 
225 	num_chunks = _get_total_chunks(vol_size, chunk_size);
226 	io_units_per_chunk = chunk_size / backing_io_unit_size;
227 
228 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
229 
230 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
231 	       REDUCE_PM_SIZE_ALIGNMENT;
232 }
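
/*
 * Continuing the hypothetical 1 GiB volume / 16 KiB chunk / 4 KiB io unit
 *  example: 65536 + REDUCE_NUM_EXTRA_CHUNKS (128) = 65664 chunk maps of
 *  40 bytes each, i.e. 2,626,560 bytes, which already falls on a 64-byte
 *  boundary and is returned as-is.
 */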
233 
234 static struct spdk_reduce_chunk_map *
235 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
236 {
237 	uintptr_t chunk_map_addr;
238 
239 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
240 
241 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
242 	chunk_map_addr += chunk_map_index *
243 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
244 
245 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
246 }
247 
248 static int
249 _validate_vol_params(struct spdk_reduce_vol_params *params)
250 {
251 	if (params->vol_size > 0) {
252 		/**
253 		 * The caller must not supply vol_size - it gets calculated by libreduce from
254 		 *  the other values in this structure plus the size of the backing device.
255 		 */
256 		return -EINVAL;
257 	}
258 
259 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
260 	    params->logical_block_size == 0) {
261 		return -EINVAL;
262 	}
263 
264 	/* Chunk size must be an even multiple of the backing io unit size. */
265 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
266 		return -EINVAL;
267 	}
268 
269 	/* Chunk size must be an even multiple of the logical block size. */
270 	if ((params->chunk_size % params->logical_block_size) != 0) {
271 		return -EINVAL;
272 	}
273 
274 	return 0;
275 }
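
/*
 * Example of a parameter set that passes the checks above (values are purely
 *  illustrative): logical_block_size = 512, backing_io_unit_size = 4096,
 *  chunk_size = 16384 and vol_size = 0, since vol_size is always computed by
 *  libreduce rather than supplied by the caller.
 */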
276 
277 static uint64_t
278 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
279 {
280 	uint64_t num_chunks;
281 
282 	num_chunks = backing_dev_size / chunk_size;
283 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
284 		return 0;
285 	}
286 
287 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
288 	return num_chunks * chunk_size;
289 }
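
/*
 * Illustrative example (hypothetical sizes): a 1 GiB backing device with
 *  16 KiB chunks holds 65536 chunks; 128 of them are reserved as extra chunks,
 *  so the usable volume size is 65408 * 16 KiB = 1022 MiB.
 */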
290 
291 static uint64_t
292 _get_pm_file_size(struct spdk_reduce_vol_params *params)
293 {
294 	uint64_t total_pm_size;
295 
296 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
297 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
298 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
299 			 params->backing_io_unit_size);
300 	return total_pm_size;
301 }
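
/*
 * For the same hypothetical 1 GiB volume with 16 KiB chunks and 4 KiB backing
 *  io units, the pm file works out to 4 KiB (superblock) + 512 KiB (logical
 *  map) + ~2.5 MiB (chunk maps), roughly 3 MiB in total.
 */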
302 
303 const struct spdk_uuid *
304 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
305 {
306 	return &vol->params.uuid;
307 }
308 
309 static void
310 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
311 {
312 	uint64_t logical_map_size;
313 
314 	/* Superblock is at the beginning of the pm file. */
315 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
316 
317 	/* Logical map immediately follows the super block. */
318 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
319 
320 	/* Chunks maps follow the logical map. */
321 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
322 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
323 }
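
/*
 * Resulting pm file layout, with offsets shown for the hypothetical 1 GiB
 *  volume / 16 KiB chunk example used in the comments above:
 *
 *    0x000000  superblock   (4 KiB)
 *    0x001000  logical map  (one uint64_t per logical chunk, 512 KiB here)
 *    0x081000  chunk maps   (one struct spdk_reduce_chunk_map per chunk,
 *                            including the extra chunks)
 */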
324 
325 /* We need 2 iovs during load - one for the superblock, another for the path */
326 #define LOAD_IOV_COUNT	2
327 
328 struct reduce_init_load_ctx {
329 	struct spdk_reduce_vol			*vol;
330 	struct spdk_reduce_vol_cb_args		backing_cb_args;
331 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
332 	void					*cb_arg;
333 	struct iovec				iov[LOAD_IOV_COUNT];
334 	void					*path;
335 };
336 
337 static int
338 _allocate_vol_requests(struct spdk_reduce_vol *vol)
339 {
340 	struct spdk_reduce_vol_request *req;
341 	int i;
342 
343 	/* Allocate 2x the chunk size per request since each request needs both an
344 	 *  uncompressed (read/write) buffer and a compressed intermediate buffer.
345 	 */
346 	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
347 				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
348 	if (vol->buf_mem == NULL) {
349 		return -ENOMEM;
350 	}
351 
352 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
353 	if (vol->request_mem == NULL) {
354 		spdk_free(vol->buf_mem);
355 		vol->buf_mem = NULL;
356 		return -ENOMEM;
357 	}
358 
359 	/* Allocate 2x the iovs per request since each request needs iov arrays for both the
360 	 *  uncompressed and the compressed intermediate buffers.
361 	 */
362 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
363 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
364 	if (vol->buf_iov_mem == NULL) {
365 		free(vol->request_mem);
366 		spdk_free(vol->buf_mem);
367 		vol->request_mem = NULL;
368 		vol->buf_mem = NULL;
369 		return -ENOMEM;
370 	}
371 
372 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
373 		req = &vol->request_mem[i];
374 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
375 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
376 		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
377 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
378 		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
379 	}
380 
381 	return 0;
382 }
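
/*
 * Sketch of the per-request carving done by the loop above: request i gets
 *  decomp_buf at offset (2 * i) * chunk_size and comp_buf at offset
 *  (2 * i + 1) * chunk_size within buf_mem, plus the matching iov arrays in
 *  the corresponding (2 * i) and (2 * i + 1) slots of buf_iov_mem, so each
 *  request owns exactly 2 * chunk_size bytes of scratch space.
 */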
383 
384 static void
385 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
386 {
387 	if (ctx != NULL) {
388 		spdk_free(ctx->path);
389 		free(ctx);
390 	}
391 
392 	if (vol != NULL) {
393 		if (vol->pm_file.pm_buf != NULL) {
394 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
395 		}
396 
397 		spdk_free(vol->backing_super);
398 		spdk_bit_array_free(&vol->allocated_chunk_maps);
399 		spdk_bit_array_free(&vol->allocated_backing_io_units);
400 		free(vol->request_mem);
401 		free(vol->buf_iov_mem);
402 		spdk_free(vol->buf_mem);
403 		free(vol);
404 	}
405 }
406 
407 static int
408 _alloc_zero_buff(void)
409 {
410 	int rc = 0;
411 
412 	/* The zero buffer is shared between all volumes and only used
413 	 * for reads, so allocate one global instance here if it wasn't already
414 	 * allocated when another volume was initialized or loaded.
415 	 */
416 	if (g_vol_count++ == 0) {
417 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
418 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
419 					  SPDK_MALLOC_DMA);
420 		if (g_zero_buf == NULL) {
421 			rc = -ENOMEM;
422 		}
423 	}
424 	return rc;
425 }
426 
427 static void
428 _init_write_super_cpl(void *cb_arg, int reduce_errno)
429 {
430 	struct reduce_init_load_ctx *init_ctx = cb_arg;
431 	int rc;
432 
433 	rc = _allocate_vol_requests(init_ctx->vol);
434 	if (rc != 0) {
435 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
436 		_init_load_cleanup(init_ctx->vol, init_ctx);
437 		return;
438 	}
439 
440 	rc = _alloc_zero_buff();
441 	if (rc != 0) {
442 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
443 		_init_load_cleanup(init_ctx->vol, init_ctx);
444 		return;
445 	}
446 
447 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
448 	/* Only clean up the ctx - the vol has been passed to the application
449 	 *  for use now that initialization was successful.
450 	 */
451 	_init_load_cleanup(NULL, init_ctx);
452 }
453 
454 static void
455 _init_write_path_cpl(void *cb_arg, int reduce_errno)
456 {
457 	struct reduce_init_load_ctx *init_ctx = cb_arg;
458 	struct spdk_reduce_vol *vol = init_ctx->vol;
459 
460 	init_ctx->iov[0].iov_base = vol->backing_super;
461 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
462 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
463 	init_ctx->backing_cb_args.cb_arg = init_ctx;
464 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
465 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
466 				 &init_ctx->backing_cb_args);
467 }
468 
469 static int
470 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
471 {
472 	uint64_t total_chunks, total_backing_io_units;
473 	uint32_t i, num_metadata_io_units;
474 
475 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
476 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
477 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
478 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
479 
480 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
481 		return -ENOMEM;
482 	}
483 
484 	/* Set backing io unit bits associated with metadata. */
485 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
486 				vol->backing_dev->blocklen;
487 	for (i = 0; i < num_metadata_io_units; i++) {
488 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
489 	}
490 
491 	return 0;
492 }
493 
494 void
495 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
496 		     struct spdk_reduce_backing_dev *backing_dev,
497 		     const char *pm_file_dir,
498 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
499 {
500 	struct spdk_reduce_vol *vol;
501 	struct reduce_init_load_ctx *init_ctx;
502 	uint64_t backing_dev_size;
503 	size_t mapped_len;
504 	int dir_len, max_dir_len, rc;
505 
506 	/* We need to append a path separator and the UUID to the supplied
507 	 * path.
508 	 */
509 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
510 	dir_len = strnlen(pm_file_dir, max_dir_len);
511 	/* Strip trailing slash if the user provided one - we will add it back
512 	 * later when appending the filename.
513 	 */
514 	if (pm_file_dir[dir_len - 1] == '/') {
515 		dir_len--;
516 	}
517 	if (dir_len == max_dir_len) {
518 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
519 		cb_fn(cb_arg, NULL, -EINVAL);
520 		return;
521 	}
522 
523 	rc = _validate_vol_params(params);
524 	if (rc != 0) {
525 		SPDK_ERRLOG("invalid vol params\n");
526 		cb_fn(cb_arg, NULL, rc);
527 		return;
528 	}
529 
530 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
531 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
532 	if (params->vol_size == 0) {
533 		SPDK_ERRLOG("backing device is too small\n");
534 		cb_fn(cb_arg, NULL, -EINVAL);
535 		return;
536 	}
537 
538 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
539 	    backing_dev->unmap == NULL) {
540 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
541 		cb_fn(cb_arg, NULL, -EINVAL);
542 		return;
543 	}
544 
545 	vol = calloc(1, sizeof(*vol));
546 	if (vol == NULL) {
547 		cb_fn(cb_arg, NULL, -ENOMEM);
548 		return;
549 	}
550 
551 	TAILQ_INIT(&vol->free_requests);
552 	TAILQ_INIT(&vol->executing_requests);
553 	TAILQ_INIT(&vol->queued_requests);
554 
555 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
556 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
557 	if (vol->backing_super == NULL) {
558 		cb_fn(cb_arg, NULL, -ENOMEM);
559 		_init_load_cleanup(vol, NULL);
560 		return;
561 	}
562 
563 	init_ctx = calloc(1, sizeof(*init_ctx));
564 	if (init_ctx == NULL) {
565 		cb_fn(cb_arg, NULL, -ENOMEM);
566 		_init_load_cleanup(vol, NULL);
567 		return;
568 	}
569 
570 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
571 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
572 	if (init_ctx->path == NULL) {
573 		cb_fn(cb_arg, NULL, -ENOMEM);
574 		_init_load_cleanup(vol, init_ctx);
575 		return;
576 	}
577 
578 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
579 		spdk_uuid_generate(&params->uuid);
580 	}
581 
582 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
583 	vol->pm_file.path[dir_len] = '/';
584 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
585 			    &params->uuid);
586 	vol->pm_file.size = _get_pm_file_size(params);
587 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
588 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
589 					    &mapped_len, &vol->pm_file.pm_is_pmem);
590 	if (vol->pm_file.pm_buf == NULL) {
591 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
592 			    vol->pm_file.path, strerror(errno));
593 		cb_fn(cb_arg, NULL, -errno);
594 		_init_load_cleanup(vol, init_ctx);
595 		return;
596 	}
597 
598 	if (vol->pm_file.size != mapped_len) {
599 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
600 			    vol->pm_file.size, mapped_len);
601 		cb_fn(cb_arg, NULL, -ENOMEM);
602 		_init_load_cleanup(vol, init_ctx);
603 		return;
604 	}
605 
606 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
607 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
608 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
609 	memcpy(&vol->params, params, sizeof(*params));
610 
611 	vol->backing_dev = backing_dev;
612 
613 	rc = _allocate_bit_arrays(vol);
614 	if (rc != 0) {
615 		cb_fn(cb_arg, NULL, rc);
616 		_init_load_cleanup(vol, init_ctx);
617 		return;
618 	}
619 
620 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
621 	       sizeof(vol->backing_super->signature));
622 	memcpy(&vol->backing_super->params, params, sizeof(*params));
623 
624 	_initialize_vol_pm_pointers(vol);
625 
626 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
627 	/* Writing 0xFFs is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
628 	 * Note that this writes 0xFF not just to the logical map but to the chunk maps as well.
629 	 */
630 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
631 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
632 
633 	init_ctx->vol = vol;
634 	init_ctx->cb_fn = cb_fn;
635 	init_ctx->cb_arg = cb_arg;
636 
637 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
638 	init_ctx->iov[0].iov_base = init_ctx->path;
639 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
640 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
641 	init_ctx->backing_cb_args.cb_arg = init_ctx;
642 	/* Write path to offset 4K on backing device - just after where the super
643 	 *  block will be written.  We wait until this is committed before writing the
644 	 *  super block to guarantee we don't get the super block written without
645 	 *  the path if the system crashes in the middle of a write operation.
646 	 */
647 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
648 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
649 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
650 				 &init_ctx->backing_cb_args);
651 }
652 
653 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
654 
655 static void
656 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
657 {
658 	struct reduce_init_load_ctx *load_ctx = cb_arg;
659 	struct spdk_reduce_vol *vol = load_ctx->vol;
660 	uint64_t backing_dev_size;
661 	uint64_t i, num_chunks, logical_map_index;
662 	struct spdk_reduce_chunk_map *chunk;
663 	size_t mapped_len;
664 	uint32_t j;
665 	int rc;
666 
667 	rc = _alloc_zero_buff();
668 	if (rc) {
669 		goto error;
670 	}
671 
672 	if (memcmp(vol->backing_super->signature,
673 		   SPDK_REDUCE_SIGNATURE,
674 		   sizeof(vol->backing_super->signature)) != 0) {
675 		/* This backing device isn't a libreduce backing device. */
676 		rc = -EILSEQ;
677 		goto error;
678 	}
679 
680 	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
681 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
682 	 *  so destroy_load_cb can delete the metadata from the block device and delete the
683 	 *  persistent memory file if it exists.
684 	 */
685 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
686 	if (load_ctx->cb_fn == (*destroy_load_cb)) {
687 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
688 		_init_load_cleanup(NULL, load_ctx);
689 		return;
690 	}
691 
692 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
693 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
694 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
695 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
696 
697 	rc = _allocate_bit_arrays(vol);
698 	if (rc != 0) {
699 		goto error;
700 	}
701 
702 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
703 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
704 		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
705 			    backing_dev_size);
706 		rc = -EILSEQ;
707 		goto error;
708 	}
709 
710 	vol->pm_file.size = _get_pm_file_size(&vol->params);
711 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
712 					    &vol->pm_file.pm_is_pmem);
713 	if (vol->pm_file.pm_buf == NULL) {
714 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
715 		rc = -errno;
716 		goto error;
717 	}
718 
719 	if (vol->pm_file.size != mapped_len) {
720 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
721 			    vol->pm_file.size, mapped_len);
722 		rc = -ENOMEM;
723 		goto error;
724 	}
725 
726 	rc = _allocate_vol_requests(vol);
727 	if (rc != 0) {
728 		goto error;
729 	}
730 
731 	_initialize_vol_pm_pointers(vol);
732 
733 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
734 	for (i = 0; i < num_chunks; i++) {
735 		logical_map_index = vol->pm_logical_map[i];
736 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
737 			continue;
738 		}
739 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
740 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
741 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
742 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
743 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
744 			}
745 		}
746 	}
747 
748 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
749 	/* Only clean up the ctx - the vol has been passed to the application
750 	 *  for use now that volume load was successful.
751 	 */
752 	_init_load_cleanup(NULL, load_ctx);
753 	return;
754 
755 error:
756 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
757 	_init_load_cleanup(vol, load_ctx);
758 }
759 
760 void
761 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
762 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
763 {
764 	struct spdk_reduce_vol *vol;
765 	struct reduce_init_load_ctx *load_ctx;
766 
767 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
768 	    backing_dev->unmap == NULL) {
769 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
770 		cb_fn(cb_arg, NULL, -EINVAL);
771 		return;
772 	}
773 
774 	vol = calloc(1, sizeof(*vol));
775 	if (vol == NULL) {
776 		cb_fn(cb_arg, NULL, -ENOMEM);
777 		return;
778 	}
779 
780 	TAILQ_INIT(&vol->free_requests);
781 	TAILQ_INIT(&vol->executing_requests);
782 	TAILQ_INIT(&vol->queued_requests);
783 
784 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
785 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
786 	if (vol->backing_super == NULL) {
787 		_init_load_cleanup(vol, NULL);
788 		cb_fn(cb_arg, NULL, -ENOMEM);
789 		return;
790 	}
791 
792 	vol->backing_dev = backing_dev;
793 
794 	load_ctx = calloc(1, sizeof(*load_ctx));
795 	if (load_ctx == NULL) {
796 		_init_load_cleanup(vol, NULL);
797 		cb_fn(cb_arg, NULL, -ENOMEM);
798 		return;
799 	}
800 
801 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
802 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
803 	if (load_ctx->path == NULL) {
804 		_init_load_cleanup(vol, load_ctx);
805 		cb_fn(cb_arg, NULL, -ENOMEM);
806 		return;
807 	}
808 
809 	load_ctx->vol = vol;
810 	load_ctx->cb_fn = cb_fn;
811 	load_ctx->cb_arg = cb_arg;
812 
813 	load_ctx->iov[0].iov_base = vol->backing_super;
814 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
815 	load_ctx->iov[1].iov_base = load_ctx->path;
816 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
817 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
818 	load_ctx->backing_cb_args.cb_arg = load_ctx;
819 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
820 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
821 				vol->backing_dev->blocklen,
822 				&load_ctx->backing_cb_args);
823 }
824 
825 void
826 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
827 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
828 {
829 	if (vol == NULL) {
830 		/* This indicates a programming error. */
831 		assert(false);
832 		cb_fn(cb_arg, -EINVAL);
833 		return;
834 	}
835 
836 	if (--g_vol_count == 0) {
837 		spdk_free(g_zero_buf);
838 	}
839 	assert(g_vol_count >= 0);
840 	_init_load_cleanup(vol, NULL);
841 	cb_fn(cb_arg, 0);
842 }
843 
844 struct reduce_destroy_ctx {
845 	spdk_reduce_vol_op_complete		cb_fn;
846 	void					*cb_arg;
847 	struct spdk_reduce_vol			*vol;
848 	struct spdk_reduce_vol_superblock	*super;
849 	struct iovec				iov;
850 	struct spdk_reduce_vol_cb_args		backing_cb_args;
851 	int					reduce_errno;
852 	char					pm_path[REDUCE_PATH_MAX];
853 };
854 
855 static void
856 destroy_unload_cpl(void *cb_arg, int reduce_errno)
857 {
858 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
859 
860 	if (destroy_ctx->reduce_errno == 0) {
861 		if (unlink(destroy_ctx->pm_path)) {
862 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
863 				    destroy_ctx->pm_path, strerror(errno));
864 		}
865 	}
866 
867 	/* Even if the unload somehow failed, we still pass the destroy_ctx
868 	 * reduce_errno since that indicates whether or not the volume was
869 	 * actually destroyed.
870 	 */
871 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
872 	spdk_free(destroy_ctx->super);
873 	free(destroy_ctx);
874 }
875 
876 static void
877 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
878 {
879 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
880 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
881 
882 	destroy_ctx->reduce_errno = reduce_errno;
883 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
884 }
885 
886 static void
887 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
888 {
889 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
890 
891 	if (reduce_errno != 0) {
892 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
893 		spdk_free(destroy_ctx->super);
894 		free(destroy_ctx);
895 		return;
896 	}
897 
898 	destroy_ctx->vol = vol;
899 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
900 	destroy_ctx->iov.iov_base = destroy_ctx->super;
901 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
902 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
903 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
904 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
905 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
906 				 &destroy_ctx->backing_cb_args);
907 }
908 
909 void
910 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
911 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
912 {
913 	struct reduce_destroy_ctx *destroy_ctx;
914 
915 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
916 	if (destroy_ctx == NULL) {
917 		cb_fn(cb_arg, -ENOMEM);
918 		return;
919 	}
920 
921 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
922 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
923 	if (destroy_ctx->super == NULL) {
924 		free(destroy_ctx);
925 		cb_fn(cb_arg, -ENOMEM);
926 		return;
927 	}
928 	destroy_ctx->cb_fn = cb_fn;
929 	destroy_ctx->cb_arg = cb_arg;
930 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
931 }
932 
933 static bool
934 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
935 {
936 	uint64_t start_chunk, end_chunk;
937 
938 	start_chunk = offset / vol->logical_blocks_per_chunk;
939 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
940 
941 	return (start_chunk != end_chunk);
942 }
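
/*
 * Example (hypothetical geometry): with 32 logical blocks per chunk, a request
 *  at offset 30 with length 4 touches blocks 30..33 and therefore chunks 0 and
 *  1, so it spans a boundary; the same length starting at offset 28 stays
 *  entirely within chunk 0.
 */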
943 
944 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
945 
946 static void
947 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
948 {
949 	struct spdk_reduce_vol_request *next_req;
950 	struct spdk_reduce_vol *vol = req->vol;
951 
952 	req->cb_fn(req->cb_arg, reduce_errno);
953 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
954 
955 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
956 		if (next_req->logical_map_index == req->logical_map_index) {
957 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
958 			if (next_req->type == REDUCE_IO_READV) {
959 				_start_readv_request(next_req);
960 			} else {
961 				assert(next_req->type == REDUCE_IO_WRITEV);
962 				_start_writev_request(next_req);
963 			}
964 			break;
965 		}
966 	}
967 
968 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
969 }
970 
971 static void
972 _write_write_done(void *_req, int reduce_errno)
973 {
974 	struct spdk_reduce_vol_request *req = _req;
975 	struct spdk_reduce_vol *vol = req->vol;
976 	uint64_t old_chunk_map_index;
977 	struct spdk_reduce_chunk_map *old_chunk;
978 	uint32_t i;
979 
980 	if (reduce_errno != 0) {
981 		req->reduce_errno = reduce_errno;
982 	}
983 
984 	assert(req->num_backing_ops > 0);
985 	if (--req->num_backing_ops > 0) {
986 		return;
987 	}
988 
989 	if (req->reduce_errno != 0) {
990 		_reduce_vol_complete_req(req, req->reduce_errno);
991 		return;
992 	}
993 
994 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
995 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
996 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
997 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
998 			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
999 				break;
1000 			}
1001 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
1002 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
1003 			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1004 		}
1005 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
1006 	}
1007 
1008 	/*
1009 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1010 	 * becomes invalid after we update the logical map, since the logical map will no
1011 	 * longer contain a reference to it.
1012 	 */
1013 
1014 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1015 	_reduce_persist(vol, req->chunk,
1016 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1017 
1018 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1019 
1020 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1021 
1022 	_reduce_vol_complete_req(req, 0);
1023 }
1024 
1025 static void
1026 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1027 		   reduce_request_fn next_fn, bool is_write)
1028 {
1029 	struct iovec *iov;
1030 	uint8_t *buf;
1031 	uint32_t i;
1032 
1033 	if (req->chunk_is_compressed) {
1034 		iov = req->comp_buf_iov;
1035 		buf = req->comp_buf;
1036 	} else {
1037 		iov = req->decomp_buf_iov;
1038 		buf = req->decomp_buf;
1039 	}
1040 
1041 	req->num_backing_ops = req->num_io_units;
1042 	req->backing_cb_args.cb_fn = next_fn;
1043 	req->backing_cb_args.cb_arg = req;
1044 	for (i = 0; i < req->num_io_units; i++) {
1045 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1046 		iov[i].iov_len = vol->params.backing_io_unit_size;
1047 		if (is_write) {
1048 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
1049 						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1050 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
1051 		} else {
1052 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
1053 						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1054 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
1055 		}
1056 	}
1057 }
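
/*
 * Illustrative mapping (hypothetical values): with a 4 KiB backing io unit and
 *  a 512-byte backing blocklen, backing_lba_per_io_unit is 8, so io unit index
 *  N is read from or written to LBAs 8*N through 8*N + 7 on the backing device.
 */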
1058 
1059 static void
1060 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1061 			uint32_t compressed_size)
1062 {
1063 	struct spdk_reduce_vol *vol = req->vol;
1064 	uint32_t i;
1065 	uint64_t chunk_offset, remainder, total_len = 0;
1066 	uint8_t *buf;
1067 	int j;
1068 
1069 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
1070 
1071 	/* TODO: fail if no chunk map found - but really this should not happen if we
1072 	 * size the number of requests similarly to the number of extra chunk maps
1073 	 */
1074 	assert(req->chunk_map_index != UINT32_MAX);
1075 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1076 
1077 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1078 	req->num_io_units = spdk_divide_round_up(compressed_size,
1079 			    vol->params.backing_io_unit_size);
1080 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1081 	req->chunk->compressed_size =
1082 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1083 
1084 	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
1085 	if (req->chunk_is_compressed == false) {
1086 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1087 		buf = req->decomp_buf;
1088 		total_len = chunk_offset * vol->params.logical_block_size;
1089 
1090 		/* zero any offset into chunk */
1091 		if (req->rmw == false && chunk_offset) {
1092 			memset(buf, 0, total_len);
1093 		}
1094 		buf += total_len;
1095 
1096 		/* copy the data */
1097 		for (j = 0; j < req->iovcnt; j++) {
1098 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1099 			buf += req->iov[j].iov_len;
1100 			total_len += req->iov[j].iov_len;
1101 		}
1102 
1103 		/* zero any remainder */
1104 		remainder = vol->params.chunk_size - total_len;
1105 		total_len += remainder;
1106 		if (req->rmw == false && remainder) {
1107 			memset(buf, 0, remainder);
1108 		}
1109 		assert(total_len == vol->params.chunk_size);
1110 	}
1111 
1112 	for (i = 0; i < req->num_io_units; i++) {
1113 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1114 		/* TODO: fail if no backing block found - but really this should also not
1115 		 * happen (see comment above).
1116 		 */
1117 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1118 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1119 	}
1120 
1121 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1122 }
1123 
1124 static void
1125 _write_compress_done(void *_req, int reduce_errno)
1126 {
1127 	struct spdk_reduce_vol_request *req = _req;
1128 
1129 	/* Negative reduce_errno indicates failure for compression operations.
1130 	 * Just write the uncompressed data instead.  Force this to happen
1131 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1132 	 * When it sees the data couldn't be compressed, it will just write
1133 	 * the uncompressed buffer to disk.
1134 	 */
1135 	if (reduce_errno < 0) {
1136 		reduce_errno = req->vol->params.chunk_size;
1137 	}
1138 
1139 	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
1140 	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
1141 }
1142 
1143 static void
1144 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1145 {
1146 	struct spdk_reduce_vol *vol = req->vol;
1147 
1148 	req->backing_cb_args.cb_fn = next_fn;
1149 	req->backing_cb_args.cb_arg = req;
1150 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1151 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1152 	vol->backing_dev->compress(vol->backing_dev,
1153 				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
1154 				   &req->backing_cb_args);
1155 }
1156 
1157 static void
1158 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1159 {
1160 	struct spdk_reduce_vol *vol = req->vol;
1161 
1162 	req->backing_cb_args.cb_fn = next_fn;
1163 	req->backing_cb_args.cb_arg = req;
1164 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1165 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1166 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1167 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1168 	vol->backing_dev->decompress(vol->backing_dev,
1169 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1170 				     &req->backing_cb_args);
1171 }
1172 
1173 static void
1174 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1175 {
1176 	struct spdk_reduce_vol *vol = req->vol;
1177 	uint64_t chunk_offset, remainder = 0;
1178 	uint64_t ttl_len = 0;
1179 	int i;
1180 
1181 	req->decomp_iovcnt = 0;
1182 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1183 
1184 	/* If the backing device doesn't support SGL output then we must copy the result of decompression
1185 	 * to the user's buffer if at least one of the conditions below is true:
1186 	 * 1. The user's buffer is fragmented
1187 	 * 2. The user's buffer is shorter than the chunk */
1188 	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
1189 				     req->iov[0].iov_len < vol->params.chunk_size);
1190 	if (req->copy_after_decompress) {
1191 		req->decomp_iov[0].iov_base = req->decomp_buf;
1192 		req->decomp_iov[0].iov_len = vol->params.chunk_size;
1193 		req->decomp_iovcnt = 1;
1194 		goto decompress;
1195 	}
1196 
1197 	if (chunk_offset) {
1198 		/* first iov points to our scratch buffer for any offset into the chunk */
1199 		req->decomp_iov[0].iov_base = req->decomp_buf;
1200 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1201 		ttl_len += req->decomp_iov[0].iov_len;
1202 		req->decomp_iovcnt = 1;
1203 	}
1204 
1205 	/* now the user data iov, direct to the user buffer */
1206 	for (i = 0; i < req->iovcnt; i++) {
1207 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1208 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1209 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1210 	}
1211 	req->decomp_iovcnt += req->iovcnt;
1212 
1213 	/* send the rest of the chunk to our scratch buffer */
1214 	remainder = vol->params.chunk_size - ttl_len;
1215 	if (remainder) {
1216 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1217 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1218 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1219 		req->decomp_iovcnt++;
1220 	}
1221 	assert(ttl_len == vol->params.chunk_size);
1222 
1223 decompress:
1224 	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
1225 	req->backing_cb_args.cb_fn = next_fn;
1226 	req->backing_cb_args.cb_arg = req;
1227 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1228 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1229 	vol->backing_dev->decompress(vol->backing_dev,
1230 				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
1231 				     &req->backing_cb_args);
1232 }
1233 
1234 static inline void
1235 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
1236 {
1237 	struct spdk_reduce_vol *vol = req->vol;
1238 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1239 	uint64_t chunk_offset, ttl_len = 0;
1240 	uint64_t remainder = 0;
1241 	char *copy_offset = NULL;
1242 	uint32_t lbsize = vol->params.logical_block_size;
1243 	int i;
1244 
1245 	req->decomp_iov[0].iov_base = req->decomp_buf;
1246 	req->decomp_iov[0].iov_len = vol->params.chunk_size;
1247 	req->decomp_iovcnt = 1;
1248 	copy_offset = req->decomp_iov[0].iov_base;
1249 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1250 
1251 	if (chunk_offset) {
1252 		ttl_len += chunk_offset * lbsize;
1253 		/* copy_offset already points to padding buffer if zero_paddings=false */
1254 		if (zero_paddings) {
1255 			memcpy(copy_offset, padding_buffer, ttl_len);
1256 		}
1257 		copy_offset += ttl_len;
1258 	}
1259 
1260 	/* now the user data iov, direct from the user buffer */
1261 	for (i = 0; i < req->iovcnt; i++) {
1262 		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
1263 		copy_offset += req->iov[i].iov_len;
1264 		ttl_len += req->iov[i].iov_len;
1265 	}
1266 
1267 	remainder = vol->params.chunk_size - ttl_len;
1268 	if (remainder) {
1269 		/* copy_offset already points to padding buffer if zero_paddings=false */
1270 		if (zero_paddings) {
1271 			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
1272 		}
1273 		ttl_len += remainder;
1274 	}
1275 
1276 	assert(ttl_len == req->vol->params.chunk_size);
1277 }
1278 
1279 /* This function can be called when we are compressing new data or in the read-modify-write case.
1280  * In the first case any paddings should be filled with zeroes; in the second case the paddings
1281  * should point to the already read and decompressed buffer */
1282 static inline void
1283 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1284 {
1285 	struct spdk_reduce_vol *vol = req->vol;
1286 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1287 	uint64_t chunk_offset, ttl_len = 0;
1288 	uint64_t remainder = 0;
1289 	uint32_t lbsize = vol->params.logical_block_size;
1290 	int i;
1291 
1292 	/* If the backing device doesn't support SGL input then we must copy the user's buffer into decomp_buf
1293 	 * if at least one of the conditions below is true:
1294 	 * 1. The user's buffer is fragmented
1295 	 * 2. The user's buffer is shorter than the chunk */
1296 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1297 					  req->iov[0].iov_len < vol->params.chunk_size)) {
1298 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1299 		return;
1300 	}
1301 
1302 	req->decomp_iovcnt = 0;
1303 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1304 
1305 	if (chunk_offset != 0) {
1306 		ttl_len += chunk_offset * lbsize;
1307 		req->decomp_iov[0].iov_base = padding_buffer;
1308 		req->decomp_iov[0].iov_len = ttl_len;
1309 		req->decomp_iovcnt = 1;
1310 	}
1311 
1312 	/* now the user data iov, direct from the user buffer */
1313 	for (i = 0; i < req->iovcnt; i++) {
1314 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1315 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1316 		ttl_len += req->iov[i].iov_len;
1317 	}
1318 	req->decomp_iovcnt += req->iovcnt;
1319 
1320 	remainder = vol->params.chunk_size - ttl_len;
1321 	if (remainder) {
1322 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1323 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1324 		req->decomp_iovcnt++;
1325 		ttl_len += remainder;
1326 	}
1327 	assert(ttl_len == req->vol->params.chunk_size);
1328 }
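
/*
 * Example decomp_iov layout for an SGL-capable backing device (hypothetical
 *  values): with chunk_size = 16 KiB and logical_block_size = 512, a write of
 *  8 blocks at chunk_offset 4 produces three iovs - 2 KiB of leading padding,
 *  4 KiB of user data, and 10 KiB of trailing padding - where the padding
 *  comes from g_zero_buf for new writes or from decomp_buf on the
 *  read-modify-write path.
 */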
1329 
1330 static void
1331 _write_decompress_done(void *_req, int reduce_errno)
1332 {
1333 	struct spdk_reduce_vol_request *req = _req;
1334 
1335 	/* Negative reduce_errno indicates failure for compression operations. */
1336 	if (reduce_errno < 0) {
1337 		_reduce_vol_complete_req(req, reduce_errno);
1338 		return;
1339 	}
1340 
1341 	/* Positive reduce_errno indicates number of bytes in decompressed
1342 	 *  buffer.  This should equal the chunk size - otherwise that's another
1343 	 *  type of failure.
1344 	 */
1345 	if ((uint32_t)reduce_errno != req->vol->params.chunk_size) {
1346 		_reduce_vol_complete_req(req, -EIO);
1347 		return;
1348 	}
1349 
1350 	_prepare_compress_chunk(req, false);
1351 	_reduce_vol_compress_chunk(req, _write_compress_done);
1352 }
1353 
1354 static void
1355 _write_read_done(void *_req, int reduce_errno)
1356 {
1357 	struct spdk_reduce_vol_request *req = _req;
1358 
1359 	if (reduce_errno != 0) {
1360 		req->reduce_errno = reduce_errno;
1361 	}
1362 
1363 	assert(req->num_backing_ops > 0);
1364 	if (--req->num_backing_ops > 0) {
1365 		return;
1366 	}
1367 
1368 	if (req->reduce_errno != 0) {
1369 		_reduce_vol_complete_req(req, req->reduce_errno);
1370 		return;
1371 	}
1372 
1373 	if (req->chunk_is_compressed) {
1374 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1375 	} else {
1376 		_write_decompress_done(req, req->chunk->compressed_size);
1377 	}
1378 }
1379 
1380 static void
1381 _read_decompress_done(void *_req, int reduce_errno)
1382 {
1383 	struct spdk_reduce_vol_request *req = _req;
1384 	struct spdk_reduce_vol *vol = req->vol;
1385 
1386 	/* Negative reduce_errno indicates failure for compression operations. */
1387 	if (reduce_errno < 0) {
1388 		_reduce_vol_complete_req(req, reduce_errno);
1389 		return;
1390 	}
1391 
1392 	/* Positive reduce_errno indicates number of bytes in decompressed
1393 	 *  buffer.  This should equal the chunk size - otherwise that's another
1394 	 *  type of failure.
1395 	 */
1396 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1397 		_reduce_vol_complete_req(req, -EIO);
1398 		return;
1399 	}
1400 
1401 	if (req->copy_after_decompress) {
1402 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1403 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1404 		int i;
1405 
1406 		for (i = 0; i < req->iovcnt; i++) {
1407 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1408 			decomp_buffer += req->iov[i].iov_len;
1409 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1410 		}
1411 	}
1412 
1413 	_reduce_vol_complete_req(req, 0);
1414 }
1415 
1416 static void
1417 _read_read_done(void *_req, int reduce_errno)
1418 {
1419 	struct spdk_reduce_vol_request *req = _req;
1420 	uint64_t chunk_offset;
1421 	uint8_t *buf;
1422 	int i;
1423 
1424 	if (reduce_errno != 0) {
1425 		req->reduce_errno = reduce_errno;
1426 	}
1427 
1428 	assert(req->num_backing_ops > 0);
1429 	if (--req->num_backing_ops > 0) {
1430 		return;
1431 	}
1432 
1433 	if (req->reduce_errno != 0) {
1434 		_reduce_vol_complete_req(req, req->reduce_errno);
1435 		return;
1436 	}
1437 
1438 	if (req->chunk_is_compressed) {
1439 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1440 	} else {
1441 
1442 		/* If the chunk had been compressed, the data would have been delivered to the
1443 		 *  host buffers by the decompression operation; since it was not, we memcpy it here.
1444 		 */
1445 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1446 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1447 		for (i = 0; i < req->iovcnt; i++) {
1448 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1449 			buf += req->iov[i].iov_len;
1450 		}
1451 
1452 		_read_decompress_done(req, req->chunk->compressed_size);
1453 	}
1454 }
1455 
1456 static void
1457 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1458 {
1459 	struct spdk_reduce_vol *vol = req->vol;
1460 
1461 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1462 	assert(req->chunk_map_index != UINT32_MAX);
1463 
1464 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1465 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1466 			    vol->params.backing_io_unit_size);
1467 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1468 
1469 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1470 }
1471 
1472 static bool
1473 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1474 		    uint64_t length)
1475 {
1476 	uint64_t size = 0;
1477 	int i;
1478 
1479 	if (iovcnt > REDUCE_MAX_IOVECS) {
1480 		return false;
1481 	}
1482 
1483 	for (i = 0; i < iovcnt; i++) {
1484 		size += iov[i].iov_len;
1485 	}
1486 
1487 	return size == (length * vol->params.logical_block_size);
1488 }
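
/*
 * Example (hypothetical values): with a 512-byte logical block size, a request
 *  of length 4 blocks is valid with two iovs of 1024 bytes each (2048 bytes
 *  total), but not with a single 4096-byte iov, since the iov total must equal
 *  length * logical_block_size exactly.
 */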
1489 
1490 static bool
1491 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1492 {
1493 	struct spdk_reduce_vol_request *req;
1494 
1495 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1496 		if (logical_map_index == req->logical_map_index) {
1497 			return true;
1498 		}
1499 	}
1500 
1501 	return false;
1502 }
1503 
1504 static void
1505 _start_readv_request(struct spdk_reduce_vol_request *req)
1506 {
1507 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1508 	_reduce_vol_read_chunk(req, _read_read_done);
1509 }
1510 
1511 void
1512 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1513 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1514 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1515 {
1516 	struct spdk_reduce_vol_request *req;
1517 	uint64_t logical_map_index;
1518 	bool overlapped;
1519 	int i;
1520 
1521 	if (length == 0) {
1522 		cb_fn(cb_arg, 0);
1523 		return;
1524 	}
1525 
1526 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1527 		cb_fn(cb_arg, -EINVAL);
1528 		return;
1529 	}
1530 
1531 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1532 		cb_fn(cb_arg, -EINVAL);
1533 		return;
1534 	}
1535 
1536 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1537 	overlapped = _check_overlap(vol, logical_map_index);
1538 
1539 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1540 		/*
1541 		 * This chunk hasn't been allocated.  So treat the data as all
1542 		 * zeroes for this chunk - do the memset and immediately complete
1543 		 * the operation.
1544 		 */
1545 		for (i = 0; i < iovcnt; i++) {
1546 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1547 		}
1548 		cb_fn(cb_arg, 0);
1549 		return;
1550 	}
1551 
1552 	req = TAILQ_FIRST(&vol->free_requests);
1553 	if (req == NULL) {
1554 		cb_fn(cb_arg, -ENOMEM);
1555 		return;
1556 	}
1557 
1558 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1559 	req->type = REDUCE_IO_READV;
1560 	req->vol = vol;
1561 	req->iov = iov;
1562 	req->iovcnt = iovcnt;
1563 	req->offset = offset;
1564 	req->logical_map_index = logical_map_index;
1565 	req->length = length;
1566 	req->copy_after_decompress = false;
1567 	req->cb_fn = cb_fn;
1568 	req->cb_arg = cb_arg;
1569 
1570 	if (!overlapped) {
1571 		_start_readv_request(req);
1572 	} else {
1573 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1574 	}
1575 }
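
/*
 * Caller-side usage sketch (illustrative only - the buffer, callback and
 *  context names are hypothetical, and the 512-byte logical block size is an
 *  assumption):
 *
 *    struct iovec iov = { .iov_base = buf, .iov_len = 8 * 512 };
 *    spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, my_read_done_cb, my_ctx);
 *
 *  This reads 8 logical blocks starting at logical block 0, provided the
 *  request does not span a chunk boundary.
 */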
1576 
1577 static void
1578 _start_writev_request(struct spdk_reduce_vol_request *req)
1579 {
1580 	struct spdk_reduce_vol *vol = req->vol;
1581 
1582 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1583 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1584 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1585 			/* Read old chunk, then overwrite with data from this write
1586 			 *  operation.
1587 			 */
1588 			req->rmw = true;
1589 			_reduce_vol_read_chunk(req, _write_read_done);
1590 			return;
1591 		}
1592 	}
1593 
1594 	req->rmw = false;
1595 
1596 	_prepare_compress_chunk(req, true);
1597 	_reduce_vol_compress_chunk(req, _write_compress_done);
1598 }
1599 
1600 void
1601 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1602 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1603 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1604 {
1605 	struct spdk_reduce_vol_request *req;
1606 	uint64_t logical_map_index;
1607 	bool overlapped;
1608 
1609 	if (length == 0) {
1610 		cb_fn(cb_arg, 0);
1611 		return;
1612 	}
1613 
1614 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1615 		cb_fn(cb_arg, -EINVAL);
1616 		return;
1617 	}
1618 
1619 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1620 		cb_fn(cb_arg, -EINVAL);
1621 		return;
1622 	}
1623 
1624 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1625 	overlapped = _check_overlap(vol, logical_map_index);
1626 
1627 	req = TAILQ_FIRST(&vol->free_requests);
1628 	if (req == NULL) {
1629 		cb_fn(cb_arg, -ENOMEM);
1630 		return;
1631 	}
1632 
1633 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1634 	req->type = REDUCE_IO_WRITEV;
1635 	req->vol = vol;
1636 	req->iov = iov;
1637 	req->iovcnt = iovcnt;
1638 	req->offset = offset;
1639 	req->logical_map_index = logical_map_index;
1640 	req->length = length;
1641 	req->copy_after_decompress = false;
1642 	req->cb_fn = cb_fn;
1643 	req->cb_arg = cb_arg;
1644 
1645 	if (!overlapped) {
1646 		_start_writev_request(req);
1647 	} else {
1648 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1649 	}
1650 }
1651 
1652 const struct spdk_reduce_vol_params *
1653 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1654 {
1655 	return &vol->params;
1656 }
1657 
1658 void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1659 {
1660 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1661 	uint32_t struct_size;
1662 	uint64_t chunk_map_size;
1663 
1664 	SPDK_NOTICELOG("vol info:\n");
1665 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1666 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1667 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1668 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1669 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1670 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1671 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1672 		       vol->params.vol_size / vol->params.chunk_size);
1673 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1674 			vol->params.backing_io_unit_size);
1675 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1676 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1677 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1678 
1679 	SPDK_NOTICELOG("pmem info:\n");
1680 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1681 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1682 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1683 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1684 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1685 			   vol->params.chunk_size);
1686 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1687 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1688 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1689 			 vol->params.backing_io_unit_size);
1690 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1691 }
1692 
1693 SPDK_LOG_REGISTER_COMPONENT(reduce)
1694