xref: /spdk/lib/reduce/reduce.c (revision ee513ce4a2ae8abfde3bc9aadfe5a15df857f639)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "queue_internal.h"
10 
11 #include "spdk/reduce.h"
12 #include "spdk/env.h"
13 #include "spdk/string.h"
14 #include "spdk/bit_array.h"
15 #include "spdk/util.h"
16 #include "spdk/log.h"
17 #include "spdk/memory.h"
18 #include "spdk/tree.h"
19 
20 #include "libpmem.h"
21 
22 /* Always round up the size of the PM region to the nearest cacheline. */
23 #define REDUCE_PM_SIZE_ALIGNMENT	64
24 
25 /* Offset into the backing device where the persistent memory file's path is stored. */
26 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
27 
28 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
29 
30 #define REDUCE_NUM_VOL_REQUESTS	256
31 
32 /* Structure written to offset 0 of both the pm file and the backing device. */
33 struct spdk_reduce_vol_superblock {
34 	uint8_t				signature[8];
35 	struct spdk_reduce_vol_params	params;
36 	uint8_t				reserved[4040];
37 };
38 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
39 
40 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
41 /* The null terminator accounts for one byte of sizeof(), hence the -1. */
42 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
43 		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
44 
45 #define REDUCE_PATH_MAX 4096
46 
47 #define REDUCE_ZERO_BUF_SIZE 0x100000
48 
49 /**
50  * Describes a persistent memory file used to hold metadata associated with a
51  *  compressed volume.
52  */
53 struct spdk_reduce_pm_file {
54 	char			path[REDUCE_PATH_MAX];
55 	void			*pm_buf;
56 	int			pm_is_pmem;
57 	uint64_t		size;
58 };
59 
60 #define REDUCE_IO_READV		1
61 #define REDUCE_IO_WRITEV	2
62 #define	REDUCE_IO_UNMAP		3
63 
64 struct spdk_reduce_chunk_map {
65 	uint32_t		compressed_size;
66 	uint32_t		reserved;
67 	uint64_t		io_unit_index[0];
68 };
69 
70 struct spdk_reduce_vol_request {
71 	/**
72 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
73 	 *   1) source buffer for compression operations
74 	 *   2) destination buffer for decompression operations
75 	 *   3) data buffer when writing uncompressed chunk to disk
76 	 *   4) data buffer when reading uncompressed chunk from disk
77 	 */
78 	uint8_t					*decomp_buf;
79 	struct iovec				*decomp_buf_iov;
80 
81 	/**
82 	 * These are used to construct the iovecs that are sent to
83 	 *  the decomp engine; they point to a mix of the scratch buffer
84 	 *  and the user buffer.
85 	 */
86 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
87 	int					decomp_iovcnt;
88 
89 	/**
90 	 *  Scratch buffer used for compressed chunk.  This is used for:
91 	 *   1) destination buffer for compression operations
92 	 *   2) source buffer for decompression operations
93 	 *   3) data buffer when writing compressed chunk to disk
94 	 *   4) data buffer when reading compressed chunk from disk
95 	 */
96 	uint8_t					*comp_buf;
97 	struct iovec				*comp_buf_iov;
98 	struct iovec				*iov;
99 	bool					rmw;
100 	struct spdk_reduce_vol			*vol;
101 	int					type;
102 	int					reduce_errno;
103 	int					iovcnt;
104 	int					num_backing_ops;
105 	uint32_t				num_io_units;
106 	struct spdk_reduce_backing_io           *backing_io;
107 	bool					chunk_is_compressed;
108 	bool					copy_after_decompress;
109 	uint64_t				offset;
110 	uint64_t				logical_map_index;
111 	uint64_t				length;
112 	uint64_t				chunk_map_index;
113 	struct spdk_reduce_chunk_map		*chunk;
114 	spdk_reduce_vol_op_complete		cb_fn;
115 	void					*cb_arg;
116 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
117 	RB_ENTRY(spdk_reduce_vol_request)	rbnode;
118 	struct spdk_reduce_vol_cb_args		backing_cb_args;
119 };
120 
121 struct spdk_reduce_vol {
122 	struct spdk_reduce_vol_params		params;
123 	uint32_t				backing_io_units_per_chunk;
124 	uint32_t				backing_lba_per_io_unit;
125 	uint32_t				logical_blocks_per_chunk;
126 	struct spdk_reduce_pm_file		pm_file;
127 	struct spdk_reduce_backing_dev		*backing_dev;
128 	struct spdk_reduce_vol_superblock	*backing_super;
129 	struct spdk_reduce_vol_superblock	*pm_super;
130 	uint64_t				*pm_logical_map;
131 	uint64_t				*pm_chunk_maps;
132 
133 	struct spdk_bit_array			*allocated_chunk_maps;
134 	/* The starting position when looking for a free chunk map in allocated_chunk_maps */
135 	uint64_t				find_chunk_offset;
136 	/* Cache free chunks to speed up lookup of free chunk. */
137 	struct reduce_queue			free_chunks_queue;
138 	struct spdk_bit_array			*allocated_backing_io_units;
139 	/* The starting position when looking for a block from allocated_backing_io_units */
140 	uint64_t				find_block_offset;
141 	/* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */
142 	struct reduce_queue			free_backing_blocks_queue;
143 
144 	struct spdk_reduce_vol_request		*request_mem;
145 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
146 	RB_HEAD(executing_req_tree, spdk_reduce_vol_request) executing_requests;
147 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
148 
149 	/* Single contiguous buffer used for all request buffers for this volume. */
150 	uint8_t					*buf_mem;
151 	struct iovec				*buf_iov_mem;
152 	/* Single contiguous buffer used for backing io buffers for this volume. */
153 	uint8_t					*buf_backing_io_mem;
154 };
155 
156 static void _start_readv_request(struct spdk_reduce_vol_request *req);
157 static void _start_writev_request(struct spdk_reduce_vol_request *req);
158 static uint8_t *g_zero_buf;
159 static int g_vol_count = 0;
160 
161 /*
162  * Allocate extra metadata chunks and corresponding backing io units to account for
163  *  outstanding IO in worst case scenario where logical map is completely allocated
164  *  and no data can be compressed.  We need extra chunks in this case to handle
165  *  in-flight writes since reduce never writes data in place.
166  */
167 #define REDUCE_NUM_EXTRA_CHUNKS 128
168 
169 static void
170 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
171 {
172 	if (vol->pm_file.pm_is_pmem) {
173 		pmem_persist(addr, len);
174 	} else {
175 		pmem_msync(addr, len);
176 	}
177 }
178 
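/*
 * The logical map holds one 8-byte entry per chunk, rounded up to a cacheline.
 *  For example (illustrative numbers only): a 1 GiB volume with 16 KiB chunks has
 *  65536 entries, i.e. a 512 KiB logical map.
 */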
179 static uint64_t
180 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
181 {
182 	uint64_t chunks_in_logical_map, logical_map_size;
183 
184 	chunks_in_logical_map = vol_size / chunk_size;
185 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
186 
187 	/* Round up to next cacheline. */
188 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
189 	       REDUCE_PM_SIZE_ALIGNMENT;
190 }
191 
192 static uint64_t
193 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
194 {
195 	uint64_t num_chunks;
196 
197 	num_chunks = vol_size / chunk_size;
198 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
199 
200 	return num_chunks;
201 }
202 
203 static inline uint32_t
204 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
205 {
206 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
207 }
208 
209 static uint64_t
210 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
211 {
212 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
213 
214 	num_chunks = _get_total_chunks(vol_size, chunk_size);
215 	io_units_per_chunk = chunk_size / backing_io_unit_size;
216 
217 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
218 
219 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
220 	       REDUCE_PM_SIZE_ALIGNMENT;
221 }
222 
223 static struct spdk_reduce_chunk_map *
224 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
225 {
226 	uintptr_t chunk_map_addr;
227 
228 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
229 
230 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
231 	chunk_map_addr += chunk_map_index *
232 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
233 
234 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
235 }
236 
237 static int
238 _validate_vol_params(struct spdk_reduce_vol_params *params)
239 {
240 	if (params->vol_size > 0) {
241 		/**
242 		 * vol_size must not be passed in by the user - libreduce calculates it from
243 		 *  the other values in this structure plus the size of the backing device.
244 		 */
245 		return -EINVAL;
246 	}
247 
248 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
249 	    params->logical_block_size == 0) {
250 		return -EINVAL;
251 	}
252 
253 	/* Chunk size must be an even multiple of the backing io unit size. */
254 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
255 		return -EINVAL;
256 	}
257 
258 	/* Chunk size must be an even multiple of the logical block size. */
259 	if ((params->chunk_size % params->logical_block_size) != 0) {
260 		return -EINVAL;
261 	}
262 
263 	return 0;
264 }
265 
266 static uint64_t
267 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
268 {
269 	uint64_t num_chunks;
270 
271 	num_chunks = backing_dev_size / chunk_size;
272 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
273 		return 0;
274 	}
275 
276 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
277 	return num_chunks * chunk_size;
278 }
279 
280 static uint64_t
281 _get_pm_file_size(struct spdk_reduce_vol_params *params)
282 {
283 	uint64_t total_pm_size;
284 
285 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
286 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
287 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
288 			 params->backing_io_unit_size);
289 	return total_pm_size;
290 }
291 
292 const struct spdk_uuid *
293 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
294 {
295 	return &vol->params.uuid;
296 }
297 
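/*
 * Persistent memory file layout:
 *   [superblock (4 KiB)][logical map][chunk maps]
 *  where the logical map and chunk map regions are each rounded up to
 *  REDUCE_PM_SIZE_ALIGNMENT (see _get_pm_file_size()).
 */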
298 static void
299 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
300 {
301 	uint64_t logical_map_size;
302 
303 	/* Superblock is at the beginning of the pm file. */
304 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
305 
306 	/* Logical map immediately follows the super block. */
307 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
308 
309 	/* Chunks maps follow the logical map. */
310 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
311 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
312 }
313 
314 /* We need 2 iovs during load - one for the superblock, another for the path */
315 #define LOAD_IOV_COUNT	2
316 
317 struct reduce_init_load_ctx {
318 	struct spdk_reduce_vol			*vol;
319 	struct spdk_reduce_vol_cb_args		backing_cb_args;
320 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
321 	void					*cb_arg;
322 	struct iovec				iov[LOAD_IOV_COUNT];
323 	void					*path;
324 	struct spdk_reduce_backing_io           *backing_io;
325 };
326 
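/*
 * spdk_vtophys() updates *size to the length of the physically contiguous region
 *  starting at addr, so a translation failure or a shrunken *size means the range
 *  [addr, addr + original size) crosses a hugepage boundary.
 */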
327 static inline bool
328 _addr_crosses_huge_page(const void *addr, size_t *size)
329 {
330 	size_t _size;
331 	uint64_t rc;
332 
333 	assert(size);
334 
335 	_size = *size;
336 	rc = spdk_vtophys(addr, size);
337 
338 	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
339 }
340 
341 static inline int
342 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
343 {
344 	uint8_t *addr;
345 	size_t size_tmp = buffer_size;
346 
347 	addr = *_addr;
348 
349 	/* Verify that addr + buffer_size doesn't cross huge page boundary */
350 	if (_addr_crosses_huge_page(addr, &size_tmp)) {
351 		/* Memory start is aligned on 2MiB, so the buffer must sit at the end of a page.
352 		 * Skip the remaining bytes and continue from the beginning of the next page. */
353 		addr += size_tmp;
354 	}
355 
356 	if (addr + buffer_size > addr_range) {
357 		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
358 		return -ERANGE;
359 	}
360 
361 	*vol_buffer = addr;
362 	*_addr = addr + buffer_size;
363 
364 	return 0;
365 }
366 
367 static int
368 _allocate_vol_requests(struct spdk_reduce_vol *vol)
369 {
370 	struct spdk_reduce_vol_request *req;
371 	struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
372 	uint32_t reqs_in_2mb_page, huge_pages_needed;
373 	uint8_t *buffer, *buffer_end;
374 	int i = 0;
375 	int rc = 0;
376 
377 	/* The comp and decomp buffers must be allocated so that they do not cross physical
378 	 * page boundaries.  Assume that the system uses the default 2MiB pages and that
379 	 * chunk_size is not necessarily a power of 2.
380 	 * Allocate 2x since we need buffers for both read/write and compress/decompress
381 	 * intermediate buffers. */
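	/* For example (illustrative numbers only): with a 16 KiB chunk_size, each request
	 * consumes 32 KiB of buffer space, so 64 requests fit in one 2 MiB hugepage and the
	 * 256 requests allocated per volume need 4 hugepages. */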
382 	reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
383 	if (!reqs_in_2mb_page) {
384 		return -EINVAL;
385 	}
386 	huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
387 
388 	vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
389 	if (vol->buf_mem == NULL) {
390 		return -ENOMEM;
391 	}
392 
393 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
394 	if (vol->request_mem == NULL) {
395 		spdk_free(vol->buf_mem);
396 		vol->buf_mem = NULL;
397 		return -ENOMEM;
398 	}
399 
400 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
401 	 *  buffers.
402 	 */
403 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
404 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
405 	if (vol->buf_iov_mem == NULL) {
406 		free(vol->request_mem);
407 		spdk_free(vol->buf_mem);
408 		vol->request_mem = NULL;
409 		vol->buf_mem = NULL;
410 		return -ENOMEM;
411 	}
412 
413 	vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) +
414 					 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk);
415 	if (vol->buf_backing_io_mem == NULL) {
416 		free(vol->request_mem);
417 		free(vol->buf_iov_mem);
418 		spdk_free(vol->buf_mem);
419 		vol->request_mem = NULL;
420 		vol->buf_iov_mem = NULL;
421 		vol->buf_mem = NULL;
422 		return -ENOMEM;
423 	}
424 
425 	buffer = vol->buf_mem;
426 	buffer_end = buffer + VALUE_2MB * huge_pages_needed;
427 
428 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
429 		req = &vol->request_mem[i];
430 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
431 		req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i *
432 				  (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) *
433 				  vol->backing_io_units_per_chunk);
434 
435 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
436 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
437 
438 		rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
439 		if (rc) {
440 			SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
441 				    vol->buf_mem, buffer_end);
442 			break;
443 		}
444 		rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
445 		if (rc) {
446 			SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
447 				    vol->buf_mem, buffer_end);
448 			break;
449 		}
450 	}
451 
452 	if (rc) {
453 		free(vol->buf_backing_io_mem);
454 		free(vol->buf_iov_mem);
455 		free(vol->request_mem);
456 		spdk_free(vol->buf_mem);
457 		vol->buf_mem = NULL;
458 		vol->buf_backing_io_mem = NULL;
459 		vol->buf_iov_mem = NULL;
460 		vol->request_mem = NULL;
461 	}
462 
463 	return rc;
464 }
465 
466 static void
467 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
468 {
469 	if (ctx != NULL) {
470 		spdk_free(ctx->path);
471 		free(ctx->backing_io);
472 		free(ctx);
473 	}
474 
475 	if (vol != NULL) {
476 		if (vol->pm_file.pm_buf != NULL) {
477 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
478 		}
479 
480 		spdk_free(vol->backing_super);
481 		spdk_bit_array_free(&vol->allocated_chunk_maps);
482 		spdk_bit_array_free(&vol->allocated_backing_io_units);
483 		free(vol->request_mem);
484 		free(vol->buf_backing_io_mem);
485 		free(vol->buf_iov_mem);
486 		spdk_free(vol->buf_mem);
487 		free(vol);
488 	}
489 }
490 
491 static int
492 _alloc_zero_buff(void)
493 {
494 	int rc = 0;
495 
496 	/* The zero buffer is shared between all volumes and is only used
497 	 * for reads, so allocate one global instance here if it was not
498 	 * already allocated when another volume was initialized or loaded.
499 	 */
500 	if (g_vol_count++ == 0) {
501 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
502 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
503 					  SPDK_MALLOC_DMA);
504 		if (g_zero_buf == NULL) {
505 			g_vol_count--;
506 			rc = -ENOMEM;
507 		}
508 	}
509 	return rc;
510 }
511 
512 static void
513 _init_write_super_cpl(void *cb_arg, int reduce_errno)
514 {
515 	struct reduce_init_load_ctx *init_ctx = cb_arg;
516 	int rc = 0;
517 
518 	if (reduce_errno != 0) {
519 		rc = reduce_errno;
520 		goto err;
521 	}
522 
523 	rc = _allocate_vol_requests(init_ctx->vol);
524 	if (rc != 0) {
525 		goto err;
526 	}
527 
528 	rc = _alloc_zero_buff();
529 	if (rc != 0) {
530 		goto err;
531 	}
532 
533 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, rc);
534 	/* Only clean up the ctx - the vol has been passed to the application
535 	 *  for use now that initialization was successful.
536 	 */
537 	_init_load_cleanup(NULL, init_ctx);
538 
539 	return;
540 err:
541 	if (unlink(init_ctx->path)) {
542 		SPDK_ERRLOG("%s could not be unlinked: %s\n",
543 			    (char *)init_ctx->path, spdk_strerror(errno));
544 	}
545 
546 	init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
547 	_init_load_cleanup(init_ctx->vol, init_ctx);
548 }
549 
550 static void
551 _init_write_path_cpl(void *cb_arg, int reduce_errno)
552 {
553 	struct reduce_init_load_ctx *init_ctx = cb_arg;
554 	struct spdk_reduce_vol *vol = init_ctx->vol;
555 	struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io;
556 
557 	if (reduce_errno != 0) {
558 		_init_write_super_cpl(cb_arg, reduce_errno);
559 		return;
560 	}
561 
562 	init_ctx->iov[0].iov_base = vol->backing_super;
563 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
564 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
565 	init_ctx->backing_cb_args.cb_arg = init_ctx;
566 
567 	backing_io->dev = vol->backing_dev;
568 	backing_io->iov = init_ctx->iov;
569 	backing_io->iovcnt = 1;
570 	backing_io->lba = 0;
571 	backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen;
572 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
573 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
574 
575 	vol->backing_dev->submit_backing_io(backing_io);
576 }
577 
578 static int
579 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
580 {
581 	uint64_t total_chunks, total_backing_io_units;
582 	uint32_t i, num_metadata_io_units;
583 
584 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
585 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
586 	vol->find_chunk_offset = 0;
587 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
588 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
589 	vol->find_block_offset = 0;
590 
591 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
592 		return -ENOMEM;
593 	}
594 
595 	/* Set backing io unit bits associated with metadata. */
596 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
597 				vol->params.backing_io_unit_size;
598 	for (i = 0; i < num_metadata_io_units; i++) {
599 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
600 	}
601 
602 	return 0;
603 }
604 
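/*
 * Executing requests are kept in a red-black tree keyed by logical map index so that
 *  a request targeting the same chunk as an in-flight request can be detected and
 *  parked on queued_requests until the in-flight request completes.
 */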
605 static int
606 overlap_cmp(struct spdk_reduce_vol_request *req1, struct spdk_reduce_vol_request *req2)
607 {
608 	return (req1->logical_map_index < req2->logical_map_index ? -1 : req1->logical_map_index >
609 		req2->logical_map_index);
610 }
611 RB_GENERATE_STATIC(executing_req_tree, spdk_reduce_vol_request, rbnode, overlap_cmp);
612 
613 
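/*
 * Initialization sets up the pm file and the in-memory state, then persists the
 *  on-disk metadata in two ordered writes: the pm file path at offset 4 KiB of the
 *  backing device first and, only after that completes, the superblock at offset 0.
 */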
614 void
615 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
616 		     struct spdk_reduce_backing_dev *backing_dev,
617 		     const char *pm_file_dir,
618 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
619 {
620 	struct spdk_reduce_vol *vol;
621 	struct reduce_init_load_ctx *init_ctx;
622 	struct spdk_reduce_backing_io *backing_io;
623 	uint64_t backing_dev_size;
624 	size_t mapped_len;
625 	int dir_len, max_dir_len, rc;
626 
627 	/* We need to append a path separator and the UUID to the supplied
628 	 * path.
629 	 */
630 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
631 	dir_len = strnlen(pm_file_dir, max_dir_len);
632 	/* Strip trailing slash if the user provided one - we will add it back
633 	 * later when appending the filename.
634 	 */
635 	if (pm_file_dir[dir_len - 1] == '/') {
636 		dir_len--;
637 	}
638 	if (dir_len == max_dir_len) {
639 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
640 		cb_fn(cb_arg, NULL, -EINVAL);
641 		return;
642 	}
643 
644 	rc = _validate_vol_params(params);
645 	if (rc != 0) {
646 		SPDK_ERRLOG("invalid vol params\n");
647 		cb_fn(cb_arg, NULL, rc);
648 		return;
649 	}
650 
651 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
652 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
653 	if (params->vol_size == 0) {
654 		SPDK_ERRLOG("backing device is too small\n");
655 		cb_fn(cb_arg, NULL, -EINVAL);
656 		return;
657 	}
658 
659 	if (backing_dev->submit_backing_io == NULL) {
660 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
661 		cb_fn(cb_arg, NULL, -EINVAL);
662 		return;
663 	}
664 
665 	vol = calloc(1, sizeof(*vol));
666 	if (vol == NULL) {
667 		cb_fn(cb_arg, NULL, -ENOMEM);
668 		return;
669 	}
670 
671 	TAILQ_INIT(&vol->free_requests);
672 	RB_INIT(&vol->executing_requests);
673 	TAILQ_INIT(&vol->queued_requests);
674 	queue_init(&vol->free_chunks_queue);
675 	queue_init(&vol->free_backing_blocks_queue);
676 
677 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
678 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
679 	if (vol->backing_super == NULL) {
680 		cb_fn(cb_arg, NULL, -ENOMEM);
681 		_init_load_cleanup(vol, NULL);
682 		return;
683 	}
684 
685 	init_ctx = calloc(1, sizeof(*init_ctx));
686 	if (init_ctx == NULL) {
687 		cb_fn(cb_arg, NULL, -ENOMEM);
688 		_init_load_cleanup(vol, NULL);
689 		return;
690 	}
691 
692 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
693 	if (backing_io == NULL) {
694 		cb_fn(cb_arg, NULL, -ENOMEM);
695 		_init_load_cleanup(vol, init_ctx);
696 		return;
697 	}
698 	init_ctx->backing_io = backing_io;
699 
700 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
701 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
702 	if (init_ctx->path == NULL) {
703 		cb_fn(cb_arg, NULL, -ENOMEM);
704 		_init_load_cleanup(vol, init_ctx);
705 		return;
706 	}
707 
708 	if (spdk_uuid_is_null(&params->uuid)) {
709 		spdk_uuid_generate(&params->uuid);
710 	}
711 
712 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
713 	vol->pm_file.path[dir_len] = '/';
714 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
715 			    &params->uuid);
716 	vol->pm_file.size = _get_pm_file_size(params);
717 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
718 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
719 					    &mapped_len, &vol->pm_file.pm_is_pmem);
720 	if (vol->pm_file.pm_buf == NULL) {
721 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
722 			    vol->pm_file.path, strerror(errno));
723 		cb_fn(cb_arg, NULL, -errno);
724 		_init_load_cleanup(vol, init_ctx);
725 		return;
726 	}
727 
728 	if (vol->pm_file.size != mapped_len) {
729 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
730 			    vol->pm_file.size, mapped_len);
731 		cb_fn(cb_arg, NULL, -ENOMEM);
732 		_init_load_cleanup(vol, init_ctx);
733 		return;
734 	}
735 
736 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
737 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
738 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
739 	memcpy(&vol->params, params, sizeof(*params));
740 
741 	vol->backing_dev = backing_dev;
742 
743 	rc = _allocate_bit_arrays(vol);
744 	if (rc != 0) {
745 		cb_fn(cb_arg, NULL, rc);
746 		_init_load_cleanup(vol, init_ctx);
747 		return;
748 	}
749 
750 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
751 	       sizeof(vol->backing_super->signature));
752 	memcpy(&vol->backing_super->params, params, sizeof(*params));
753 
754 	_initialize_vol_pm_pointers(vol);
755 
756 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
757 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
758 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
759 	 */
760 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
761 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
762 
763 	init_ctx->vol = vol;
764 	init_ctx->cb_fn = cb_fn;
765 	init_ctx->cb_arg = cb_arg;
766 
767 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
768 	init_ctx->iov[0].iov_base = init_ctx->path;
769 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
770 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
771 	init_ctx->backing_cb_args.cb_arg = init_ctx;
772 	/* Write the path to offset 4K on the backing device - just after where the super
773 	 *  block will be written.  We wait until this is committed before writing the
774 	 *  super block to guarantee we don't get the super block written without the
775 	 *  path if the system crashes in the middle of a write operation.
776 	 */
777 	backing_io->dev = vol->backing_dev;
778 	backing_io->iov = init_ctx->iov;
779 	backing_io->iovcnt = 1;
780 	backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen;
781 	backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen;
782 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
783 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
784 
785 	vol->backing_dev->submit_backing_io(backing_io);
786 }
787 
788 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
789 
790 static void
791 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
792 {
793 	struct reduce_init_load_ctx *load_ctx = cb_arg;
794 	struct spdk_reduce_vol *vol = load_ctx->vol;
795 	uint64_t backing_dev_size;
796 	uint64_t i, num_chunks, logical_map_index;
797 	struct spdk_reduce_chunk_map *chunk;
798 	size_t mapped_len;
799 	uint32_t j;
800 	int rc;
801 
802 	if (reduce_errno != 0) {
803 		rc = reduce_errno;
804 		goto error;
805 	}
806 
807 	rc = _alloc_zero_buff();
808 	if (rc) {
809 		goto error;
810 	}
811 
812 	if (memcmp(vol->backing_super->signature,
813 		   SPDK_REDUCE_SIGNATURE,
814 		   sizeof(vol->backing_super->signature)) != 0) {
815 		/* This backing device isn't a libreduce backing device. */
816 		rc = -EILSEQ;
817 		goto error;
818 	}
819 
820 	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
821 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
822 	 *  so destroy_load_cb can delete the metadata off of the block device and delete the
823 	 *  persistent memory file if it exists.
824 	 */
825 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
826 	if (load_ctx->cb_fn == (*destroy_load_cb)) {
827 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
828 		_init_load_cleanup(NULL, load_ctx);
829 		return;
830 	}
831 
832 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
833 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
834 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
835 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
836 
837 	rc = _allocate_bit_arrays(vol);
838 	if (rc != 0) {
839 		goto error;
840 	}
841 
842 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
843 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
844 		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
845 			    backing_dev_size);
846 		rc = -EILSEQ;
847 		goto error;
848 	}
849 
850 	vol->pm_file.size = _get_pm_file_size(&vol->params);
851 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
852 					    &vol->pm_file.pm_is_pmem);
853 	if (vol->pm_file.pm_buf == NULL) {
854 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
855 		rc = -errno;
856 		goto error;
857 	}
858 
859 	if (vol->pm_file.size != mapped_len) {
860 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
861 			    vol->pm_file.size, mapped_len);
862 		rc = -ENOMEM;
863 		goto error;
864 	}
865 
866 	rc = _allocate_vol_requests(vol);
867 	if (rc != 0) {
868 		goto error;
869 	}
870 
871 	_initialize_vol_pm_pointers(vol);
872 
873 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
874 	for (i = 0; i < num_chunks; i++) {
875 		logical_map_index = vol->pm_logical_map[i];
876 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
877 			continue;
878 		}
879 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
880 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
881 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
882 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
883 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
884 			}
885 		}
886 	}
887 
888 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
889 	/* Only clean up the ctx - the vol has been passed to the application
890 	 *  for use now that volume load was successful.
891 	 */
892 	_init_load_cleanup(NULL, load_ctx);
893 	return;
894 
895 error:
896 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
897 	_init_load_cleanup(vol, load_ctx);
898 }
899 
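/*
 * Load issues a single read covering the superblock (offset 0) and the pm file path
 *  (offset 4 KiB).  The completion callback then maps the pm file and rebuilds the
 *  chunk map and backing io unit allocation bit arrays from the persisted logical map.
 */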
900 void
901 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
902 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
903 {
904 	struct spdk_reduce_vol *vol;
905 	struct reduce_init_load_ctx *load_ctx;
906 	struct spdk_reduce_backing_io *backing_io;
907 
908 	if (backing_dev->submit_backing_io == NULL) {
909 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
910 		cb_fn(cb_arg, NULL, -EINVAL);
911 		return;
912 	}
913 
914 	vol = calloc(1, sizeof(*vol));
915 	if (vol == NULL) {
916 		cb_fn(cb_arg, NULL, -ENOMEM);
917 		return;
918 	}
919 
920 	TAILQ_INIT(&vol->free_requests);
921 	RB_INIT(&vol->executing_requests);
922 	TAILQ_INIT(&vol->queued_requests);
923 	queue_init(&vol->free_chunks_queue);
924 	queue_init(&vol->free_backing_blocks_queue);
925 
926 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
927 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
928 	if (vol->backing_super == NULL) {
929 		_init_load_cleanup(vol, NULL);
930 		cb_fn(cb_arg, NULL, -ENOMEM);
931 		return;
932 	}
933 
934 	vol->backing_dev = backing_dev;
935 
936 	load_ctx = calloc(1, sizeof(*load_ctx));
937 	if (load_ctx == NULL) {
938 		_init_load_cleanup(vol, NULL);
939 		cb_fn(cb_arg, NULL, -ENOMEM);
940 		return;
941 	}
942 
943 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
944 	if (backing_io == NULL) {
945 		_init_load_cleanup(vol, load_ctx);
946 		cb_fn(cb_arg, NULL, -ENOMEM);
947 		return;
948 	}
949 
950 	load_ctx->backing_io = backing_io;
951 
952 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
953 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
954 	if (load_ctx->path == NULL) {
955 		_init_load_cleanup(vol, load_ctx);
956 		cb_fn(cb_arg, NULL, -ENOMEM);
957 		return;
958 	}
959 
960 	load_ctx->vol = vol;
961 	load_ctx->cb_fn = cb_fn;
962 	load_ctx->cb_arg = cb_arg;
963 
964 	load_ctx->iov[0].iov_base = vol->backing_super;
965 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
966 	load_ctx->iov[1].iov_base = load_ctx->path;
967 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
968 	backing_io->dev = vol->backing_dev;
969 	backing_io->iov = load_ctx->iov;
970 	backing_io->iovcnt = LOAD_IOV_COUNT;
971 	backing_io->lba = 0;
972 	backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
973 				vol->backing_dev->blocklen;
974 	backing_io->backing_cb_args = &load_ctx->backing_cb_args;
975 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
976 
977 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
978 	load_ctx->backing_cb_args.cb_arg = load_ctx;
979 	vol->backing_dev->submit_backing_io(backing_io);
980 }
981 
982 void
983 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
984 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
985 {
986 	if (vol == NULL) {
987 		/* This indicates a programming error. */
988 		assert(false);
989 		cb_fn(cb_arg, -EINVAL);
990 		return;
991 	}
992 
993 	if (--g_vol_count == 0) {
994 		spdk_free(g_zero_buf);
995 	}
996 	assert(g_vol_count >= 0);
997 	_init_load_cleanup(vol, NULL);
998 	cb_fn(cb_arg, 0);
999 }
1000 
1001 struct reduce_destroy_ctx {
1002 	spdk_reduce_vol_op_complete		cb_fn;
1003 	void					*cb_arg;
1004 	struct spdk_reduce_vol			*vol;
1005 	struct spdk_reduce_vol_superblock	*super;
1006 	struct iovec				iov;
1007 	struct spdk_reduce_vol_cb_args		backing_cb_args;
1008 	int					reduce_errno;
1009 	char					pm_path[REDUCE_PATH_MAX];
1010 	struct spdk_reduce_backing_io           *backing_io;
1011 };
1012 
1013 static void
1014 destroy_unload_cpl(void *cb_arg, int reduce_errno)
1015 {
1016 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1017 
1018 	if (destroy_ctx->reduce_errno == 0) {
1019 		if (unlink(destroy_ctx->pm_path)) {
1020 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
1021 				    destroy_ctx->pm_path, strerror(errno));
1022 		}
1023 	}
1024 
1025 	/* Even if the unload somehow failed, we still pass the destroy_ctx
1026 	 * reduce_errno since that indicates whether or not the volume was
1027 	 * actually destroyed.
1028 	 */
1029 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
1030 	spdk_free(destroy_ctx->super);
1031 	free(destroy_ctx->backing_io);
1032 	free(destroy_ctx);
1033 }
1034 
1035 static void
1036 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
1037 {
1038 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1039 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
1040 
1041 	destroy_ctx->reduce_errno = reduce_errno;
1042 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
1043 }
1044 
1045 static void
1046 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1047 {
1048 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1049 	struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io;
1050 
1051 	if (reduce_errno != 0) {
1052 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
1053 		spdk_free(destroy_ctx->super);
1054 		free(destroy_ctx);
1055 		return;
1056 	}
1057 
1058 	destroy_ctx->vol = vol;
1059 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
1060 	destroy_ctx->iov.iov_base = destroy_ctx->super;
1061 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
1062 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
1063 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
1064 
1065 	backing_io->dev = vol->backing_dev;
1066 	backing_io->iov = &destroy_ctx->iov;
1067 	backing_io->iovcnt = 1;
1068 	backing_io->lba = 0;
1069 	backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen;
1070 	backing_io->backing_cb_args = &destroy_ctx->backing_cb_args;
1071 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1072 
1073 	vol->backing_dev->submit_backing_io(backing_io);
1074 }
1075 
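/*
 * Destroy works by loading the volume, overwriting its superblock on the backing
 *  device with zeroes, unloading it and finally unlinking the pm file (see
 *  destroy_load_cb, _destroy_zero_super_cpl and destroy_unload_cpl).
 */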
1076 void
1077 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
1078 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1079 {
1080 	struct reduce_destroy_ctx *destroy_ctx;
1081 	struct spdk_reduce_backing_io *backing_io;
1082 
1083 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
1084 	if (destroy_ctx == NULL) {
1085 		cb_fn(cb_arg, -ENOMEM);
1086 		return;
1087 	}
1088 
1089 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
1090 	if (backing_io == NULL) {
1091 		free(destroy_ctx);
1092 		cb_fn(cb_arg, -ENOMEM);
1093 		return;
1094 	}
1095 
1096 	destroy_ctx->backing_io = backing_io;
1097 
1098 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
1099 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1100 	if (destroy_ctx->super == NULL) {
1101 		free(destroy_ctx);
1102 		free(backing_io);
1103 		cb_fn(cb_arg, -ENOMEM);
1104 		return;
1105 	}
1106 	destroy_ctx->cb_fn = cb_fn;
1107 	destroy_ctx->cb_arg = cb_arg;
1108 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
1109 }
1110 
1111 static bool
1112 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
1113 {
1114 	uint64_t start_chunk, end_chunk;
1115 
1116 	start_chunk = offset / vol->logical_blocks_per_chunk;
1117 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
1118 
1119 	return (start_chunk != end_chunk);
1120 }
1121 
1122 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
1123 static void _start_unmap_request_full_chunk(void *ctx);
1124 
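/*
 * Complete a request, then restart at most one queued request that was waiting on
 *  the same logical map index (i.e. targeting the same chunk).
 */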
1125 static void
1126 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
1127 {
1128 	struct spdk_reduce_vol_request *next_req;
1129 	struct spdk_reduce_vol *vol = req->vol;
1130 
1131 	req->cb_fn(req->cb_arg, reduce_errno);
1132 	RB_REMOVE(executing_req_tree, &vol->executing_requests, req);
1133 
1134 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
1135 		if (next_req->logical_map_index == req->logical_map_index) {
1136 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
1137 			if (next_req->type == REDUCE_IO_READV) {
1138 				_start_readv_request(next_req);
1139 			} else if (next_req->type == REDUCE_IO_WRITEV) {
1140 				_start_writev_request(next_req);
1141 			} else {
1142 				assert(next_req->type == REDUCE_IO_UNMAP);
1143 				_start_unmap_request_full_chunk(next_req);
1144 			}
1145 			break;
1146 		}
1147 	}
1148 
1149 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
1150 }
1151 
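/*
 * Free a chunk map and all of its backing io units.  Freed indexes are pushed onto
 *  the free queues when there is room; otherwise the find_*_offset scan hints are
 *  wound back so the bit array search can rediscover them.
 */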
1152 static void
1153 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
1154 {
1155 	struct spdk_reduce_chunk_map *chunk;
1156 	uint64_t index;
1157 	bool success;
1158 	uint32_t i;
1159 
1160 	chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
1161 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
1162 		index = chunk->io_unit_index[i];
1163 		if (index == REDUCE_EMPTY_MAP_ENTRY) {
1164 			break;
1165 		}
1166 		assert(spdk_bit_array_get(vol->allocated_backing_io_units,
1167 					  index) == true);
1168 		spdk_bit_array_clear(vol->allocated_backing_io_units, index);
1169 		success = queue_enqueue(&vol->free_backing_blocks_queue, index);
1170 		if (!success && index < vol->find_block_offset) {
1171 			vol->find_block_offset = index;
1172 		}
1173 		chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1174 	}
1175 	success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index);
1176 	if (!success && chunk_map_index < vol->find_chunk_offset) {
1177 		vol->find_chunk_offset = chunk_map_index;
1178 	}
1179 	spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
1180 }
1181 
1182 static void
1183 _write_write_done(void *_req, int reduce_errno)
1184 {
1185 	struct spdk_reduce_vol_request *req = _req;
1186 	struct spdk_reduce_vol *vol = req->vol;
1187 	uint64_t old_chunk_map_index;
1188 
1189 	if (reduce_errno != 0) {
1190 		req->reduce_errno = reduce_errno;
1191 	}
1192 
1193 	assert(req->num_backing_ops > 0);
1194 	if (--req->num_backing_ops > 0) {
1195 		return;
1196 	}
1197 
1198 	if (req->reduce_errno != 0) {
1199 		_reduce_vol_reset_chunk(vol, req->chunk_map_index);
1200 		_reduce_vol_complete_req(req, req->reduce_errno);
1201 		return;
1202 	}
1203 
1204 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1205 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1206 		_reduce_vol_reset_chunk(vol, old_chunk_map_index);
1207 	}
1208 
1209 	/*
1210 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1211 	 * becomes invalid once we update the logical map, since the logical map will no
1212 	 * longer hold a reference to the old chunk map.
1213 	 */
1214 
1215 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1216 	_reduce_persist(vol, req->chunk,
1217 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1218 
1219 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1220 
1221 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1222 
1223 	_reduce_vol_complete_req(req, 0);
1224 }
1225 
1226 static struct spdk_reduce_backing_io *
1227 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index)
1228 {
1229 	struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev;
1230 	struct spdk_reduce_backing_io *backing_io;
1231 
1232 	backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io +
1233 			(sizeof(*backing_io) + backing_dev->user_ctx_size) * index);
1234 
1235 	return backing_io;
1236 
1237 }
1238 
1239 struct reduce_merged_io_desc {
1240 	uint64_t io_unit_index;
1241 	uint32_t num_io_units;
1242 };
1243 
1244 static void
1245 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1246 				 reduce_request_fn next_fn, bool is_write)
1247 {
1248 	struct iovec *iov;
1249 	struct spdk_reduce_backing_io *backing_io;
1250 	uint8_t *buf;
1251 	uint32_t i;
1252 
1253 	if (req->chunk_is_compressed) {
1254 		iov = req->comp_buf_iov;
1255 		buf = req->comp_buf;
1256 	} else {
1257 		iov = req->decomp_buf_iov;
1258 		buf = req->decomp_buf;
1259 	}
1260 
1261 	req->num_backing_ops = req->num_io_units;
1262 	req->backing_cb_args.cb_fn = next_fn;
1263 	req->backing_cb_args.cb_arg = req;
1264 	for (i = 0; i < req->num_io_units; i++) {
1265 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1266 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1267 		iov[i].iov_len = vol->params.backing_io_unit_size;
1268 		backing_io->dev  = vol->backing_dev;
1269 		backing_io->iov = &iov[i];
1270 		backing_io->iovcnt = 1;
1271 		backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit;
1272 		backing_io->lba_count = vol->backing_lba_per_io_unit;
1273 		backing_io->backing_cb_args = &req->backing_cb_args;
1274 		if (is_write) {
1275 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1276 		} else {
1277 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1278 		}
1279 		vol->backing_dev->submit_backing_io(backing_io);
1280 	}
1281 }
1282 
1283 static void
1284 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1285 		   reduce_request_fn next_fn, bool is_write)
1286 {
1287 	struct iovec *iov;
1288 	struct spdk_reduce_backing_io *backing_io;
1289 	struct reduce_merged_io_desc merged_io_desc[4];
1290 	uint8_t *buf;
1291 	bool merge = false;
1292 	uint32_t num_io = 0;
1293 	uint32_t io_unit_counts = 0;
1294 	uint32_t merged_io_idx = 0;
1295 	uint32_t i;
1296 
1297 	/* merged_io_desc is a fixed array of four elements, so IO merging is only
1298 	 * possible when a chunk spans at most four backing io units.
1299 	 * If the chunk spans more io units than that, don't merge the IO.
1300 	 */
1301 	if (vol->backing_io_units_per_chunk > 4) {
1302 		_issue_backing_ops_without_merge(req, vol, next_fn, is_write);
1303 		return;
1304 	}
1305 
1306 	if (req->chunk_is_compressed) {
1307 		iov = req->comp_buf_iov;
1308 		buf = req->comp_buf;
1309 	} else {
1310 		iov = req->decomp_buf_iov;
1311 		buf = req->decomp_buf;
1312 	}
1313 
1314 	for (i = 0; i < req->num_io_units; i++) {
1315 		if (!merge) {
1316 			merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
1317 			merged_io_desc[merged_io_idx].num_io_units = 1;
1318 			num_io++;
1319 		}
1320 
1321 		if (i + 1 == req->num_io_units) {
1322 			break;
1323 		}
1324 
1325 		if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
1326 			merged_io_desc[merged_io_idx].num_io_units += 1;
1327 			merge = true;
1328 			continue;
1329 		}
1330 		merge = false;
1331 		merged_io_idx++;
1332 	}
1333 
1334 	req->num_backing_ops = num_io;
1335 	req->backing_cb_args.cb_fn = next_fn;
1336 	req->backing_cb_args.cb_arg = req;
1337 	for (i = 0; i < num_io; i++) {
1338 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1339 		iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
1340 		iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
1341 		backing_io->dev  = vol->backing_dev;
1342 		backing_io->iov = &iov[i];
1343 		backing_io->iovcnt = 1;
1344 		backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit;
1345 		backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units;
1346 		backing_io->backing_cb_args = &req->backing_cb_args;
1347 		if (is_write) {
1348 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1349 		} else {
1350 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1351 		}
1352 		vol->backing_dev->submit_backing_io(backing_io);
1353 
1354 		/* Accumulate the io units issued so far so the next iov starts at the correct buffer offset. */
1355 		io_unit_counts += merged_io_desc[i].num_io_units;
1356 	}
1357 }
1358 
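/*
 * Allocate a new chunk map and backing io units for the (possibly compressed) chunk
 *  and write it out.  Reduce never overwrites a chunk in place; the old chunk map is
 *  released only after the new write completes (see _write_write_done).
 */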
1359 static void
1360 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1361 			uint32_t compressed_size)
1362 {
1363 	struct spdk_reduce_vol *vol = req->vol;
1364 	uint32_t i;
1365 	uint64_t chunk_offset, remainder, free_index, total_len = 0;
1366 	uint8_t *buf;
1367 	bool success;
1368 	int j;
1369 
1370 	success = queue_dequeue(&vol->free_chunks_queue, &free_index);
1371 	if (success) {
1372 		req->chunk_map_index = free_index;
1373 	} else {
1374 		req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps,
1375 				       vol->find_chunk_offset);
1376 		vol->find_chunk_offset = req->chunk_map_index + 1;
1377 	}
1378 
1379 	/* TODO: fail if no chunk map found - but really this should not happen if we
1380 	 * size the number of requests similarly to number of extra chunk maps
1381 	 */
1382 	assert(req->chunk_map_index != UINT32_MAX);
1383 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1384 
1385 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1386 	req->num_io_units = spdk_divide_round_up(compressed_size,
1387 			    vol->params.backing_io_unit_size);
1388 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1389 	req->chunk->compressed_size =
1390 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1391 
1392 	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
1393 	if (req->chunk_is_compressed == false) {
1394 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1395 		buf = req->decomp_buf;
1396 		total_len = chunk_offset * vol->params.logical_block_size;
1397 
1398 		/* zero any offset into chunk */
1399 		if (req->rmw == false && chunk_offset) {
1400 			memset(buf, 0, total_len);
1401 		}
1402 		buf += total_len;
1403 
1404 		/* copy the data */
1405 		for (j = 0; j < req->iovcnt; j++) {
1406 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1407 			buf += req->iov[j].iov_len;
1408 			total_len += req->iov[j].iov_len;
1409 		}
1410 
1411 		/* zero any remainder */
1412 		remainder = vol->params.chunk_size - total_len;
1413 		total_len += remainder;
1414 		if (req->rmw == false && remainder) {
1415 			memset(buf, 0, remainder);
1416 		}
1417 		assert(total_len == vol->params.chunk_size);
1418 	}
1419 
1420 	for (i = 0; i < req->num_io_units; i++) {
1421 		success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index);
1422 		if (success) {
1423 			req->chunk->io_unit_index[i] = free_index;
1424 		} else {
1425 			req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units,
1426 						       vol->find_block_offset);
1427 			vol->find_block_offset = req->chunk->io_unit_index[i] + 1;
1428 		}
1429 		/* TODO: fail if no backing block found - but really this should also not
1430 		 * happen (see comment above).
1431 		 */
1432 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1433 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1434 	}
1435 
1436 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1437 }
1438 
1439 static void
1440 _write_compress_done(void *_req, int reduce_errno)
1441 {
1442 	struct spdk_reduce_vol_request *req = _req;
1443 
1444 	/* Negative reduce_errno indicates failure for compression operations.
1445 	 * Just write the uncompressed data instead.  Force this to happen
1446 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1447 	 * When it sees the data couldn't be compressed, it will just write
1448 	 * the uncompressed buffer to disk.
1449 	 */
1450 	if (reduce_errno < 0) {
1451 		req->backing_cb_args.output_size = req->vol->params.chunk_size;
1452 	}
1453 
1454 	_reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
1455 }
1456 
1457 static void
1458 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1459 {
1460 	struct spdk_reduce_vol *vol = req->vol;
1461 
1462 	req->backing_cb_args.cb_fn = next_fn;
1463 	req->backing_cb_args.cb_arg = req;
1464 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1465 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1466 	vol->backing_dev->compress(vol->backing_dev,
1467 				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
1468 				   &req->backing_cb_args);
1469 }
1470 
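/*
 * Decompress the chunk into the request's scratch decomp buffer rather than directly
 *  into the user's iovecs.  This is used on the read-modify-write path, where the
 *  decompressed chunk provides the padding around the user's data before recompression.
 */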
1471 static void
1472 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1473 {
1474 	struct spdk_reduce_vol *vol = req->vol;
1475 
1476 	req->backing_cb_args.cb_fn = next_fn;
1477 	req->backing_cb_args.cb_arg = req;
1478 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1479 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1480 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1481 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1482 	vol->backing_dev->decompress(vol->backing_dev,
1483 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1484 				     &req->backing_cb_args);
1485 }
1486 
1487 static void
1488 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1489 {
1490 	struct spdk_reduce_vol *vol = req->vol;
1491 	uint64_t chunk_offset, remainder = 0;
1492 	uint64_t ttl_len = 0;
1493 	size_t iov_len;
1494 	int i;
1495 
1496 	req->decomp_iovcnt = 0;
1497 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1498 
1499 	/* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer
1500 	 * if at least one of the conditions below is true:
1501 	 * 1. User's buffer is fragmented
1502 	 * 2. Length of the user's buffer is less than the chunk
1503 	 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */
1504 	iov_len = req->iov[0].iov_len;
1505 	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
1506 				     req->iov[0].iov_len < vol->params.chunk_size ||
1507 				     _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
1508 	if (req->copy_after_decompress) {
1509 		req->decomp_iov[0].iov_base = req->decomp_buf;
1510 		req->decomp_iov[0].iov_len = vol->params.chunk_size;
1511 		req->decomp_iovcnt = 1;
1512 		goto decompress;
1513 	}
1514 
1515 	if (chunk_offset) {
1516 		/* first iov points to our scratch buffer for any offset into the chunk */
1517 		req->decomp_iov[0].iov_base = req->decomp_buf;
1518 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1519 		ttl_len += req->decomp_iov[0].iov_len;
1520 		req->decomp_iovcnt = 1;
1521 	}
1522 
1523 	/* now the user data iov, direct to the user buffer */
1524 	for (i = 0; i < req->iovcnt; i++) {
1525 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1526 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1527 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1528 	}
1529 	req->decomp_iovcnt += req->iovcnt;
1530 
1531 	/* send the rest of the chunk to our scratch buffer */
1532 	remainder = vol->params.chunk_size - ttl_len;
1533 	if (remainder) {
1534 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1535 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1536 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1537 		req->decomp_iovcnt++;
1538 	}
1539 	assert(ttl_len == vol->params.chunk_size);
1540 
1541 decompress:
1542 	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
1543 	req->backing_cb_args.cb_fn = next_fn;
1544 	req->backing_cb_args.cb_arg = req;
1545 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1546 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1547 	vol->backing_dev->decompress(vol->backing_dev,
1548 				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
1549 				     &req->backing_cb_args);
1550 }
1551 
1552 static inline void
1553 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
1554 {
1555 	struct spdk_reduce_vol *vol = req->vol;
1556 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1557 	uint64_t chunk_offset, ttl_len = 0;
1558 	uint64_t remainder = 0;
1559 	char *copy_offset = NULL;
1560 	uint32_t lbsize = vol->params.logical_block_size;
1561 	int i;
1562 
1563 	req->decomp_iov[0].iov_base = req->decomp_buf;
1564 	req->decomp_iov[0].iov_len = vol->params.chunk_size;
1565 	req->decomp_iovcnt = 1;
1566 	copy_offset = req->decomp_iov[0].iov_base;
1567 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1568 
1569 	if (chunk_offset) {
1570 		ttl_len += chunk_offset * lbsize;
1571 		/* copy_offset already points to padding buffer if zero_paddings=false */
1572 		if (zero_paddings) {
1573 			memcpy(copy_offset, padding_buffer, ttl_len);
1574 		}
1575 		copy_offset += ttl_len;
1576 	}
1577 
1578 	/* now the user data iov, direct from the user buffer */
1579 	for (i = 0; i < req->iovcnt; i++) {
1580 		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
1581 		copy_offset += req->iov[i].iov_len;
1582 		ttl_len += req->iov[i].iov_len;
1583 	}
1584 
1585 	remainder = vol->params.chunk_size - ttl_len;
1586 	if (remainder) {
1587 		/* copy_offset already points to padding buffer if zero_paddings=false */
1588 		if (zero_paddings) {
1589 			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
1590 		}
1591 		ttl_len += remainder;
1592 	}
1593 
1594 	assert(ttl_len == req->vol->params.chunk_size);
1595 }
1596 
1597 /* This function can be called when we are compressing new data or in case of read-modify-write.
1598  * In the first case any padding should be filled with zeroes; in the second case the padding
1599  * should point to the already read and decompressed buffer. */
1600 static inline void
1601 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1602 {
1603 	struct spdk_reduce_vol *vol = req->vol;
1604 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1605 	uint64_t chunk_offset, ttl_len = 0;
1606 	uint64_t remainder = 0;
1607 	uint32_t lbsize = vol->params.logical_block_size;
1608 	size_t iov_len;
1609 	int i;
1610 
1611 	/* If the backing device doesn't support SGL input, then we should copy the user's buffer into decomp_buf
1612 	 * if at least one of the conditions below is true:
1613 	 * 1. The user's buffer is fragmented
1614 	 * 2. The length of the user's buffer is less than the chunk size
1615 	 * 3. The user's buffer is contiguous and equal to chunk_size, but crosses a huge page boundary */
1616 	iov_len = req->iov[0].iov_len;
1617 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1618 					  req->iov[0].iov_len < vol->params.chunk_size ||
1619 					  _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
1620 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1621 		return;
1622 	}
1623 
1624 	req->decomp_iovcnt = 0;
1625 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1626 
1627 	if (chunk_offset != 0) {
1628 		ttl_len += chunk_offset * lbsize;
1629 		req->decomp_iov[0].iov_base = padding_buffer;
1630 		req->decomp_iov[0].iov_len = ttl_len;
1631 		req->decomp_iovcnt = 1;
1632 	}
1633 
1634 	/* now the user data iovs, pointing directly at the user buffers */
1635 	for (i = 0; i < req->iovcnt; i++) {
1636 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1637 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1638 		ttl_len += req->iov[i].iov_len;
1639 	}
1640 	req->decomp_iovcnt += req->iovcnt;
1641 
1642 	remainder = vol->params.chunk_size - ttl_len;
1643 	if (remainder) {
1644 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1645 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1646 		req->decomp_iovcnt++;
1647 		ttl_len += remainder;
1648 	}
1649 	assert(ttl_len == req->vol->params.chunk_size);
1650 }
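/*
 * Note on the two call sites of _prepare_compress_chunk() (both appear below):
 * _start_writev_request() passes zero_paddings == true because the chunk has no previous
 * contents to preserve, so any space around the user's data is padded from g_zero_buf.
 * _write_decompress_done() passes zero_paddings == false on the read-modify-write path,
 * so the padding regions point into decomp_buf, which at that point holds the decompressed
 * old chunk.
 */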
1651 
1652 static void
1653 _write_decompress_done(void *_req, int reduce_errno)
1654 {
1655 	struct spdk_reduce_vol_request *req = _req;
1656 
1657 	/* Negative reduce_errno indicates failure for decompression operations. */
1658 	if (reduce_errno < 0) {
1659 		_reduce_vol_complete_req(req, reduce_errno);
1660 		return;
1661 	}
1662 
1663 	/* A non-negative reduce_errno indicates success, in which case backing_cb_args.output_size
1664 	 * holds the size of the decompressed data, which must equal a full chunk.
1665 	 */
1666 	if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
1667 		_reduce_vol_complete_req(req, -EIO);
1668 		return;
1669 	}
1670 
1671 	_prepare_compress_chunk(req, false);
1672 	_reduce_vol_compress_chunk(req, _write_compress_done);
1673 }
1674 
1675 static void
1676 _write_read_done(void *_req, int reduce_errno)
1677 {
1678 	struct spdk_reduce_vol_request *req = _req;
1679 
1680 	if (reduce_errno != 0) {
1681 		req->reduce_errno = reduce_errno;
1682 	}
1683 
1684 	assert(req->num_backing_ops > 0);
1685 	if (--req->num_backing_ops > 0) {
1686 		return;
1687 	}
1688 
1689 	if (req->reduce_errno != 0) {
1690 		_reduce_vol_complete_req(req, req->reduce_errno);
1691 		return;
1692 	}
1693 
1694 	if (req->chunk_is_compressed) {
1695 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1696 	} else {
1697 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1698 
1699 		_write_decompress_done(req, 0);
1700 	}
1701 }
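/*
 * Read-modify-write flow for a sub-chunk write, derived from the surrounding code:
 *
 *   _start_writev_request()            sets req->rmw and reads the existing chunk
 *     -> _write_read_done()            runs once all backing reads complete
 *       -> decompress into scratch     (skipped if the chunk was stored uncompressed)
 *         -> _write_decompress_done()  merges the user's data over the old chunk
 *           -> compress and write      via _prepare_compress_chunk()/_reduce_vol_compress_chunk()
 */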
1702 
1703 static void
1704 _read_decompress_done(void *_req, int reduce_errno)
1705 {
1706 	struct spdk_reduce_vol_request *req = _req;
1707 	struct spdk_reduce_vol *vol = req->vol;
1708 
1709 	/* Negative reduce_errno indicates failure for decompression operations. */
1710 	if (reduce_errno < 0) {
1711 		_reduce_vol_complete_req(req, reduce_errno);
1712 		return;
1713 	}
1714 
1715 	/* A non-negative reduce_errno indicates success, in which case backing_cb_args.output_size
1716 	 * holds the size of the decompressed data, which must equal a full chunk.
1717 	 */
1718 	if (req->backing_cb_args.output_size != vol->params.chunk_size) {
1719 		_reduce_vol_complete_req(req, -EIO);
1720 		return;
1721 	}
1722 
1723 	if (req->copy_after_decompress) {
1724 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1725 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1726 		int i;
1727 
1728 		for (i = 0; i < req->iovcnt; i++) {
1729 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1730 			decomp_buffer += req->iov[i].iov_len;
1731 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1732 		}
1733 	}
1734 
1735 	_reduce_vol_complete_req(req, 0);
1736 }
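/*
 * copy_after_decompress is set on the read path when the decompressed data could not be
 * scattered directly into the caller's iovecs (e.g. a backing device without SGL output,
 * where the whole chunk is decompressed into decomp_buf instead).  In that case the loop
 * above copies just the requested logical blocks out of the scratch buffer.
 */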
1737 
1738 static void
1739 _read_read_done(void *_req, int reduce_errno)
1740 {
1741 	struct spdk_reduce_vol_request *req = _req;
1742 	uint64_t chunk_offset;
1743 	uint8_t *buf;
1744 	int i;
1745 
1746 	if (reduce_errno != 0) {
1747 		req->reduce_errno = reduce_errno;
1748 	}
1749 
1750 	assert(req->num_backing_ops > 0);
1751 	if (--req->num_backing_ops > 0) {
1752 		return;
1753 	}
1754 
1755 	if (req->reduce_errno != 0) {
1756 		_reduce_vol_complete_req(req, req->reduce_errno);
1757 		return;
1758 	}
1759 
1760 	if (req->chunk_is_compressed) {
1761 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1762 	} else {
1763 
1764 		/* If the chunk had been compressed, the decompression operation would have placed
1765 		 *  the data directly into the host buffers; since it was not, we memcpy it out of
1766 		 *  the scratch buffer here. */
1767 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1768 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1769 		for (i = 0; i < req->iovcnt; i++) {
1770 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1771 			buf += req->iov[i].iov_len;
1772 		}
1773 
1774 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1775 
1776 		_read_decompress_done(req, 0);
1777 	}
1778 }
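/*
 * Read flow for reference:  _start_readv_request() -> _reduce_vol_read_chunk() issues the
 * backing reads -> _read_read_done() runs once they all complete and either decompresses
 * the chunk (compressed case) or memcpys it out of the scratch buffer (uncompressed case)
 * -> _read_decompress_done() finishes the request.
 */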
1779 
1780 static void
1781 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1782 {
1783 	struct spdk_reduce_vol *vol = req->vol;
1784 
1785 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1786 	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
1787 
1788 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1789 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1790 			    vol->params.backing_io_unit_size);
1791 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1792 
1793 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1794 }
1795 
1796 static bool
1797 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1798 		    uint64_t length)
1799 {
1800 	uint64_t size = 0;
1801 	int i;
1802 
1803 	if (iovcnt > REDUCE_MAX_IOVECS) {
1804 		return false;
1805 	}
1806 
1807 	for (i = 0; i < iovcnt; i++) {
1808 		size += iov[i].iov_len;
1809 	}
1810 
1811 	return size == (length * vol->params.logical_block_size);
1812 }
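/*
 * For example (a sketch assuming a 512-byte logical block size):  a request with
 * length == 8 logical blocks is valid with two iovecs of 2048 bytes each, since
 * 2048 + 2048 == 8 * 512, but is rejected if the iovecs sum to any other total or
 * if more than REDUCE_MAX_IOVECS entries are supplied.
 */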
1813 
1814 static bool
1815 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1816 {
1817 	struct spdk_reduce_vol_request req;
1818 
1819 	req.logical_map_index = logical_map_index;
1820 
1821 	return (NULL != RB_FIND(executing_req_tree, &vol->executing_requests, &req));
1822 }
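/*
 * Two requests "overlap" when they target the same chunk (same logical_map_index).
 * The callers below use this to serialize such requests:  an overlapping request is
 * parked on vol->queued_requests instead of starting immediately, and is dequeued
 * once the conflicting in-flight request completes.
 */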
1823 
1824 static void
1825 _start_readv_request(struct spdk_reduce_vol_request *req)
1826 {
1827 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1828 	_reduce_vol_read_chunk(req, _read_read_done);
1829 }
1830 
1831 void
1832 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1833 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1834 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1835 {
1836 	struct spdk_reduce_vol_request *req;
1837 	uint64_t logical_map_index;
1838 	bool overlapped;
1839 	int i;
1840 
1841 	if (length == 0) {
1842 		cb_fn(cb_arg, 0);
1843 		return;
1844 	}
1845 
1846 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1847 		cb_fn(cb_arg, -EINVAL);
1848 		return;
1849 	}
1850 
1851 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1852 		cb_fn(cb_arg, -EINVAL);
1853 		return;
1854 	}
1855 
1856 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1857 	overlapped = _check_overlap(vol, logical_map_index);
1858 
1859 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1860 		/*
1861 		 * This chunk hasn't been allocated.  So treat the data as all
1862 		 * zeroes for this chunk - do the memset and immediately complete
1863 		 * the operation.
1864 		 */
1865 		for (i = 0; i < iovcnt; i++) {
1866 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1867 		}
1868 		cb_fn(cb_arg, 0);
1869 		return;
1870 	}
1871 
1872 	req = TAILQ_FIRST(&vol->free_requests);
1873 	if (req == NULL) {
1874 		cb_fn(cb_arg, -ENOMEM);
1875 		return;
1876 	}
1877 
1878 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1879 	req->type = REDUCE_IO_READV;
1880 	req->vol = vol;
1881 	req->iov = iov;
1882 	req->iovcnt = iovcnt;
1883 	req->offset = offset;
1884 	req->logical_map_index = logical_map_index;
1885 	req->length = length;
1886 	req->copy_after_decompress = false;
1887 	req->cb_fn = cb_fn;
1888 	req->cb_arg = cb_arg;
1889 
1890 	if (!overlapped) {
1891 		_start_readv_request(req);
1892 	} else {
1893 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1894 	}
1895 }
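/*
 * Minimal usage sketch (buf, vol and the read_done() completion callback are hypothetical
 * and would be provided by the caller; a 512-byte logical block size is assumed):
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = 8 * 512 };
 *
 *	// Read 8 logical blocks starting at logical block 0.  offset and length are in
 *	// logical blocks and must not span a chunk boundary.
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, read_done, NULL);
 */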
1896 
1897 static void
1898 _start_writev_request(struct spdk_reduce_vol_request *req)
1899 {
1900 	struct spdk_reduce_vol *vol = req->vol;
1901 
1902 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1903 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1904 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1905 			/* Read old chunk, then overwrite with data from this write
1906 			 *  operation.
1907 			 */
1908 			req->rmw = true;
1909 			_reduce_vol_read_chunk(req, _write_read_done);
1910 			return;
1911 		}
1912 	}
1913 
1914 	req->rmw = false;
1915 
1916 	_prepare_compress_chunk(req, true);
1917 	_reduce_vol_compress_chunk(req, _write_compress_done);
1918 }
1919 
1920 void
1921 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1922 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1923 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1924 {
1925 	struct spdk_reduce_vol_request *req;
1926 	uint64_t logical_map_index;
1927 	bool overlapped;
1928 
1929 	if (length == 0) {
1930 		cb_fn(cb_arg, 0);
1931 		return;
1932 	}
1933 
1934 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1935 		cb_fn(cb_arg, -EINVAL);
1936 		return;
1937 	}
1938 
1939 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1940 		cb_fn(cb_arg, -EINVAL);
1941 		return;
1942 	}
1943 
1944 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1945 	overlapped = _check_overlap(vol, logical_map_index);
1946 
1947 	req = TAILQ_FIRST(&vol->free_requests);
1948 	if (req == NULL) {
1949 		cb_fn(cb_arg, -ENOMEM);
1950 		return;
1951 	}
1952 
1953 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1954 	req->type = REDUCE_IO_WRITEV;
1955 	req->vol = vol;
1956 	req->iov = iov;
1957 	req->iovcnt = iovcnt;
1958 	req->offset = offset;
1959 	req->logical_map_index = logical_map_index;
1960 	req->length = length;
1961 	req->copy_after_decompress = false;
1962 	req->cb_fn = cb_fn;
1963 	req->cb_arg = cb_arg;
1964 
1965 	if (!overlapped) {
1966 		_start_writev_request(req);
1967 	} else {
1968 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1969 	}
1970 }
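/*
 * Writes follow the same calling convention as reads (offset and length in logical blocks,
 * no chunk-boundary crossing).  A sketch of a sub-chunk write that takes the
 * read-modify-write path when the chunk is already allocated (buf, vol and write_done()
 * are hypothetical caller-provided names; 512-byte logical blocks assumed):
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = 2 * 512 };
 *
 *	spdk_reduce_vol_writev(vol, &iov, 1, 4, 2, write_done, NULL);
 */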
1971 
1972 static void
1973 _start_unmap_request_full_chunk(void *ctx)
1974 {
1975 	struct spdk_reduce_vol_request *req = ctx;
1976 	struct spdk_reduce_vol *vol = req->vol;
1977 	uint64_t chunk_map_index;
1978 
1979 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1980 
1981 	chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1982 	if (chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1983 		_reduce_vol_reset_chunk(vol, chunk_map_index);
1984 		req->chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
1985 		_reduce_persist(vol, req->chunk,
1986 				_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1987 		vol->pm_logical_map[req->logical_map_index] = REDUCE_EMPTY_MAP_ENTRY;
1988 		_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1989 	}
1990 	_reduce_vol_complete_req(req, 0);
1991 }
1992 
1993 static void
1994 _reduce_vol_unmap_full_chunk(struct spdk_reduce_vol *vol,
1995 			     uint64_t offset, uint64_t length,
1996 			     spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1997 {
1998 	struct spdk_reduce_vol_request *req;
1999 	uint64_t logical_map_index;
2000 	bool overlapped;
2001 
2002 	if (_request_spans_chunk_boundary(vol, offset, length)) {
2003 		cb_fn(cb_arg, -EINVAL);
2004 		return;
2005 	}
2006 
2007 	logical_map_index = offset / vol->logical_blocks_per_chunk;
2008 	overlapped = _check_overlap(vol, logical_map_index);
2009 
2010 	req = TAILQ_FIRST(&vol->free_requests);
2011 	if (req == NULL) {
2012 		cb_fn(cb_arg, -ENOMEM);
2013 		return;
2014 	}
2015 
2016 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
2017 	req->type = REDUCE_IO_UNMAP;
2018 	req->vol = vol;
2019 	req->iov = NULL;
2020 	req->iovcnt = 0;
2021 	req->offset = offset;
2022 	req->logical_map_index = logical_map_index;
2023 	req->length = length;
2024 	req->copy_after_decompress = false;
2025 	req->cb_fn = cb_fn;
2026 	req->cb_arg = cb_arg;
2027 
2028 	if (!overlapped) {
2029 		_start_unmap_request_full_chunk(req);
2030 	} else {
2031 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
2032 	}
2033 }
2034 
2035 struct unmap_partial_chunk_ctx {
2036 	struct spdk_reduce_vol *vol;
2037 	struct iovec iov;
2038 	spdk_reduce_vol_op_complete cb_fn;
2039 	void *cb_arg;
2040 };
2041 
2042 static void
2043 _reduce_unmap_partial_chunk_complete(void *_ctx, int reduce_errno)
2044 {
2045 	struct unmap_partial_chunk_ctx *ctx = _ctx;
2046 
2047 	ctx->cb_fn(ctx->cb_arg, reduce_errno);
2048 	free(ctx);
2049 }
2050 
2051 static void
2052 _reduce_vol_unmap_partial_chunk(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length,
2053 				spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
2054 {
2055 	struct unmap_partial_chunk_ctx *ctx;
2056 
2057 	ctx = calloc(1, sizeof(struct unmap_partial_chunk_ctx));
2058 	if (ctx == NULL) {
2059 		cb_fn(cb_arg, -ENOMEM);
2060 		return;
2061 	}
2062 
2063 	ctx->vol = vol;
2064 	ctx->iov.iov_base = g_zero_buf;
2065 	ctx->iov.iov_len = length * vol->params.logical_block_size;
2066 	ctx->cb_fn = cb_fn;
2067 	ctx->cb_arg = cb_arg;
2068 
2069 	spdk_reduce_vol_writev(vol, &ctx->iov, 1, offset, length, _reduce_unmap_partial_chunk_complete,
2070 			       ctx);
2071 }
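/*
 * A partial-chunk unmap cannot simply drop the chunk, so it is implemented as a write of
 * zeroes (from g_zero_buf) over the affected logical blocks; the chunk stays allocated,
 * but the zeroed data typically compresses down to very little.
 */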
2072 
2073 void
2074 spdk_reduce_vol_unmap(struct spdk_reduce_vol *vol,
2075 		      uint64_t offset, uint64_t length,
2076 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
2077 {
2078 	if (length < vol->logical_blocks_per_chunk) {
2079 		_reduce_vol_unmap_partial_chunk(vol, offset, length, cb_fn, cb_arg);
2080 	} else if (length == vol->logical_blocks_per_chunk) {
2081 		_reduce_vol_unmap_full_chunk(vol, offset, length, cb_fn, cb_arg);
2082 	} else {
2083 		cb_fn(cb_arg, -EINVAL);
2084 	}
2085 }
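/*
 * Usage sketch (vol and unmap_done() are hypothetical caller-provided names).  To release
 * a whole chunk, the length must equal one chunk's worth of logical blocks, which a caller
 * can derive from spdk_reduce_vol_get_params():
 *
 *	const struct spdk_reduce_vol_params *p = spdk_reduce_vol_get_params(vol);
 *	uint64_t blocks_per_chunk = p->chunk_size / p->logical_block_size;
 *
 *	spdk_reduce_vol_unmap(vol, 0, blocks_per_chunk, unmap_done, NULL);
 *
 * Shorter requests are routed to the zero-write path above; longer ones fail with -EINVAL.
 */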
2086 
2087 const struct spdk_reduce_vol_params *
2088 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
2089 {
2090 	return &vol->params;
2091 }
2092 
2093 const char *
2094 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
2095 {
2096 	return vol->pm_file.path;
2097 }
2098 
2099 void
2100 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
2101 {
2102 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
2103 	uint32_t struct_size;
2104 	uint64_t chunk_map_size;
2105 
2106 	SPDK_NOTICELOG("vol info:\n");
2107 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
2108 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
2109 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
2110 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
2111 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
2112 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
2113 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
2114 		       vol->params.vol_size / vol->params.chunk_size);
2115 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
2116 			vol->params.backing_io_unit_size);
2117 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
2118 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
2119 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
2120 
2121 	SPDK_NOTICELOG("pmem info:\n");
2122 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
2123 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
2124 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
2125 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
2126 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
2127 			   vol->params.chunk_size);
2128 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
2129 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
2130 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
2131 			 vol->params.backing_io_unit_size);
2132 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
2133 }
2134 
2135 SPDK_LOG_REGISTER_COMPONENT(reduce)
2136