xref: /spdk/lib/reduce/reduce.c (revision a15dcb0bf07debe26957eaf30ec392942910ea99)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/env.h"
38 #include "spdk/string.h"
39 #include "spdk/bit_array.h"
40 #include "spdk/util.h"
41 #include "spdk_internal/log.h"
42 
43 #include "libpmem.h"
44 
45 /* Always round up the size of the PM region to the nearest cacheline. */
46 #define REDUCE_PM_SIZE_ALIGNMENT	64
47 
48 /* Offset into the backing device where the persistent memory file's path is stored. */
49 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
50 
51 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
52 
53 #define REDUCE_NUM_VOL_REQUESTS	256
54 
55 /* Structure written to offset 0 of both the pm file and the backing device. */
56 struct spdk_reduce_vol_superblock {
57 	uint8_t				signature[8];
58 	struct spdk_reduce_vol_params	params;
59 	uint8_t				reserved[4048];
60 };
61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
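/*
 * The reserved padding sizes the superblock to exactly one 4 KiB block (verified by the
 *  assert above), so on the backing device the pm file path written at
 *  REDUCE_BACKING_DEV_PATH_OFFSET (4096) lands immediately after the superblock.
 */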
62 
63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
64 /* sizeof() includes the null terminator, hence the - 1. */
65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
66 		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
67 
68 #define REDUCE_PATH_MAX 4096
69 
70 /**
71  * Describes a persistent memory file used to hold metadata associated with a
72  *  compressed volume.
73  */
74 struct spdk_reduce_pm_file {
75 	char			path[REDUCE_PATH_MAX];
76 	void			*pm_buf;
77 	int			pm_is_pmem;
78 	uint64_t		size;
79 };
80 
81 #define REDUCE_IO_READV		1
82 #define REDUCE_IO_WRITEV	2
83 
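/*
 * Per-chunk metadata stored in the pm file.  io_unit_index[] is a flexible array with
 *  vol->backing_io_units_per_chunk entries (see _reduce_vol_get_chunk_struct_size());
 *  unused entries are set to REDUCE_EMPTY_MAP_ENTRY.
 */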
84 struct spdk_reduce_chunk_map {
85 	uint32_t		compressed_size;
86 	uint32_t		reserved;
87 	uint64_t		io_unit_index[0];
88 };
89 
90 #define REDUCE_MAX_IOVECS	32
91 
92 struct spdk_reduce_vol_request {
93 	/**
94 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
95 	 *   1) source buffer for compression operations
96 	 *   2) destination buffer for decompression operations
97 	 *   3) data buffer when writing uncompressed chunk to disk
98 	 *   4) data buffer when reading uncompressed chunk from disk
99 	 */
100 	uint8_t					*decomp_buf;
101 	struct iovec				*decomp_buf_iov;
102 
103 	/**
104 	 * These are used to construct the iovecs that are sent to
105 	 *  the compress/decompress engine; they point to a mix of the scratch buffer
106 	 *  and the user buffers.
107 	 */
108 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS];
109 	int					decomp_iovcnt;
110 
111 	/**
112 	 *  Scratch buffer used for compressed chunk.  This is used for:
113 	 *   1) destination buffer for compression operations
114 	 *   2) source buffer for decompression operations
115 	 *   3) data buffer when writing compressed chunk to disk
116 	 *   4) data buffer when reading compressed chunk from disk
117 	 */
118 	uint8_t					*comp_buf;
119 	struct iovec				*comp_buf_iov;
120 	struct iovec				*iov;
121 	struct spdk_reduce_vol			*vol;
122 	int					type;
123 	int					reduce_errno;
124 	int					iovcnt;
125 	int					num_backing_ops;
126 	uint32_t				num_io_units;
127 	bool					chunk_is_compressed;
128 	uint64_t				offset;
129 	uint64_t				logical_map_index;
130 	uint64_t				length;
131 	uint64_t				chunk_map_index;
132 	struct spdk_reduce_chunk_map		*chunk;
133 	spdk_reduce_vol_op_complete		cb_fn;
134 	void					*cb_arg;
135 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
136 	struct spdk_reduce_vol_cb_args		backing_cb_args;
137 };
138 
139 struct spdk_reduce_vol {
140 	struct spdk_reduce_vol_params		params;
141 	uint32_t				backing_io_units_per_chunk;
142 	uint32_t				backing_lba_per_io_unit;
143 	uint32_t				logical_blocks_per_chunk;
144 	struct spdk_reduce_pm_file		pm_file;
145 	struct spdk_reduce_backing_dev		*backing_dev;
146 	struct spdk_reduce_vol_superblock	*backing_super;
147 	struct spdk_reduce_vol_superblock	*pm_super;
148 	uint64_t				*pm_logical_map;
149 	uint64_t				*pm_chunk_maps;
150 
151 	struct spdk_bit_array			*allocated_chunk_maps;
152 	struct spdk_bit_array			*allocated_backing_io_units;
153 
154 	struct spdk_reduce_vol_request		*request_mem;
155 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
156 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
157 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
158 
159 	/* Single contiguous buffer used for all request buffers for this volume. */
160 	uint8_t					*buf_mem;
161 	struct iovec				*buf_iov_mem;
162 };
163 
164 static void _start_readv_request(struct spdk_reduce_vol_request *req);
165 static void _start_writev_request(struct spdk_reduce_vol_request *req);
166 static uint8_t *g_zero_buf;
167 static int g_vol_count = 0;
168 
169 /*
170  * Allocate extra metadata chunks and corresponding backing io units to account for
171  *  outstanding IO in the worst case scenario where the logical map is completely
172  *  allocated and no data can be compressed.  We need extra chunks in this case to
173  *  handle in-flight writes, since reduce never writes data in place.
174  */
175 #define REDUCE_NUM_EXTRA_CHUNKS 128
176 
177 static void
178 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
179 {
180 	if (vol->pm_file.pm_is_pmem) {
181 		pmem_persist(addr, len);
182 	} else {
183 		pmem_msync(addr, len);
184 	}
185 }
186 
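/*
 * Size of the logical map region in the pm file.  For example, a 1 GiB volume with
 *  16 KiB chunks has 65536 chunks, so the logical map occupies 65536 * 8 bytes = 512 KiB
 *  (already a multiple of the 64-byte alignment).
 */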
187 static uint64_t
188 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
189 {
190 	uint64_t chunks_in_logical_map, logical_map_size;
191 
192 	chunks_in_logical_map = vol_size / chunk_size;
193 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
194 
195 	/* Round up to next cacheline. */
196 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
197 	       REDUCE_PM_SIZE_ALIGNMENT;
198 }
199 
200 static uint64_t
201 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
202 {
203 	uint64_t num_chunks;
204 
205 	num_chunks = vol_size / chunk_size;
206 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
207 
208 	return num_chunks;
209 }
210 
211 static uint64_t
212 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
213 {
214 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
215 
216 	num_chunks = _get_total_chunks(vol_size, chunk_size);
217 	io_units_per_chunk = chunk_size / backing_io_unit_size;
218 	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);
219 
220 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
221 	       REDUCE_PM_SIZE_ALIGNMENT;
222 }
223 
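/*
 * Size of one chunk map structure.  For example, with a 16 KiB chunk size and 4 KiB
 *  backing io units there are 4 io units per chunk, so each chunk map occupies
 *  8 + 4 * 8 = 40 bytes.
 */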
224 static inline uint32_t
225 _reduce_vol_get_chunk_struct_size(struct spdk_reduce_vol *vol)
226 {
227 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * vol->backing_io_units_per_chunk;
228 }
229 
230 static struct spdk_reduce_chunk_map *
231 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
232 {
233 	uintptr_t chunk_map_addr;
234 
235 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
236 
237 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
238 	chunk_map_addr += chunk_map_index * _reduce_vol_get_chunk_struct_size(vol);
239 
240 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
241 }
242 
243 static int
244 _validate_vol_params(struct spdk_reduce_vol_params *params)
245 {
246 	if (params->vol_size > 0) {
247 		/**
248 		 * The caller must not pass in a vol size - it is calculated by libreduce from
249 		 *  the other values in this structure plus the size of the backing device.
250 		 */
251 		return -EINVAL;
252 	}
253 
254 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
255 	    params->logical_block_size == 0) {
256 		return -EINVAL;
257 	}
258 
259 	/* Chunk size must be an even multiple of the backing io unit size. */
260 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
261 		return -EINVAL;
262 	}
263 
264 	/* Chunk size must be an even multiple of the logical block size. */
265 	if ((params->chunk_size % params->logical_block_size) != 0) {
266 		return -EINVAL;
267 	}
268 
269 	return 0;
270 }
271 
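/*
 * Usable volume size for a given backing device size.  The REDUCE_NUM_EXTRA_CHUNKS
 *  reserved chunks are excluded, so the backing device must hold more than 128 chunks
 *  to be usable at all.
 */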
272 static uint64_t
273 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
274 {
275 	uint64_t num_chunks;
276 
277 	num_chunks = backing_dev_size / chunk_size;
278 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
279 		return 0;
280 	}
281 
282 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
283 	return num_chunks * chunk_size;
284 }
285 
286 static uint64_t
287 _get_pm_file_size(struct spdk_reduce_vol_params *params)
288 {
289 	uint64_t total_pm_size;
290 
291 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
292 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
293 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
294 			 params->backing_io_unit_size);
295 	return total_pm_size;
296 }
297 
298 const struct spdk_uuid *
299 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
300 {
301 	return &vol->params.uuid;
302 }
303 
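/*
 * Set up pointers into the mapped pm file.  The layout is:
 *   [ superblock (4 KiB) ][ logical map ][ chunk maps ]
 *  matching the sizes summed in _get_pm_file_size().
 */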
304 static void
305 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
306 {
307 	uint64_t logical_map_size;
308 
309 	/* Superblock is at the beginning of the pm file. */
310 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
311 
312 	/* Logical map immediately follows the super block. */
313 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
314 
315 	/* Chunk maps follow the logical map. */
316 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
317 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
318 }
319 
320 /* We need 2 iovs during load - one for the superblock, another for the path */
321 #define LOAD_IOV_COUNT	2
322 
323 struct reduce_init_load_ctx {
324 	struct spdk_reduce_vol			*vol;
325 	struct spdk_reduce_vol_cb_args		backing_cb_args;
326 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
327 	void					*cb_arg;
328 	struct iovec				iov[LOAD_IOV_COUNT];
329 	void					*path;
330 };
331 
332 static int
333 _allocate_vol_requests(struct spdk_reduce_vol *vol)
334 {
335 	struct spdk_reduce_vol_request *req;
336 	int i;
337 
338 	/* Allocate 2x since each request needs one buffer for read/write and another for
339 	 *  compress/decompress intermediate data.
340 	 */
341 	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
342 				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
343 	if (vol->buf_mem == NULL) {
344 		return -ENOMEM;
345 	}
346 
347 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
348 	if (vol->request_mem == NULL) {
349 		spdk_free(vol->buf_mem);
350 		vol->buf_mem = NULL;
351 		return -ENOMEM;
352 	}
353 
354 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
355 	 *  buffers.
356 	 */
357 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
358 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
359 	if (vol->buf_iov_mem == NULL) {
360 		free(vol->request_mem);
361 		spdk_free(vol->buf_mem);
362 		vol->request_mem = NULL;
363 		vol->buf_mem = NULL;
364 		return -ENOMEM;
365 	}
366 
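	/*
	 * Carve per-request slices out of the shared allocations: request i uses buf_mem
	 *  slice 2*i as its decompress scratch buffer and slice 2*i + 1 as its compress
	 *  scratch buffer (each chunk_size bytes), plus matching iovec arrays from
	 *  buf_iov_mem.
	 */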
367 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
368 		req = &vol->request_mem[i];
369 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
370 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
371 		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
372 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
373 		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
374 	}
375 
376 	return 0;
377 }
378 
379 static void
380 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
381 {
382 	if (ctx != NULL) {
383 		spdk_free(ctx->path);
384 		free(ctx);
385 	}
386 
387 	if (vol != NULL) {
388 		if (vol->pm_file.pm_buf != NULL) {
389 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
390 		}
391 
392 		spdk_free(vol->backing_super);
393 		spdk_bit_array_free(&vol->allocated_chunk_maps);
394 		spdk_bit_array_free(&vol->allocated_backing_io_units);
395 		free(vol->request_mem);
396 		free(vol->buf_iov_mem);
397 		spdk_free(vol->buf_mem);
398 		free(vol);
399 	}
400 }
401 
402 static int
403 _alloc_zero_buff(struct spdk_reduce_vol *vol)
404 {
405 	int rc = 0;
406 
407 	/* The zero buffer is shared between all volumes and is only used
408 	 * for reads, so allocate one global instance here if it was not already
409 	 * allocated when another volume was initialized or loaded.
410 	 */
411 	if (g_vol_count++ == 0) {
412 		g_zero_buf = spdk_zmalloc(vol->params.chunk_size,
413 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
414 					  SPDK_MALLOC_DMA);
415 		if (g_zero_buf == NULL) {
416 			rc = -ENOMEM;
417 		}
418 	}
419 	return rc;
420 }
421 
422 static void
423 _init_write_super_cpl(void *cb_arg, int reduce_errno)
424 {
425 	struct reduce_init_load_ctx *init_ctx = cb_arg;
426 	int rc;
427 
428 	rc = _allocate_vol_requests(init_ctx->vol);
429 	if (rc != 0) {
430 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
431 		_init_load_cleanup(init_ctx->vol, init_ctx);
432 		return;
433 	}
434 
435 	rc = _alloc_zero_buff(init_ctx->vol);
436 	if (rc != 0) {
437 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
438 		_init_load_cleanup(init_ctx->vol, init_ctx);
439 		return;
440 	}
441 
442 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
443 	/* Only clean up the ctx - the vol has been passed to the application
444 	 *  for use now that initialization was successful.
445 	 */
446 	_init_load_cleanup(NULL, init_ctx);
447 }
448 
449 static void
450 _init_write_path_cpl(void *cb_arg, int reduce_errno)
451 {
452 	struct reduce_init_load_ctx *init_ctx = cb_arg;
453 	struct spdk_reduce_vol *vol = init_ctx->vol;
454 
455 	init_ctx->iov[0].iov_base = vol->backing_super;
456 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
457 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
458 	init_ctx->backing_cb_args.cb_arg = init_ctx;
459 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
460 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
461 				 &init_ctx->backing_cb_args);
462 }
463 
464 static int
465 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
466 {
467 	uint64_t total_chunks, total_backing_io_units;
468 	uint32_t i, num_metadata_io_units;
469 
470 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
471 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
472 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
473 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
474 
475 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
476 		return -ENOMEM;
477 	}
478 
479 	/* Mark the backing io units that hold the metadata (superblock and pm file path) as allocated. */
480 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
481 				vol->backing_dev->blocklen;
482 	for (i = 0; i < num_metadata_io_units; i++) {
483 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
484 	}
485 
486 	return 0;
487 }
488 
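/*
 * Create a new compressed volume.  Sequence: create and initialize the pm file, write the
 *  pm file path to the backing device at offset 4 KiB, then (in _init_write_path_cpl)
 *  write the superblock at offset 0, then (in _init_write_super_cpl) allocate the
 *  per-volume request structures and hand the volume to the caller.
 */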
489 void
490 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
491 		     struct spdk_reduce_backing_dev *backing_dev,
492 		     const char *pm_file_dir,
493 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
494 {
495 	struct spdk_reduce_vol *vol;
496 	struct reduce_init_load_ctx *init_ctx;
497 	uint64_t backing_dev_size;
498 	size_t mapped_len;
499 	int dir_len, max_dir_len, rc;
500 
501 	/* We need to append a path separator and the UUID to the supplied
502 	 * path.
503 	 */
504 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
505 	dir_len = strnlen(pm_file_dir, max_dir_len);
506 	/* Strip trailing slash if the user provided one - we will add it back
507 	 * later when appending the filename.
508 	 */
509 	if (pm_file_dir[dir_len - 1] == '/') {
510 		dir_len--;
511 	}
512 	if (dir_len == max_dir_len) {
513 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
514 		cb_fn(cb_arg, NULL, -EINVAL);
515 		return;
516 	}
517 
518 	rc = _validate_vol_params(params);
519 	if (rc != 0) {
520 		SPDK_ERRLOG("invalid vol params\n");
521 		cb_fn(cb_arg, NULL, rc);
522 		return;
523 	}
524 
525 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
526 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
527 	if (params->vol_size == 0) {
528 		SPDK_ERRLOG("backing device is too small\n");
529 		cb_fn(cb_arg, NULL, -EINVAL);
530 		return;
531 	}
532 
533 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
534 	    backing_dev->unmap == NULL) {
535 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
536 		cb_fn(cb_arg, NULL, -EINVAL);
537 		return;
538 	}
539 
540 	vol = calloc(1, sizeof(*vol));
541 	if (vol == NULL) {
542 		cb_fn(cb_arg, NULL, -ENOMEM);
543 		return;
544 	}
545 
546 	TAILQ_INIT(&vol->free_requests);
547 	TAILQ_INIT(&vol->executing_requests);
548 	TAILQ_INIT(&vol->queued_requests);
549 
550 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
551 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
552 	if (vol->backing_super == NULL) {
553 		cb_fn(cb_arg, NULL, -ENOMEM);
554 		_init_load_cleanup(vol, NULL);
555 		return;
556 	}
557 
558 	init_ctx = calloc(1, sizeof(*init_ctx));
559 	if (init_ctx == NULL) {
560 		cb_fn(cb_arg, NULL, -ENOMEM);
561 		_init_load_cleanup(vol, NULL);
562 		return;
563 	}
564 
565 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
566 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
567 	if (init_ctx->path == NULL) {
568 		cb_fn(cb_arg, NULL, -ENOMEM);
569 		_init_load_cleanup(vol, init_ctx);
570 		return;
571 	}
572 
573 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
574 		spdk_uuid_generate(&params->uuid);
575 	}
576 
577 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
578 	vol->pm_file.path[dir_len] = '/';
579 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
580 			    &params->uuid);
581 	vol->pm_file.size = _get_pm_file_size(params);
582 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
583 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
584 					    &mapped_len, &vol->pm_file.pm_is_pmem);
585 	if (vol->pm_file.pm_buf == NULL) {
586 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
587 			    vol->pm_file.path, strerror(errno));
588 		cb_fn(cb_arg, NULL, -errno);
589 		_init_load_cleanup(vol, init_ctx);
590 		return;
591 	}
592 
593 	if (vol->pm_file.size != mapped_len) {
594 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
595 			    vol->pm_file.size, mapped_len);
596 		cb_fn(cb_arg, NULL, -ENOMEM);
597 		_init_load_cleanup(vol, init_ctx);
598 		return;
599 	}
600 
601 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
602 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
603 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
604 	memcpy(&vol->params, params, sizeof(*params));
605 
606 	vol->backing_dev = backing_dev;
607 
608 	rc = _allocate_bit_arrays(vol);
609 	if (rc != 0) {
610 		cb_fn(cb_arg, NULL, rc);
611 		_init_load_cleanup(vol, init_ctx);
612 		return;
613 	}
614 
615 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
616 	       sizeof(vol->backing_super->signature));
617 	memcpy(&vol->backing_super->params, params, sizeof(*params));
618 
619 	_initialize_vol_pm_pointers(vol);
620 
621 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
622 	/* Writing 0xFFs is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
623 	 * Note that this writes 0xFF not just to the logical map but to the chunk maps as well.
624 	 */
625 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
626 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
627 
628 	init_ctx->vol = vol;
629 	init_ctx->cb_fn = cb_fn;
630 	init_ctx->cb_arg = cb_arg;
631 
632 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
633 	init_ctx->iov[0].iov_base = init_ctx->path;
634 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
635 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
636 	init_ctx->backing_cb_args.cb_arg = init_ctx;
637 	/* Write the path to offset 4K on the backing device - just after where the super
638 	 *  block will be written.  We wait until this is committed before writing the
639 	 *  super block to guarantee the super block is never written without the path
640 	 *  if the system crashes in the middle of a write operation.
641 	 */
642 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
643 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
644 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
645 				 &init_ctx->backing_cb_args);
646 }
647 
648 static void
649 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
650 {
651 	struct reduce_init_load_ctx *load_ctx = cb_arg;
652 	struct spdk_reduce_vol *vol = load_ctx->vol;
653 	uint64_t backing_dev_size;
654 	uint64_t i, num_chunks, logical_map_index;
655 	struct spdk_reduce_chunk_map *chunk;
656 	size_t mapped_len;
657 	uint32_t j;
658 	int rc;
659 
660 	if (memcmp(vol->backing_super->signature,
661 		   SPDK_REDUCE_SIGNATURE,
662 		   sizeof(vol->backing_super->signature)) != 0) {
663 		/* This backing device isn't a libreduce backing device. */
664 		rc = -EILSEQ;
665 		goto error;
666 	}
667 
668 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
669 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
670 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
671 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
672 
673 	rc = _allocate_bit_arrays(vol);
674 	if (rc != 0) {
675 		goto error;
676 	}
677 
678 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
679 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
680 		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
681 			    backing_dev_size);
682 		rc = -EILSEQ;
683 		goto error;
684 	}
685 
686 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
687 	vol->pm_file.size = _get_pm_file_size(&vol->params);
688 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
689 					    &vol->pm_file.pm_is_pmem);
690 	if (vol->pm_file.pm_buf == NULL) {
691 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
692 		rc = -errno;
693 		goto error;
694 	}
695 
696 	if (vol->pm_file.size != mapped_len) {
697 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
698 			    vol->pm_file.size, mapped_len);
699 		rc = -ENOMEM;
700 		goto error;
701 	}
702 
703 	rc = _allocate_vol_requests(vol);
704 	if (rc != 0) {
705 		goto error;
706 	}
707 
708 	_initialize_vol_pm_pointers(vol);
709 
710 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
711 	for (i = 0; i < num_chunks; i++) {
712 		logical_map_index = vol->pm_logical_map[i];
713 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
714 			continue;
715 		}
716 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
717 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
718 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
719 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
720 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
721 			}
722 		}
723 	}
724 
725 	rc = _alloc_zero_buff(vol);
726 	if (rc) {
727 		goto error;
728 	}
729 
730 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
731 	/* Only clean up the ctx - the vol has been passed to the application
732 	 *  for use now that volume load was successful.
733 	 */
734 	_init_load_cleanup(NULL, load_ctx);
735 	return;
736 
737 error:
738 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
739 	_init_load_cleanup(vol, load_ctx);
740 }
741 
742 void
743 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
744 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
745 {
746 	struct spdk_reduce_vol *vol;
747 	struct reduce_init_load_ctx *load_ctx;
748 
749 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
750 	    backing_dev->unmap == NULL) {
751 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
752 		cb_fn(cb_arg, NULL, -EINVAL);
753 		return;
754 	}
755 
756 	vol = calloc(1, sizeof(*vol));
757 	if (vol == NULL) {
758 		cb_fn(cb_arg, NULL, -ENOMEM);
759 		return;
760 	}
761 
762 	TAILQ_INIT(&vol->free_requests);
763 	TAILQ_INIT(&vol->executing_requests);
764 	TAILQ_INIT(&vol->queued_requests);
765 
766 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
767 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
768 	if (vol->backing_super == NULL) {
769 		_init_load_cleanup(vol, NULL);
770 		cb_fn(cb_arg, NULL, -ENOMEM);
771 		return;
772 	}
773 
774 	vol->backing_dev = backing_dev;
775 
776 	load_ctx = calloc(1, sizeof(*load_ctx));
777 	if (load_ctx == NULL) {
778 		_init_load_cleanup(vol, NULL);
779 		cb_fn(cb_arg, NULL, -ENOMEM);
780 		return;
781 	}
782 
783 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
784 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
785 	if (load_ctx->path == NULL) {
786 		_init_load_cleanup(vol, load_ctx);
787 		cb_fn(cb_arg, NULL, -ENOMEM);
788 		return;
789 	}
790 
791 	load_ctx->vol = vol;
792 	load_ctx->cb_fn = cb_fn;
793 	load_ctx->cb_arg = cb_arg;
794 
795 	load_ctx->iov[0].iov_base = vol->backing_super;
796 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
797 	load_ctx->iov[1].iov_base = load_ctx->path;
798 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
799 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
800 	load_ctx->backing_cb_args.cb_arg = load_ctx;
801 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
802 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
803 				vol->backing_dev->blocklen,
804 				&load_ctx->backing_cb_args);
805 }
806 
807 void
808 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
809 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
810 {
811 	if (vol == NULL) {
812 		/* This indicates a programming error. */
813 		assert(false);
814 		cb_fn(cb_arg, -EINVAL);
815 		return;
816 	}
817 
818 	if (--g_vol_count == 0) {
819 		spdk_free(g_zero_buf);
820 	}
821 	_init_load_cleanup(vol, NULL);
822 	cb_fn(cb_arg, 0);
823 }
824 
825 struct reduce_destroy_ctx {
826 	spdk_reduce_vol_op_complete		cb_fn;
827 	void					*cb_arg;
828 	struct spdk_reduce_vol			*vol;
829 	struct spdk_reduce_vol_superblock	*super;
830 	struct iovec				iov;
831 	struct spdk_reduce_vol_cb_args		backing_cb_args;
832 	int					reduce_errno;
833 	char					pm_path[REDUCE_PATH_MAX];
834 };
835 
836 static void
837 destroy_unload_cpl(void *cb_arg, int reduce_errno)
838 {
839 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
840 
841 	if (destroy_ctx->reduce_errno == 0) {
842 		if (unlink(destroy_ctx->pm_path)) {
843 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
844 				    destroy_ctx->pm_path, strerror(errno));
845 		}
846 	}
847 
848 	/* Even if the unload somehow failed, we still pass the destroy_ctx
849 	 * reduce_errno since that indicates whether or not the volume was
850 	 * actually destroyed.
851 	 */
852 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
853 	spdk_free(destroy_ctx->super);
854 	free(destroy_ctx);
855 }
856 
857 static void
858 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
859 {
860 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
861 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
862 
863 	destroy_ctx->reduce_errno = reduce_errno;
864 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
865 }
866 
867 static void
868 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
869 {
870 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
871 
872 	if (reduce_errno != 0) {
873 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
874 		spdk_free(destroy_ctx->super);
875 		free(destroy_ctx);
876 		return;
877 	}
878 
879 	destroy_ctx->vol = vol;
880 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
881 	destroy_ctx->iov.iov_base = destroy_ctx->super;
882 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
883 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
884 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
885 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
886 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
887 				 &destroy_ctx->backing_cb_args);
888 }
889 
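/*
 * Destroy a compressed volume: load it from the backing device, overwrite its superblock
 *  with zeroes (_destroy_zero_super_cpl), unload it, and finally unlink the pm file
 *  (destroy_unload_cpl).
 */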
890 void
891 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
892 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
893 {
894 	struct reduce_destroy_ctx *destroy_ctx;
895 
896 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
897 	if (destroy_ctx == NULL) {
898 		cb_fn(cb_arg, -ENOMEM);
899 		return;
900 	}
901 
902 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
903 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
904 	if (destroy_ctx->super == NULL) {
905 		free(destroy_ctx);
906 		cb_fn(cb_arg, -ENOMEM);
907 		return;
908 	}
909 	destroy_ctx->cb_fn = cb_fn;
910 	destroy_ctx->cb_arg = cb_arg;
911 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
912 }
913 
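/*
 * Requests may not span chunks.  For example, with 512-byte logical blocks and a 16 KiB
 *  chunk (32 blocks per chunk), a request at offset 30 with length 4 touches chunks 0
 *  and 1, so spdk_reduce_vol_readv/writev reject it with -EINVAL.
 */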
914 static bool
915 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
916 {
917 	uint64_t start_chunk, end_chunk;
918 
919 	start_chunk = offset / vol->logical_blocks_per_chunk;
920 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
921 
922 	return (start_chunk != end_chunk);
923 }
924 
925 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
926 
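/*
 * Complete a request and, since requests to the same chunk are serialized (see
 *  _check_overlap()), restart the first queued request waiting on the same logical map
 *  index, if any.
 */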
927 static void
928 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
929 {
930 	struct spdk_reduce_vol_request *next_req;
931 	struct spdk_reduce_vol *vol = req->vol;
932 
933 	req->cb_fn(req->cb_arg, reduce_errno);
934 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
935 
936 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
937 		if (next_req->logical_map_index == req->logical_map_index) {
938 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
939 			if (next_req->type == REDUCE_IO_READV) {
940 				_start_readv_request(next_req);
941 			} else {
942 				assert(next_req->type == REDUCE_IO_WRITEV);
943 				_start_writev_request(next_req);
944 			}
945 			break;
946 		}
947 	}
948 
949 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
950 }
951 
952 static void
953 _write_write_done(void *_req, int reduce_errno)
954 {
955 	struct spdk_reduce_vol_request *req = _req;
956 	struct spdk_reduce_vol *vol = req->vol;
957 	uint64_t old_chunk_map_index;
958 	struct spdk_reduce_chunk_map *old_chunk;
959 	uint32_t i;
960 
961 	if (reduce_errno != 0) {
962 		req->reduce_errno = reduce_errno;
963 	}
964 
965 	assert(req->num_backing_ops > 0);
966 	if (--req->num_backing_ops > 0) {
967 		return;
968 	}
969 
970 	if (req->reduce_errno != 0) {
971 		_reduce_vol_complete_req(req, req->reduce_errno);
972 		return;
973 	}
974 
975 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
976 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
977 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
978 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
979 			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
980 				break;
981 			}
982 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
983 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
984 			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
985 		}
986 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
987 	}
988 
989 	/*
990 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
991 	 * becomes invalid after we update the logical map, since the old chunk map will no
992 	 * longer have a reference to it in the logical map.
993 	 */
994 
995 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
996 	_reduce_persist(vol, req->chunk, _reduce_vol_get_chunk_struct_size(vol));
997 
998 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
999 
1000 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1001 
1002 	_reduce_vol_complete_req(req, 0);
1003 }
1004 
1005 static void
1006 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1007 		   reduce_request_fn next_fn, bool is_write)
1008 {
1009 	struct iovec *iov;
1010 	uint8_t *buf;
1011 	uint32_t i;
1012 
1013 	if (req->chunk_is_compressed) {
1014 		iov = req->comp_buf_iov;
1015 		buf = req->comp_buf;
1016 	} else {
1017 		iov = req->decomp_buf_iov;
1018 		buf = req->decomp_buf;
1019 	}
1020 
1021 	req->num_backing_ops = req->num_io_units;
1022 	req->backing_cb_args.cb_fn = next_fn;
1023 	req->backing_cb_args.cb_arg = req;
1024 	for (i = 0; i < req->num_io_units; i++) {
1025 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1026 		iov[i].iov_len = vol->params.backing_io_unit_size;
1027 		if (is_write) {
1028 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
1029 						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1030 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
1031 		} else {
1032 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
1033 						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1034 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
1035 		}
1036 	}
1037 }
1038 
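/*
 * Allocate a new chunk map and backing io units for the (possibly compressed) chunk and
 *  write it out.  For example, with a 16 KiB chunk and 4 KiB backing io units, data that
 *  compresses to 5000 bytes needs 2 io units (< 4), so the chunk is stored compressed;
 *  if it still needs all 4 io units, the chunk is marked uncompressed and the full chunk
 *  size is recorded.
 */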
1039 static void
1040 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1041 			uint32_t compressed_size)
1042 {
1043 	struct spdk_reduce_vol *vol = req->vol;
1044 	uint32_t i;
1045 
1046 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
1047 
1048 	/* TODO: fail if no chunk map is found - but really this should not happen if we
1049 	 * size the number of requests similarly to the number of extra chunk maps.
1050 	 */
1051 	assert(req->chunk_map_index != UINT32_MAX);
1052 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1053 
1054 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1055 	req->num_io_units = spdk_divide_round_up(compressed_size,
1056 			    vol->params.backing_io_unit_size);
1057 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1058 	req->chunk->compressed_size =
1059 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1060 
1061 	for (i = 0; i < req->num_io_units; i++) {
1062 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1063 		/* TODO: fail if no backing block found - but really this should also not
1064 		 * happen (see comment above).
1065 		 */
1066 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1067 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1068 	}
1069 	while (i < vol->backing_io_units_per_chunk) {
1070 		req->chunk->io_unit_index[i++] = REDUCE_EMPTY_MAP_ENTRY;
1071 	}
1072 
1073 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1074 }
1075 
1076 static void
1077 _write_compress_done(void *_req, int reduce_errno)
1078 {
1079 	struct spdk_reduce_vol_request *req = _req;
1080 
1081 	/* Negative reduce_errno indicates failure for compression operations.
1082 	 * In that case, write the uncompressed data instead.  Force this to happen
1083 	 * by passing the full chunk size to _reduce_vol_write_chunk.
1084 	 * When it sees the data could not be compressed, it will write
1085 	 * the uncompressed buffer to disk.
1086 	 */
1087 	if (reduce_errno < 0) {
1088 		reduce_errno = req->vol->params.chunk_size;
1089 	}
1090 
1091 	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
1092 	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
1093 }
1094 
1095 static void
1096 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1097 {
1098 	struct spdk_reduce_vol *vol = req->vol;
1099 
1100 	req->backing_cb_args.cb_fn = next_fn;
1101 	req->backing_cb_args.cb_arg = req;
1102 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1103 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1104 	vol->backing_dev->compress(vol->backing_dev,
1105 				   &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1,
1106 				   &req->backing_cb_args);
1107 }
1108 
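/*
 * Decompress the chunk into the per-request scratch buffer.  This is used by the
 *  read-modify-write path (_write_read_done); _reduce_vol_decompress_chunk() below
 *  decompresses directly into the caller's iovecs for reads.
 */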
1109 static void
1110 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1111 {
1112 	struct spdk_reduce_vol *vol = req->vol;
1113 
1114 	req->backing_cb_args.cb_fn = next_fn;
1115 	req->backing_cb_args.cb_arg = req;
1116 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1117 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1118 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1119 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1120 	vol->backing_dev->decompress(vol->backing_dev,
1121 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1122 				     &req->backing_cb_args);
1123 }
1124 
1125 static void
1126 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1127 {
1128 	struct spdk_reduce_vol *vol = req->vol;
1129 	uint64_t chunk_offset, remainder = 0;
1130 	uint64_t ttl_len = 0;
1131 	int i;
1132 
1133 	req->decomp_iovcnt = 0;
1134 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1135 
1136 	if (chunk_offset) {
1137 		/* The first iov points to our scratch buffer for any offset into the chunk. */
1138 		req->decomp_iov[0].iov_base = req->decomp_buf;
1139 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1140 		ttl_len += req->decomp_iov[0].iov_len;
1141 		req->decomp_iovcnt = 1;
1142 	}
1143 
1144 	/* Now the user data iovs, pointing directly at the user buffers. */
1145 	for (i = 0; i < req->iovcnt; i++) {
1146 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1147 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1148 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1149 	}
1150 	req->decomp_iovcnt += req->iovcnt;
1151 
1152 	/* send the rest of the chunk to our scratch buffer */
1153 	remainder = vol->params.chunk_size - ttl_len;
1154 	if (remainder) {
1155 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1156 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1157 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1158 		req->decomp_iovcnt++;
1159 	}
1160 	assert(ttl_len == vol->params.chunk_size);
1161 
1162 	req->backing_cb_args.cb_fn = next_fn;
1163 	req->backing_cb_args.cb_arg = req;
1164 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1165 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1166 	vol->backing_dev->decompress(vol->backing_dev,
1167 				     req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt,
1168 				     &req->backing_cb_args);
1169 }
1170 
1171 static void
1172 _write_decompress_done(void *_req, int reduce_errno)
1173 {
1174 	struct spdk_reduce_vol_request *req = _req;
1175 	struct spdk_reduce_vol *vol = req->vol;
1176 	uint64_t chunk_offset, ttl_len = 0;
1177 	int i;
1178 
1179 	/* Negative reduce_errno indicates failure for compression operations. */
1180 	if (reduce_errno < 0) {
1181 		_reduce_vol_complete_req(req, reduce_errno);
1182 		return;
1183 	}
1184 
1185 	/* Positive reduce_errno indicates number of bytes in decompressed
1186 	 *  buffer.  This should equal the chunk size - otherwise that's another
1187 	 *  type of failure.
1188 	 */
1189 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1190 		_reduce_vol_complete_req(req, -EIO);
1191 		return;
1192 	}
1193 
1194 	req->decomp_iovcnt = 0;
1195 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1196 
1197 	if (chunk_offset) {
1198 		req->decomp_iov[0].iov_base = req->decomp_buf;
1199 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1200 		ttl_len += req->decomp_iov[0].iov_len;
1201 		req->decomp_iovcnt = 1;
1202 	}
1203 
1204 	for (i = 0; i < req->iovcnt; i++) {
1205 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1206 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1207 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1208 	}
1209 	req->decomp_iovcnt += req->iovcnt;
1210 
1211 	if (ttl_len < req->vol->params.chunk_size) {
1212 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1213 		req->decomp_iov[req->decomp_iovcnt].iov_len = req->vol->params.chunk_size - ttl_len;
1214 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1215 		req->decomp_iovcnt++;
1216 	}
1217 	assert(ttl_len == vol->params.chunk_size);
1218 
1219 	_reduce_vol_compress_chunk(req, _write_compress_done);
1220 }
1221 
1222 static void
1223 _write_read_done(void *_req, int reduce_errno)
1224 {
1225 	struct spdk_reduce_vol_request *req = _req;
1226 
1227 	if (reduce_errno != 0) {
1228 		req->reduce_errno = reduce_errno;
1229 	}
1230 
1231 	assert(req->num_backing_ops > 0);
1232 	if (--req->num_backing_ops > 0) {
1233 		return;
1234 	}
1235 
1236 	if (req->reduce_errno != 0) {
1237 		_reduce_vol_complete_req(req, req->reduce_errno);
1238 		return;
1239 	}
1240 
1241 	if (req->chunk_is_compressed) {
1242 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1243 	} else {
1244 		_write_decompress_done(req, req->chunk->compressed_size);
1245 	}
1246 }
1247 
1248 static void
1249 _read_decompress_done(void *_req, int reduce_errno)
1250 {
1251 	struct spdk_reduce_vol_request *req = _req;
1252 	struct spdk_reduce_vol *vol = req->vol;
1253 
1254 	/* Negative reduce_errno indicates failure for compression operations. */
1255 	if (reduce_errno < 0) {
1256 		_reduce_vol_complete_req(req, reduce_errno);
1257 		return;
1258 	}
1259 
1260 	/* Positive reduce_errno indicates number of bytes in decompressed
1261 	 *  buffer.  This should equal the chunk size - otherwise that's another
1262 	 *  type of failure.
1263 	 */
1264 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1265 		_reduce_vol_complete_req(req, -EIO);
1266 		return;
1267 	}
1268 
1269 	_reduce_vol_complete_req(req, 0);
1270 }
1271 
1272 static void
1273 _read_read_done(void *_req, int reduce_errno)
1274 {
1275 	struct spdk_reduce_vol_request *req = _req;
1276 
1277 	if (reduce_errno != 0) {
1278 		req->reduce_errno = reduce_errno;
1279 	}
1280 
1281 	assert(req->num_backing_ops > 0);
1282 	if (--req->num_backing_ops > 0) {
1283 		return;
1284 	}
1285 
1286 	if (req->reduce_errno != 0) {
1287 		_reduce_vol_complete_req(req, req->reduce_errno);
1288 		return;
1289 	}
1290 
1291 	if (req->chunk_is_compressed) {
1292 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1293 	} else {
1294 		_read_decompress_done(req, req->chunk->compressed_size);
1295 	}
1296 }
1297 
1298 static void
1299 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1300 {
1301 	struct spdk_reduce_vol *vol = req->vol;
1302 
1303 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1304 	assert(req->chunk_map_index != UINT32_MAX);
1305 
1306 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1307 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1308 			    vol->params.backing_io_unit_size);
1309 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1310 
1311 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1312 }
1313 
1314 static bool
1315 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1316 		    uint64_t length)
1317 {
1318 	uint64_t size = 0;
1319 	int i;
1320 
1321 	for (i = 0; i < iovcnt; i++) {
1322 		size += iov[i].iov_len;
1323 	}
1324 
1325 	return size == (length * vol->params.logical_block_size);
1326 }
1327 
1328 static bool
1329 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1330 {
1331 	struct spdk_reduce_vol_request *req;
1332 
1333 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1334 		if (logical_map_index == req->logical_map_index) {
1335 			return true;
1336 		}
1337 	}
1338 
1339 	return false;
1340 }
1341 
1342 static void
1343 _start_readv_request(struct spdk_reduce_vol_request *req)
1344 {
1345 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1346 	_reduce_vol_read_chunk(req, _read_read_done);
1347 }
1348 
1349 void
1350 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1351 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1352 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1353 {
1354 	struct spdk_reduce_vol_request *req;
1355 	uint64_t logical_map_index;
1356 	bool overlapped;
1357 	int i;
1358 
1359 	if (length == 0) {
1360 		cb_fn(cb_arg, 0);
1361 		return;
1362 	}
1363 
1364 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1365 		cb_fn(cb_arg, -EINVAL);
1366 		return;
1367 	}
1368 
1369 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1370 		cb_fn(cb_arg, -EINVAL);
1371 		return;
1372 	}
1373 
1374 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1375 	overlapped = _check_overlap(vol, logical_map_index);
1376 
1377 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1378 		/*
1379 		 * This chunk hasn't been allocated.  So treat the data as all
1380 		 * zeroes for this chunk - do the memset and immediately complete
1381 		 * the operation.
1382 		 */
1383 		for (i = 0; i < iovcnt; i++) {
1384 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1385 		}
1386 		cb_fn(cb_arg, 0);
1387 		return;
1388 	}
1389 
1390 	req = TAILQ_FIRST(&vol->free_requests);
1391 	if (req == NULL) {
1392 		cb_fn(cb_arg, -ENOMEM);
1393 		return;
1394 	}
1395 
1396 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1397 	req->type = REDUCE_IO_READV;
1398 	req->vol = vol;
1399 	req->iov = iov;
1400 	req->iovcnt = iovcnt;
1401 	req->offset = offset;
1402 	req->logical_map_index = logical_map_index;
1403 	req->length = length;
1404 	req->cb_fn = cb_fn;
1405 	req->cb_arg = cb_arg;
1406 
1407 	if (!overlapped) {
1408 		_start_readv_request(req);
1409 	} else {
1410 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1411 	}
1412 }
1413 
1414 static void
1415 _start_writev_request(struct spdk_reduce_vol_request *req)
1416 {
1417 	struct spdk_reduce_vol *vol = req->vol;
1418 	uint64_t chunk_offset, ttl_len = 0;
1419 	uint64_t remainder = 0;
1420 	uint32_t lbsize, lb_per_chunk;
1421 	int i;
1422 
1423 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1424 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1425 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1426 			/* Read old chunk, then overwrite with data from this write
1427 			 *  operation.
1428 			 */
1429 			_reduce_vol_read_chunk(req, _write_read_done);
1430 			return;
1431 		}
1432 	}
1433 
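	/*
	 * Either the chunk is not yet allocated or the write covers the whole chunk, so
	 *  build the uncompressed chunk image directly: any space before and after the
	 *  user data is filled from g_zero_buf and no read of the old data is needed.
	 */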
1434 	lbsize = vol->params.logical_block_size;
1435 	lb_per_chunk = vol->logical_blocks_per_chunk;
1436 	req->decomp_iovcnt = 0;
1437 
1438 	/* Note: point the first iov at our zero buf for any offset into the chunk. */
1439 	chunk_offset = req->offset % lb_per_chunk;
1440 	if (chunk_offset != 0) {
1441 		ttl_len += chunk_offset * lbsize;
1442 		req->decomp_iov[0].iov_base = g_zero_buf;
1443 		req->decomp_iov[0].iov_len = ttl_len;
1444 		req->decomp_iovcnt = 1;
1445 	}
1446 
1447 	/* Now the user data iovs, pointing directly at the user buffers. */
1448 	for (i = 0; i < req->iovcnt; i++) {
1449 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1450 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1451 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1452 	}
1453 	req->decomp_iovcnt += req->iovcnt;
1454 
1455 	chunk_offset += req->length;
1456 	if (chunk_offset != lb_per_chunk) {
1457 		remainder = (lb_per_chunk - chunk_offset) * lbsize;
1458 		req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf;
1459 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1460 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1461 		req->decomp_iovcnt++;
1462 	}
1463 	assert(ttl_len == req->vol->params.chunk_size);
1464 
1465 	_reduce_vol_compress_chunk(req, _write_compress_done);
1466 }
1467 
1468 void
1469 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1470 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1471 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1472 {
1473 	struct spdk_reduce_vol_request *req;
1474 	uint64_t logical_map_index;
1475 	bool overlapped;
1476 
1477 	if (length == 0) {
1478 		cb_fn(cb_arg, 0);
1479 		return;
1480 	}
1481 
1482 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1483 		cb_fn(cb_arg, -EINVAL);
1484 		return;
1485 	}
1486 
1487 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1488 		cb_fn(cb_arg, -EINVAL);
1489 		return;
1490 	}
1491 
1492 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1493 	overlapped = _check_overlap(vol, logical_map_index);
1494 
1495 	req = TAILQ_FIRST(&vol->free_requests);
1496 	if (req == NULL) {
1497 		cb_fn(cb_arg, -ENOMEM);
1498 		return;
1499 	}
1500 
1501 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1502 	req->type = REDUCE_IO_WRITEV;
1503 	req->vol = vol;
1504 	req->iov = iov;
1505 	req->iovcnt = iovcnt;
1506 	req->offset = offset;
1507 	req->logical_map_index = logical_map_index;
1508 	req->length = length;
1509 	req->cb_fn = cb_fn;
1510 	req->cb_arg = cb_arg;
1511 
1512 	if (!overlapped) {
1513 		_start_writev_request(req);
1514 	} else {
1515 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1516 	}
1517 }
1518 
1519 const struct spdk_reduce_vol_params *
1520 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1521 {
1522 	return &vol->params;
1523 }
1524 
1525 void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1526 {
1527 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1528 	uint32_t struct_size;
1529 	uint64_t chunk_map_size;
1530 
1531 	SPDK_NOTICELOG("vol info:\n");
1532 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1533 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1534 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1535 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1536 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1537 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1538 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1539 		       vol->params.vol_size / vol->params.chunk_size);
1540 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1541 			vol->params.backing_io_unit_size);
1542 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1543 	struct_size = _reduce_vol_get_chunk_struct_size(vol);
1544 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1545 
1546 	SPDK_NOTICELOG("pmem info:\n");
1547 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1548 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1549 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1550 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1551 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1552 			   vol->params.chunk_size);
1553 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1554 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1555 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1556 			 vol->params.backing_io_unit_size);
1557 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1558 }
1559 
1560 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
1561