xref: /spdk/lib/reduce/reduce.c (revision dcdab59d332f49b70bf3ad5ac5fef2b91170df2f)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "queue_internal.h"
10 
11 #include "spdk/reduce.h"
12 #include "spdk/env.h"
13 #include "spdk/string.h"
14 #include "spdk/bit_array.h"
15 #include "spdk/util.h"
16 #include "spdk/log.h"
17 #include "spdk/memory.h"
18 #include "spdk/tree.h"
19 
20 #include "libpmem.h"
21 
22 /* Always round up the size of the PM region to the nearest cacheline. */
23 #define REDUCE_PM_SIZE_ALIGNMENT	64
24 
25 /* Offset into the backing device where the persistent memory file's path is stored. */
26 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
27 
28 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
29 
30 #define REDUCE_NUM_VOL_REQUESTS	256
31 
32 /* Structure written to offset 0 of both the pm file and the backing device. */
33 struct spdk_reduce_vol_superblock {
34 	uint8_t				signature[8];
35 	struct spdk_reduce_vol_params	params;
36 	uint8_t				reserved[4040];
37 };
38 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
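
/*
 * Backing-device metadata layout (as written by spdk_reduce_vol_init() below): the 4 KiB
 * superblock above is written at LBA 0, and the persistent memory file's path (REDUCE_PATH_MAX
 * bytes) is written at byte offset REDUCE_BACKING_DEV_PATH_OFFSET, i.e. immediately after it.
 */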
39 
40 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
41 /* sizeof() of a string literal includes the null terminator, hence the -1 */
42 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
43 		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
44 
45 #define REDUCE_PATH_MAX 4096
46 
47 #define REDUCE_ZERO_BUF_SIZE 0x100000
48 
49 /**
50  * Describes a persistent memory file used to hold metadata associated with a
51  *  compressed volume.
52  */
53 struct spdk_reduce_pm_file {
54 	char			path[REDUCE_PATH_MAX];
55 	void			*pm_buf;
56 	int			pm_is_pmem;
57 	uint64_t		size;
58 };
59 
60 #define REDUCE_IO_READV		1
61 #define REDUCE_IO_WRITEV	2
62 
63 struct spdk_reduce_chunk_map {
64 	uint32_t		compressed_size;
65 	uint32_t		reserved;
66 	uint64_t		io_unit_index[0];
67 };
68 
69 struct spdk_reduce_vol_request {
70 	/**
71 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
72 	 *   1) source buffer for compression operations
73 	 *   2) destination buffer for decompression operations
74 	 *   3) data buffer when writing uncompressed chunk to disk
75 	 *   4) data buffer when reading uncompressed chunk from disk
76 	 */
77 	uint8_t					*decomp_buf;
78 	struct iovec				*decomp_buf_iov;
79 
80 	/**
81 	 * These are used to construct the iovecs that are sent to
82 	 *  the decomp engine; they point to a mix of the scratch buffer
83 	 *  and the user buffer.
84 	 */
85 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
86 	int					decomp_iovcnt;
87 
88 	/**
89 	 *  Scratch buffer used for compressed chunk.  This is used for:
90 	 *   1) destination buffer for compression operations
91 	 *   2) source buffer for decompression operations
92 	 *   3) data buffer when writing compressed chunk to disk
93 	 *   4) data buffer when reading compressed chunk from disk
94 	 */
95 	uint8_t					*comp_buf;
96 	struct iovec				*comp_buf_iov;
97 	struct iovec				*iov;
98 	bool					rmw;
99 	struct spdk_reduce_vol			*vol;
100 	int					type;
101 	int					reduce_errno;
102 	int					iovcnt;
103 	int					num_backing_ops;
104 	uint32_t				num_io_units;
105 	struct spdk_reduce_backing_io           *backing_io;
106 	bool					chunk_is_compressed;
107 	bool					copy_after_decompress;
108 	uint64_t				offset;
109 	uint64_t				logical_map_index;
110 	uint64_t				length;
111 	uint64_t				chunk_map_index;
112 	struct spdk_reduce_chunk_map		*chunk;
113 	spdk_reduce_vol_op_complete		cb_fn;
114 	void					*cb_arg;
115 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
116 	RB_ENTRY(spdk_reduce_vol_request)	rbnode;
117 	struct spdk_reduce_vol_cb_args		backing_cb_args;
118 };
119 
120 struct spdk_reduce_vol {
121 	struct spdk_reduce_vol_params		params;
122 	uint32_t				backing_io_units_per_chunk;
123 	uint32_t				backing_lba_per_io_unit;
124 	uint32_t				logical_blocks_per_chunk;
125 	struct spdk_reduce_pm_file		pm_file;
126 	struct spdk_reduce_backing_dev		*backing_dev;
127 	struct spdk_reduce_vol_superblock	*backing_super;
128 	struct spdk_reduce_vol_superblock	*pm_super;
129 	uint64_t				*pm_logical_map;
130 	uint64_t				*pm_chunk_maps;
131 
132 	struct spdk_bit_array			*allocated_chunk_maps;
133 	/* The starting position when looking for a free chunk map in allocated_chunk_maps */
134 	uint64_t				find_chunk_offset;
135 	/* Cache free chunks to speed up lookup of free chunk. */
136 	struct reduce_queue			free_chunks_queue;
137 	struct spdk_bit_array			*allocated_backing_io_units;
138 	/* The starting position when looking for a free block in allocated_backing_io_units */
139 	uint64_t				find_block_offset;
140 	/* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */
141 	struct reduce_queue			free_backing_blocks_queue;
142 
143 	struct spdk_reduce_vol_request		*request_mem;
144 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
145 	RB_HEAD(executing_req_tree, spdk_reduce_vol_request) executing_requests;
146 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
147 
148 	/* Single contiguous buffer used for all request buffers for this volume. */
149 	uint8_t					*buf_mem;
150 	struct iovec				*buf_iov_mem;
151 	/* Single contiguous buffer used for backing io buffers for this volume. */
152 	uint8_t					*buf_backing_io_mem;
153 };
154 
155 static void _start_readv_request(struct spdk_reduce_vol_request *req);
156 static void _start_writev_request(struct spdk_reduce_vol_request *req);
157 static uint8_t *g_zero_buf;
158 static int g_vol_count = 0;
159 
160 /*
161  * Allocate extra metadata chunks and corresponding backing io units to account for
162  *  outstanding IO in worst case scenario where logical map is completely allocated
163  *  and no data can be compressed.  We need extra chunks in this case to handle
164  *  in-flight writes since reduce never writes data in place.
165  */
166 #define REDUCE_NUM_EXTRA_CHUNKS 128
167 
168 static void
169 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
170 {
171 	if (vol->pm_file.pm_is_pmem) {
172 		pmem_persist(addr, len);
173 	} else {
174 		pmem_msync(addr, len);
175 	}
176 }
177 
178 static uint64_t
179 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
180 {
181 	uint64_t chunks_in_logical_map, logical_map_size;
182 
183 	chunks_in_logical_map = vol_size / chunk_size;
184 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
185 
186 	/* Round up to next cacheline. */
187 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
188 	       REDUCE_PM_SIZE_ALIGNMENT;
189 }
190 
191 static uint64_t
192 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
193 {
194 	uint64_t num_chunks;
195 
196 	num_chunks = vol_size / chunk_size;
197 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
198 
199 	return num_chunks;
200 }
201 
202 static inline uint32_t
203 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
204 {
205 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
206 }
207 
208 static uint64_t
209 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
210 {
211 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
212 
213 	num_chunks = _get_total_chunks(vol_size, chunk_size);
214 	io_units_per_chunk = chunk_size / backing_io_unit_size;
215 
216 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
217 
218 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
219 	       REDUCE_PM_SIZE_ALIGNMENT;
220 }
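
/*
 * Example sizing (illustrative values, not defaults): with a 16 KiB chunk_size and a 4 KiB
 * backing_io_unit_size there are 4 io units per chunk, so each chunk map occupies
 * sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t) = 8 + 32 = 40 bytes, and the
 * total chunk map region is _get_total_chunks() * 40 bytes, rounded up to the 64-byte
 * REDUCE_PM_SIZE_ALIGNMENT.
 */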
221 
222 static struct spdk_reduce_chunk_map *
223 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
224 {
225 	uintptr_t chunk_map_addr;
226 
227 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
228 
229 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
230 	chunk_map_addr += chunk_map_index *
231 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
232 
233 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
234 }
235 
236 static int
237 _validate_vol_params(struct spdk_reduce_vol_params *params)
238 {
239 	if (params->vol_size > 0) {
240 		/**
241 		 * The caller must not pass in the vol size - it gets calculated by libreduce from
242 		 *  values in this structure plus the size of the backing device.
243 		 */
244 		return -EINVAL;
245 	}
246 
247 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
248 	    params->logical_block_size == 0) {
249 		return -EINVAL;
250 	}
251 
252 	/* Chunk size must be an even multiple of the backing io unit size. */
253 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
254 		return -EINVAL;
255 	}
256 
257 	/* Chunk size must be an even multiple of the logical block size. */
258 	if ((params->chunk_size % params->logical_block_size) != 0) {
259 		return -EINVAL;
260 	}
261 
262 	return 0;
263 }
264 
265 static uint64_t
266 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
267 {
268 	uint64_t num_chunks;
269 
270 	num_chunks = backing_dev_size / chunk_size;
271 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
272 		return 0;
273 	}
274 
275 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
276 	return num_chunks * chunk_size;
277 }
278 
279 static uint64_t
280 _get_pm_file_size(struct spdk_reduce_vol_params *params)
281 {
282 	uint64_t total_pm_size;
283 
284 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
285 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
286 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
287 			 params->backing_io_unit_size);
288 	return total_pm_size;
289 }
290 
291 const struct spdk_uuid *
292 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
293 {
294 	return &vol->params.uuid;
295 }
296 
297 static void
298 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
299 {
300 	uint64_t logical_map_size;
301 
302 	/* Superblock is at the beginning of the pm file. */
303 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
304 
305 	/* Logical map immediately follows the super block. */
306 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
307 
308 	/* Chunks maps follow the logical map. */
309 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
310 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
311 }
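
/*
 * Resulting pm file layout: the 4 KiB superblock at offset 0, followed by the logical map
 * (one 64-bit chunk map index per logical chunk, padded to REDUCE_PM_SIZE_ALIGNMENT), followed
 * by the chunk maps themselves.  _get_pm_file_size() sums the same three components when the
 * pm file is created.
 */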
312 
313 /* We need 2 iovs during load - one for the superblock, another for the path */
314 #define LOAD_IOV_COUNT	2
315 
316 struct reduce_init_load_ctx {
317 	struct spdk_reduce_vol			*vol;
318 	struct spdk_reduce_vol_cb_args		backing_cb_args;
319 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
320 	void					*cb_arg;
321 	struct iovec				iov[LOAD_IOV_COUNT];
322 	void					*path;
323 	struct spdk_reduce_backing_io           *backing_io;
324 };
325 
326 static inline bool
327 _addr_crosses_huge_page(const void *addr, size_t *size)
328 {
329 	size_t _size;
330 	uint64_t rc;
331 
332 	assert(size);
333 
334 	_size = *size;
335 	rc = spdk_vtophys(addr, size);
336 
337 	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
338 }
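
/*
 * spdk_vtophys() updates *size to the length for which the returned translation is valid, so
 * if it comes back smaller than requested (or the translation fails), a buffer of the original
 * size would span a 2 MiB huge page boundary.
 */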
339 
340 static inline int
341 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
342 {
343 	uint8_t *addr;
344 	size_t size_tmp = buffer_size;
345 
346 	addr = *_addr;
347 
348 	/* Verify that addr + buffer_size doesn't cross huge page boundary */
349 	if (_addr_crosses_huge_page(addr, &size_tmp)) {
350 		/* Memory start is aligned on 2MiB, so the buffer must be sitting at the end of a huge page.
351 		 * Skip the remaining bytes and continue from the beginning of the next page. */
352 		addr += size_tmp;
353 	}
354 
355 	if (addr + buffer_size > addr_range) {
356 		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
357 		return -ERANGE;
358 	}
359 
360 	*vol_buffer = addr;
361 	*_addr = addr + buffer_size;
362 
363 	return 0;
364 }
365 
366 static int
367 _allocate_vol_requests(struct spdk_reduce_vol *vol)
368 {
369 	struct spdk_reduce_vol_request *req;
370 	struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
371 	uint32_t reqs_in_2mb_page, huge_pages_needed;
372 	uint8_t *buffer, *buffer_end;
373 	int i = 0;
374 	int rc = 0;
375 
376 	/* Comp and decomp buffers must be allocated so that they do not cross physical
377 	 * page boundaries. Assume that the system uses the default 2MiB pages and that
378 	 * chunk_size is not necessarily a power of 2.
379 	 * Allocate 2x since we need buffers for both read/write and compress/decompress
380 	 * intermediate buffers. */
381 	reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
382 	if (!reqs_in_2mb_page) {
383 		return -EINVAL;
384 	}
385 	huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
386 
387 	vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
388 	if (vol->buf_mem == NULL) {
389 		return -ENOMEM;
390 	}
391 
392 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
393 	if (vol->request_mem == NULL) {
394 		spdk_free(vol->buf_mem);
395 		vol->buf_mem = NULL;
396 		return -ENOMEM;
397 	}
398 
399 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
400 	 *  buffers.
401 	 */
402 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
403 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
404 	if (vol->buf_iov_mem == NULL) {
405 		free(vol->request_mem);
406 		spdk_free(vol->buf_mem);
407 		vol->request_mem = NULL;
408 		vol->buf_mem = NULL;
409 		return -ENOMEM;
410 	}
411 
412 	vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) +
413 					 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk);
414 	if (vol->buf_backing_io_mem == NULL) {
415 		free(vol->request_mem);
416 		free(vol->buf_iov_mem);
417 		spdk_free(vol->buf_mem);
418 		vol->request_mem = NULL;
419 		vol->buf_iov_mem = NULL;
420 		vol->buf_mem = NULL;
421 		return -ENOMEM;
422 	}
423 
424 	buffer = vol->buf_mem;
425 	buffer_end = buffer + VALUE_2MB * huge_pages_needed;
426 
427 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
428 		req = &vol->request_mem[i];
429 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
430 		req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i *
431 				  (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) *
432 				  vol->backing_io_units_per_chunk);
433 
434 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
435 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
436 
437 		rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
438 		if (rc) {
439 			SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
440 				    vol->buf_mem, buffer_end);
441 			break;
442 		}
443 		rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
444 		if (rc) {
445 			SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
446 				    vol->buf_mem, buffer_end);
447 			break;
448 		}
449 	}
450 
451 	if (rc) {
452 		free(vol->buf_backing_io_mem);
453 		free(vol->buf_iov_mem);
454 		free(vol->request_mem);
455 		spdk_free(vol->buf_mem);
456 		vol->buf_mem = NULL;
457 		vol->buf_backing_io_mem = NULL;
458 		vol->buf_iov_mem = NULL;
459 		vol->request_mem = NULL;
460 	}
461 
462 	return rc;
463 }
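
/*
 * Illustrative buffer layout (example numbers only): with a 16 KiB chunk_size each request
 * needs a comp_buf plus a decomp_buf (2 * 16 KiB), so 64 requests fit in one 2 MiB huge page
 * and REDUCE_NUM_VOL_REQUESTS (256) requests need 4 huge pages of buf_mem.  _set_buffer()
 * skips ahead whenever a buffer would otherwise straddle a huge page boundary.
 */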
464 
465 static void
466 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
467 {
468 	if (ctx != NULL) {
469 		spdk_free(ctx->path);
470 		free(ctx->backing_io);
471 		free(ctx);
472 	}
473 
474 	if (vol != NULL) {
475 		if (vol->pm_file.pm_buf != NULL) {
476 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
477 		}
478 
479 		spdk_free(vol->backing_super);
480 		spdk_bit_array_free(&vol->allocated_chunk_maps);
481 		spdk_bit_array_free(&vol->allocated_backing_io_units);
482 		free(vol->request_mem);
483 		free(vol->buf_backing_io_mem);
484 		free(vol->buf_iov_mem);
485 		spdk_free(vol->buf_mem);
486 		free(vol);
487 	}
488 }
489 
490 static int
491 _alloc_zero_buff(void)
492 {
493 	int rc = 0;
494 
495 	/* The zero buffer is shared between all volumes and is only used
496 	 * for reads, so allocate one global instance here if it wasn't already
497 	 * allocated when another volume was initialized or loaded.
498 	 */
499 	if (g_vol_count++ == 0) {
500 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
501 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
502 					  SPDK_MALLOC_DMA);
503 		if (g_zero_buf == NULL) {
504 			g_vol_count--;
505 			rc = -ENOMEM;
506 		}
507 	}
508 	return rc;
509 }
510 
511 static void
512 _init_write_super_cpl(void *cb_arg, int reduce_errno)
513 {
514 	struct reduce_init_load_ctx *init_ctx = cb_arg;
515 	int rc;
516 
517 	rc = _allocate_vol_requests(init_ctx->vol);
518 	if (rc != 0) {
519 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
520 		_init_load_cleanup(init_ctx->vol, init_ctx);
521 		return;
522 	}
523 
524 	rc = _alloc_zero_buff();
525 	if (rc != 0) {
526 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
527 		_init_load_cleanup(init_ctx->vol, init_ctx);
528 		return;
529 	}
530 
531 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
532 	/* Only clean up the ctx - the vol has been passed to the application
533 	 *  for use now that initialization was successful.
534 	 */
535 	_init_load_cleanup(NULL, init_ctx);
536 }
537 
538 static void
539 _init_write_path_cpl(void *cb_arg, int reduce_errno)
540 {
541 	struct reduce_init_load_ctx *init_ctx = cb_arg;
542 	struct spdk_reduce_vol *vol = init_ctx->vol;
543 	struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io;
544 
545 	init_ctx->iov[0].iov_base = vol->backing_super;
546 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
547 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
548 	init_ctx->backing_cb_args.cb_arg = init_ctx;
549 
550 	backing_io->dev = vol->backing_dev;
551 	backing_io->iov = init_ctx->iov;
552 	backing_io->iovcnt = 1;
553 	backing_io->lba = 0;
554 	backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen;
555 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
556 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
557 
558 	vol->backing_dev->submit_backing_io(backing_io);
559 }
560 
561 static int
562 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
563 {
564 	uint64_t total_chunks, total_backing_io_units;
565 	uint32_t i, num_metadata_io_units;
566 
567 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
568 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
569 	vol->find_chunk_offset = 0;
570 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
571 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
572 	vol->find_block_offset = 0;
573 
574 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
575 		return -ENOMEM;
576 	}
577 
578 	/* Set backing io unit bits associated with metadata. */
579 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
580 				vol->params.backing_io_unit_size;
581 	for (i = 0; i < num_metadata_io_units; i++) {
582 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
583 	}
584 
585 	return 0;
586 }
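
/*
 * The reserved metadata region is the 4 KiB superblock plus the REDUCE_PATH_MAX (4 KiB) path,
 * so with, for example, a 4 KiB backing_io_unit_size the first two backing io units are marked
 * allocated and are never used for chunk data.
 */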
587 
588 static int
589 overlap_cmp(struct spdk_reduce_vol_request *req1, struct spdk_reduce_vol_request *req2)
590 {
591 	return (req1->logical_map_index < req2->logical_map_index ? -1 : req1->logical_map_index >
592 		req2->logical_map_index);
593 }
594 RB_GENERATE_STATIC(executing_req_tree, spdk_reduce_vol_request, rbnode, overlap_cmp);
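
/*
 * The executing_requests tree is keyed by logical_map_index (overlap_cmp above) so that I/O
 * overlapping an in-flight request on the same chunk can be detected and parked on
 * queued_requests; _reduce_vol_complete_req() restarts at most one queued request with the
 * same logical_map_index when the in-flight request finishes.
 */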
595 
596 
597 void
598 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
599 		     struct spdk_reduce_backing_dev *backing_dev,
600 		     const char *pm_file_dir,
601 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
602 {
603 	struct spdk_reduce_vol *vol;
604 	struct reduce_init_load_ctx *init_ctx;
605 	struct spdk_reduce_backing_io *backing_io;
606 	uint64_t backing_dev_size;
607 	size_t mapped_len;
608 	int dir_len, max_dir_len, rc;
609 
610 	/* We need to append a path separator and the UUID to the supplied
611 	 * path.
612 	 */
613 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
614 	dir_len = strnlen(pm_file_dir, max_dir_len);
615 	/* Strip trailing slash if the user provided one - we will add it back
616 	 * later when appending the filename.
617 	 */
618 	if (pm_file_dir[dir_len - 1] == '/') {
619 		dir_len--;
620 	}
621 	if (dir_len == max_dir_len) {
622 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
623 		cb_fn(cb_arg, NULL, -EINVAL);
624 		return;
625 	}
626 
627 	rc = _validate_vol_params(params);
628 	if (rc != 0) {
629 		SPDK_ERRLOG("invalid vol params\n");
630 		cb_fn(cb_arg, NULL, rc);
631 		return;
632 	}
633 
634 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
635 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
636 	if (params->vol_size == 0) {
637 		SPDK_ERRLOG("backing device is too small\n");
638 		cb_fn(cb_arg, NULL, -EINVAL);
639 		return;
640 	}
641 
642 	if (backing_dev->submit_backing_io == NULL) {
643 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
644 		cb_fn(cb_arg, NULL, -EINVAL);
645 		return;
646 	}
647 
648 	vol = calloc(1, sizeof(*vol));
649 	if (vol == NULL) {
650 		cb_fn(cb_arg, NULL, -ENOMEM);
651 		return;
652 	}
653 
654 	TAILQ_INIT(&vol->free_requests);
655 	RB_INIT(&vol->executing_requests);
656 	TAILQ_INIT(&vol->queued_requests);
657 	queue_init(&vol->free_chunks_queue);
658 	queue_init(&vol->free_backing_blocks_queue);
659 
660 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
661 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
662 	if (vol->backing_super == NULL) {
663 		cb_fn(cb_arg, NULL, -ENOMEM);
664 		_init_load_cleanup(vol, NULL);
665 		return;
666 	}
667 
668 	init_ctx = calloc(1, sizeof(*init_ctx));
669 	if (init_ctx == NULL) {
670 		cb_fn(cb_arg, NULL, -ENOMEM);
671 		_init_load_cleanup(vol, NULL);
672 		return;
673 	}
674 
675 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
676 	if (backing_io == NULL) {
677 		cb_fn(cb_arg, NULL, -ENOMEM);
678 		_init_load_cleanup(vol, init_ctx);
679 		return;
680 	}
681 	init_ctx->backing_io = backing_io;
682 
683 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
684 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
685 	if (init_ctx->path == NULL) {
686 		cb_fn(cb_arg, NULL, -ENOMEM);
687 		_init_load_cleanup(vol, init_ctx);
688 		return;
689 	}
690 
691 	if (spdk_uuid_is_null(&params->uuid)) {
692 		spdk_uuid_generate(&params->uuid);
693 	}
694 
695 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
696 	vol->pm_file.path[dir_len] = '/';
697 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
698 			    &params->uuid);
699 	vol->pm_file.size = _get_pm_file_size(params);
700 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
701 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
702 					    &mapped_len, &vol->pm_file.pm_is_pmem);
703 	if (vol->pm_file.pm_buf == NULL) {
704 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
705 			    vol->pm_file.path, strerror(errno));
706 		cb_fn(cb_arg, NULL, -errno);
707 		_init_load_cleanup(vol, init_ctx);
708 		return;
709 	}
710 
711 	if (vol->pm_file.size != mapped_len) {
712 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
713 			    vol->pm_file.size, mapped_len);
714 		cb_fn(cb_arg, NULL, -ENOMEM);
715 		_init_load_cleanup(vol, init_ctx);
716 		return;
717 	}
718 
719 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
720 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
721 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
722 	memcpy(&vol->params, params, sizeof(*params));
723 
724 	vol->backing_dev = backing_dev;
725 
726 	rc = _allocate_bit_arrays(vol);
727 	if (rc != 0) {
728 		cb_fn(cb_arg, NULL, rc);
729 		_init_load_cleanup(vol, init_ctx);
730 		return;
731 	}
732 
733 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
734 	       sizeof(vol->backing_super->signature));
735 	memcpy(&vol->backing_super->params, params, sizeof(*params));
736 
737 	_initialize_vol_pm_pointers(vol);
738 
739 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
740 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
741 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
742 	 */
743 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
744 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
745 
746 	init_ctx->vol = vol;
747 	init_ctx->cb_fn = cb_fn;
748 	init_ctx->cb_arg = cb_arg;
749 
750 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
751 	init_ctx->iov[0].iov_base = init_ctx->path;
752 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
753 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
754 	init_ctx->backing_cb_args.cb_arg = init_ctx;
755 	/* Write path to offset 4K on backing device - just after where the super
756 	 *  block will be written.  We wait until this is committed before writing the
757 	 *  super block to guarantee we don't get the super block written without
758 	 *  the path if the system crashed in the middle of a write operation.
759 	 */
760 	backing_io->dev = vol->backing_dev;
761 	backing_io->iov = init_ctx->iov;
762 	backing_io->iovcnt = 1;
763 	backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen;
764 	backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen;
765 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
766 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
767 
768 	vol->backing_dev->submit_backing_io(backing_io);
769 }
770 
771 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
772 
773 static void
774 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
775 {
776 	struct reduce_init_load_ctx *load_ctx = cb_arg;
777 	struct spdk_reduce_vol *vol = load_ctx->vol;
778 	uint64_t backing_dev_size;
779 	uint64_t i, num_chunks, logical_map_index;
780 	struct spdk_reduce_chunk_map *chunk;
781 	size_t mapped_len;
782 	uint32_t j;
783 	int rc;
784 
785 	if (reduce_errno != 0) {
786 		rc = reduce_errno;
787 		goto error;
788 	}
789 
790 	rc = _alloc_zero_buff();
791 	if (rc) {
792 		goto error;
793 	}
794 
795 	if (memcmp(vol->backing_super->signature,
796 		   SPDK_REDUCE_SIGNATURE,
797 		   sizeof(vol->backing_super->signature)) != 0) {
798 		/* This backing device isn't a libreduce backing device. */
799 		rc = -EILSEQ;
800 		goto error;
801 	}
802 
803 	/* If the cb_fn is destroy_load_cb, it means the caller wants to destroy this compressed volume.
804 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
805 	 *  so destroy_load_cb can delete the metadata off of the block device and delete the
806 	 *  persistent memory file if it exists.
807 	 */
808 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
809 	if (load_ctx->cb_fn == (*destroy_load_cb)) {
810 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
811 		_init_load_cleanup(NULL, load_ctx);
812 		return;
813 	}
814 
815 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
816 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
817 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
818 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
819 
820 	rc = _allocate_bit_arrays(vol);
821 	if (rc != 0) {
822 		goto error;
823 	}
824 
825 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
826 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
827 		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
828 			    backing_dev_size);
829 		rc = -EILSEQ;
830 		goto error;
831 	}
832 
833 	vol->pm_file.size = _get_pm_file_size(&vol->params);
834 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
835 					    &vol->pm_file.pm_is_pmem);
836 	if (vol->pm_file.pm_buf == NULL) {
837 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
838 		rc = -errno;
839 		goto error;
840 	}
841 
842 	if (vol->pm_file.size != mapped_len) {
843 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
844 			    vol->pm_file.size, mapped_len);
845 		rc = -ENOMEM;
846 		goto error;
847 	}
848 
849 	rc = _allocate_vol_requests(vol);
850 	if (rc != 0) {
851 		goto error;
852 	}
853 
854 	_initialize_vol_pm_pointers(vol);
855 
856 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
857 	for (i = 0; i < num_chunks; i++) {
858 		logical_map_index = vol->pm_logical_map[i];
859 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
860 			continue;
861 		}
862 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
863 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
864 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
865 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
866 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
867 			}
868 		}
869 	}
870 
871 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
872 	/* Only clean up the ctx - the vol has been passed to the application
873 	 *  for use now that volume load was successful.
874 	 */
875 	_init_load_cleanup(NULL, load_ctx);
876 	return;
877 
878 error:
879 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
880 	_init_load_cleanup(vol, load_ctx);
881 }
882 
883 void
884 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
885 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
886 {
887 	struct spdk_reduce_vol *vol;
888 	struct reduce_init_load_ctx *load_ctx;
889 	struct spdk_reduce_backing_io *backing_io;
890 
891 	if (backing_dev->submit_backing_io == NULL) {
892 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
893 		cb_fn(cb_arg, NULL, -EINVAL);
894 		return;
895 	}
896 
897 	vol = calloc(1, sizeof(*vol));
898 	if (vol == NULL) {
899 		cb_fn(cb_arg, NULL, -ENOMEM);
900 		return;
901 	}
902 
903 	TAILQ_INIT(&vol->free_requests);
904 	RB_INIT(&vol->executing_requests);
905 	TAILQ_INIT(&vol->queued_requests);
906 	queue_init(&vol->free_chunks_queue);
907 	queue_init(&vol->free_backing_blocks_queue);
908 
909 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
910 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
911 	if (vol->backing_super == NULL) {
912 		_init_load_cleanup(vol, NULL);
913 		cb_fn(cb_arg, NULL, -ENOMEM);
914 		return;
915 	}
916 
917 	vol->backing_dev = backing_dev;
918 
919 	load_ctx = calloc(1, sizeof(*load_ctx));
920 	if (load_ctx == NULL) {
921 		_init_load_cleanup(vol, NULL);
922 		cb_fn(cb_arg, NULL, -ENOMEM);
923 		return;
924 	}
925 
926 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
927 	if (backing_io == NULL) {
928 		_init_load_cleanup(vol, load_ctx);
929 		cb_fn(cb_arg, NULL, -ENOMEM);
930 		return;
931 	}
932 
933 	load_ctx->backing_io = backing_io;
934 
935 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
936 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
937 	if (load_ctx->path == NULL) {
938 		_init_load_cleanup(vol, load_ctx);
939 		cb_fn(cb_arg, NULL, -ENOMEM);
940 		return;
941 	}
942 
943 	load_ctx->vol = vol;
944 	load_ctx->cb_fn = cb_fn;
945 	load_ctx->cb_arg = cb_arg;
946 
947 	load_ctx->iov[0].iov_base = vol->backing_super;
948 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
949 	load_ctx->iov[1].iov_base = load_ctx->path;
950 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
951 	backing_io->dev = vol->backing_dev;
952 	backing_io->iov = load_ctx->iov;
953 	backing_io->iovcnt = LOAD_IOV_COUNT;
954 	backing_io->lba = 0;
955 	backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
956 				vol->backing_dev->blocklen;
957 	backing_io->backing_cb_args = &load_ctx->backing_cb_args;
958 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
959 
960 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
961 	load_ctx->backing_cb_args.cb_arg = load_ctx;
962 	vol->backing_dev->submit_backing_io(backing_io);
963 }
964 
965 void
966 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
967 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
968 {
969 	if (vol == NULL) {
970 		/* This indicates a programming error. */
971 		assert(false);
972 		cb_fn(cb_arg, -EINVAL);
973 		return;
974 	}
975 
976 	if (--g_vol_count == 0) {
977 		spdk_free(g_zero_buf);
978 	}
979 	assert(g_vol_count >= 0);
980 	_init_load_cleanup(vol, NULL);
981 	cb_fn(cb_arg, 0);
982 }
983 
984 struct reduce_destroy_ctx {
985 	spdk_reduce_vol_op_complete		cb_fn;
986 	void					*cb_arg;
987 	struct spdk_reduce_vol			*vol;
988 	struct spdk_reduce_vol_superblock	*super;
989 	struct iovec				iov;
990 	struct spdk_reduce_vol_cb_args		backing_cb_args;
991 	int					reduce_errno;
992 	char					pm_path[REDUCE_PATH_MAX];
993 	struct spdk_reduce_backing_io           *backing_io;
994 };
995 
996 static void
997 destroy_unload_cpl(void *cb_arg, int reduce_errno)
998 {
999 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1000 
1001 	if (destroy_ctx->reduce_errno == 0) {
1002 		if (unlink(destroy_ctx->pm_path)) {
1003 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
1004 				    destroy_ctx->pm_path, strerror(errno));
1005 		}
1006 	}
1007 
1008 	/* Even if the unload somehow failed, we still pass the destroy_ctx
1009 	 * reduce_errno since that indicates whether or not the volume was
1010 	 * actually destroyed.
1011 	 */
1012 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
1013 	spdk_free(destroy_ctx->super);
1014 	free(destroy_ctx->backing_io);
1015 	free(destroy_ctx);
1016 }
1017 
1018 static void
1019 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
1020 {
1021 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1022 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
1023 
1024 	destroy_ctx->reduce_errno = reduce_errno;
1025 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
1026 }
1027 
1028 static void
1029 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1030 {
1031 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1032 	struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io;
1033 
1034 	if (reduce_errno != 0) {
1035 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
1036 		spdk_free(destroy_ctx->super);
1037 		free(destroy_ctx);
1038 		return;
1039 	}
1040 
1041 	destroy_ctx->vol = vol;
1042 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
1043 	destroy_ctx->iov.iov_base = destroy_ctx->super;
1044 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
1045 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
1046 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
1047 
1048 	backing_io->dev = vol->backing_dev;
1049 	backing_io->iov = &destroy_ctx->iov;
1050 	backing_io->iovcnt = 1;
1051 	backing_io->lba = 0;
1052 	backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen;
1053 	backing_io->backing_cb_args = &destroy_ctx->backing_cb_args;
1054 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1055 
1056 	vol->backing_dev->submit_backing_io(backing_io);
1057 }
1058 
1059 void
1060 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
1061 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1062 {
1063 	struct reduce_destroy_ctx *destroy_ctx;
1064 	struct spdk_reduce_backing_io *backing_io;
1065 
1066 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
1067 	if (destroy_ctx == NULL) {
1068 		cb_fn(cb_arg, -ENOMEM);
1069 		return;
1070 	}
1071 
1072 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
1073 	if (backing_io == NULL) {
1074 		free(destroy_ctx);
1075 		cb_fn(cb_arg, -ENOMEM);
1076 		return;
1077 	}
1078 
1079 	destroy_ctx->backing_io = backing_io;
1080 
1081 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
1082 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1083 	if (destroy_ctx->super == NULL) {
1084 		free(destroy_ctx);
1085 		free(backing_io);
1086 		cb_fn(cb_arg, -ENOMEM);
1087 		return;
1088 	}
1089 	destroy_ctx->cb_fn = cb_fn;
1090 	destroy_ctx->cb_arg = cb_arg;
1091 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
1092 }
1093 
1094 static bool
1095 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
1096 {
1097 	uint64_t start_chunk, end_chunk;
1098 
1099 	start_chunk = offset / vol->logical_blocks_per_chunk;
1100 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
1101 
1102 	return (start_chunk != end_chunk);
1103 }
1104 
1105 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
1106 
1107 static void
1108 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
1109 {
1110 	struct spdk_reduce_vol_request *next_req;
1111 	struct spdk_reduce_vol *vol = req->vol;
1112 
1113 	req->cb_fn(req->cb_arg, reduce_errno);
1114 	RB_REMOVE(executing_req_tree, &vol->executing_requests, req);
1115 
1116 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
1117 		if (next_req->logical_map_index == req->logical_map_index) {
1118 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
1119 			if (next_req->type == REDUCE_IO_READV) {
1120 				_start_readv_request(next_req);
1121 			} else {
1122 				assert(next_req->type == REDUCE_IO_WRITEV);
1123 				_start_writev_request(next_req);
1124 			}
1125 			break;
1126 		}
1127 	}
1128 
1129 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
1130 }
1131 
1132 static void
1133 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
1134 {
1135 	struct spdk_reduce_chunk_map *chunk;
1136 	uint64_t index;
1137 	bool success;
1138 	uint32_t i;
1139 
1140 	chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
1141 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
1142 		index = chunk->io_unit_index[i];
1143 		if (index == REDUCE_EMPTY_MAP_ENTRY) {
1144 			break;
1145 		}
1146 		assert(spdk_bit_array_get(vol->allocated_backing_io_units,
1147 					  index) == true);
1148 		spdk_bit_array_clear(vol->allocated_backing_io_units, index);
1149 		success = queue_enqueue(&vol->free_backing_blocks_queue, index);
1150 		if (!success && index < vol->find_block_offset) {
1151 			vol->find_block_offset = index;
1152 		}
1153 		chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1154 	}
1155 	success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index);
1156 	if (!success && chunk_map_index < vol->find_chunk_offset) {
1157 		vol->find_chunk_offset = chunk_map_index;
1158 	}
1159 	spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
1160 }
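
/*
 * Freed indexes are pushed onto the small free_chunks_queue/free_backing_blocks_queue caches;
 * if a queue is full, the freed index is remembered only by lowering find_chunk_offset or
 * find_block_offset so the next bit-array scan starts no later than the freed position.
 */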
1161 
1162 static void
1163 _write_write_done(void *_req, int reduce_errno)
1164 {
1165 	struct spdk_reduce_vol_request *req = _req;
1166 	struct spdk_reduce_vol *vol = req->vol;
1167 	uint64_t old_chunk_map_index;
1168 
1169 	if (reduce_errno != 0) {
1170 		req->reduce_errno = reduce_errno;
1171 	}
1172 
1173 	assert(req->num_backing_ops > 0);
1174 	if (--req->num_backing_ops > 0) {
1175 		return;
1176 	}
1177 
1178 	if (req->reduce_errno != 0) {
1179 		_reduce_vol_reset_chunk(vol, req->chunk_map_index);
1180 		_reduce_vol_complete_req(req, req->reduce_errno);
1181 		return;
1182 	}
1183 
1184 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1185 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1186 		_reduce_vol_reset_chunk(vol, old_chunk_map_index);
1187 	}
1188 
1189 	/*
1190 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1191 	 * becomes invalid after we update the logical map, since the logical map will no
1192 	 * longer have a reference to it.
1193 	 */
1194 
1195 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1196 	_reduce_persist(vol, req->chunk,
1197 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1198 
1199 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1200 
1201 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1202 
1203 	_reduce_vol_complete_req(req, 0);
1204 }
1205 
1206 static struct spdk_reduce_backing_io *
1207 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index)
1208 {
1209 	struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev;
1210 	struct spdk_reduce_backing_io *backing_io;
1211 
1212 	backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io +
1213 			(sizeof(*backing_io) + backing_dev->user_ctx_size) * index);
1214 
1215 	return backing_io;
1216 
1217 }
1218 
1219 struct reduce_merged_io_desc {
1220 	uint64_t io_unit_index;
1221 	uint32_t num_io_units;
1222 };
1223 
1224 static void
1225 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1226 				 reduce_request_fn next_fn, bool is_write)
1227 {
1228 	struct iovec *iov;
1229 	struct spdk_reduce_backing_io *backing_io;
1230 	uint8_t *buf;
1231 	uint32_t i;
1232 
1233 	if (req->chunk_is_compressed) {
1234 		iov = req->comp_buf_iov;
1235 		buf = req->comp_buf;
1236 	} else {
1237 		iov = req->decomp_buf_iov;
1238 		buf = req->decomp_buf;
1239 	}
1240 
1241 	req->num_backing_ops = req->num_io_units;
1242 	req->backing_cb_args.cb_fn = next_fn;
1243 	req->backing_cb_args.cb_arg = req;
1244 	for (i = 0; i < req->num_io_units; i++) {
1245 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1246 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1247 		iov[i].iov_len = vol->params.backing_io_unit_size;
1248 		backing_io->dev  = vol->backing_dev;
1249 		backing_io->iov = &iov[i];
1250 		backing_io->iovcnt = 1;
1251 		backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit;
1252 		backing_io->lba_count = vol->backing_lba_per_io_unit;
1253 		backing_io->backing_cb_args = &req->backing_cb_args;
1254 		if (is_write) {
1255 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1256 		} else {
1257 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1258 		}
1259 		vol->backing_dev->submit_backing_io(backing_io);
1260 	}
1261 }
1262 
1263 static void
1264 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1265 		   reduce_request_fn next_fn, bool is_write)
1266 {
1267 	struct iovec *iov;
1268 	struct spdk_reduce_backing_io *backing_io;
1269 	struct reduce_merged_io_desc merged_io_desc[4];
1270 	uint8_t *buf;
1271 	bool merge = false;
1272 	uint32_t num_io = 0;
1273 	uint32_t io_unit_counts = 0;
1274 	uint32_t merged_io_idx = 0;
1275 	uint32_t i;
1276 
1277 	/* The merged_io_desc array is defined here to contain four elements,
1278 	 * so merging only works when a chunk spans at most four backing io units.
1279 	 * If the chunk is larger than that, don't merge the IO.
1280 	 */
1281 	if (vol->backing_io_units_per_chunk > 4) {
1282 		_issue_backing_ops_without_merge(req, vol, next_fn, is_write);
1283 		return;
1284 	}
1285 
1286 	if (req->chunk_is_compressed) {
1287 		iov = req->comp_buf_iov;
1288 		buf = req->comp_buf;
1289 	} else {
1290 		iov = req->decomp_buf_iov;
1291 		buf = req->decomp_buf;
1292 	}
1293 
1294 	for (i = 0; i < req->num_io_units; i++) {
1295 		if (!merge) {
1296 			merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
1297 			merged_io_desc[merged_io_idx].num_io_units = 1;
1298 			num_io++;
1299 		}
1300 
1301 		if (i + 1 == req->num_io_units) {
1302 			break;
1303 		}
1304 
1305 		if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
1306 			merged_io_desc[merged_io_idx].num_io_units += 1;
1307 			merge = true;
1308 			continue;
1309 		}
1310 		merge = false;
1311 		merged_io_idx++;
1312 	}
1313 
1314 	req->num_backing_ops = num_io;
1315 	req->backing_cb_args.cb_fn = next_fn;
1316 	req->backing_cb_args.cb_arg = req;
1317 	for (i = 0; i < num_io; i++) {
1318 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1319 		iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
1320 		iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
1321 		backing_io->dev  = vol->backing_dev;
1322 		backing_io->iov = &iov[i];
1323 		backing_io->iovcnt = 1;
1324 		backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit;
1325 		backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units;
1326 		backing_io->backing_cb_args = &req->backing_cb_args;
1327 		if (is_write) {
1328 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1329 		} else {
1330 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1331 		}
1332 		vol->backing_dev->submit_backing_io(backing_io);
1333 
1334 		/* Accumulate the number of io units issued so far. */
1335 		io_unit_counts += merged_io_desc[i].num_io_units;
1336 	}
1337 }
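
/*
 * The merge loop above coalesces backing io units with consecutive io_unit_index values into a
 * single larger backing IO.  For example (hypothetical indexes), io_unit_index = {7, 8, 9, 12}
 * is issued as two backing IOs: one covering units 7-9 and one covering unit 12.
 */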
1338 
1339 static void
1340 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1341 			uint32_t compressed_size)
1342 {
1343 	struct spdk_reduce_vol *vol = req->vol;
1344 	uint32_t i;
1345 	uint64_t chunk_offset, remainder, free_index, total_len = 0;
1346 	uint8_t *buf;
1347 	bool success;
1348 	int j;
1349 
1350 	success = queue_dequeue(&vol->free_chunks_queue, &free_index);
1351 	if (success) {
1352 		req->chunk_map_index = free_index;
1353 	} else {
1354 		req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps,
1355 				       vol->find_chunk_offset);
1356 		vol->find_chunk_offset = req->chunk_map_index + 1;
1357 	}
1358 
1359 	/* TODO: fail if no chunk map found - but really this should not happen if we
1360 	 * size the number of requests similarly to number of extra chunk maps
1361 	 */
1362 	assert(req->chunk_map_index != UINT32_MAX);
1363 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1364 
1365 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1366 	req->num_io_units = spdk_divide_round_up(compressed_size,
1367 			    vol->params.backing_io_unit_size);
1368 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1369 	req->chunk->compressed_size =
1370 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1371 
1372 	/* If the chunk is stored uncompressed we need to copy the data from the user's buffers. */
1373 	if (req->chunk_is_compressed == false) {
1374 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1375 		buf = req->decomp_buf;
1376 		total_len = chunk_offset * vol->params.logical_block_size;
1377 
1378 		/* zero any offset into chunk */
1379 		if (req->rmw == false && chunk_offset) {
1380 			memset(buf, 0, total_len);
1381 		}
1382 		buf += total_len;
1383 
1384 		/* copy the data */
1385 		for (j = 0; j < req->iovcnt; j++) {
1386 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1387 			buf += req->iov[j].iov_len;
1388 			total_len += req->iov[j].iov_len;
1389 		}
1390 
1391 		/* zero any remainder */
1392 		remainder = vol->params.chunk_size - total_len;
1393 		total_len += remainder;
1394 		if (req->rmw == false && remainder) {
1395 			memset(buf, 0, remainder);
1396 		}
1397 		assert(total_len == vol->params.chunk_size);
1398 	}
1399 
1400 	for (i = 0; i < req->num_io_units; i++) {
1401 		success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index);
1402 		if (success) {
1403 			req->chunk->io_unit_index[i] = free_index;
1404 		} else {
1405 			req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units,
1406 						       vol->find_block_offset);
1407 			vol->find_block_offset = req->chunk->io_unit_index[i] + 1;
1408 		}
1409 		/* TODO: fail if no backing block found - but really this should also not
1410 		 * happen (see comment above).
1411 		 */
1412 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1413 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1414 	}
1415 
1416 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1417 }
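
/*
 * A chunk is stored compressed only if the compressed data fits in fewer backing io units than
 * a full chunk.  For example (illustrative sizes), with a 16 KiB chunk and 4 KiB io units a
 * 9 KiB compressed result occupies 3 io units and is stored compressed, while a 15 KiB result
 * would still need all 4 io units, so the uncompressed chunk data is written instead.
 */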
1418 
1419 static void
1420 _write_compress_done(void *_req, int reduce_errno)
1421 {
1422 	struct spdk_reduce_vol_request *req = _req;
1423 
1424 	/* Negative reduce_errno indicates failure for compression operations.
1425 	 * Just write the uncompressed data instead.  Force this to happen
1426 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1427 	 * When it sees the data couldn't be compressed, it will just write
1428 	 * the uncompressed buffer to disk.
1429 	 */
1430 	if (reduce_errno < 0) {
1431 		req->backing_cb_args.output_size = req->vol->params.chunk_size;
1432 	}
1433 
1434 	_reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
1435 }
1436 
1437 static void
1438 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1439 {
1440 	struct spdk_reduce_vol *vol = req->vol;
1441 
1442 	req->backing_cb_args.cb_fn = next_fn;
1443 	req->backing_cb_args.cb_arg = req;
1444 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1445 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1446 	vol->backing_dev->compress(vol->backing_dev,
1447 				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
1448 				   &req->backing_cb_args);
1449 }
1450 
1451 static void
1452 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1453 {
1454 	struct spdk_reduce_vol *vol = req->vol;
1455 
1456 	req->backing_cb_args.cb_fn = next_fn;
1457 	req->backing_cb_args.cb_arg = req;
1458 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1459 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1460 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1461 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1462 	vol->backing_dev->decompress(vol->backing_dev,
1463 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1464 				     &req->backing_cb_args);
1465 }
1466 
1467 static void
1468 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1469 {
1470 	struct spdk_reduce_vol *vol = req->vol;
1471 	uint64_t chunk_offset, remainder = 0;
1472 	uint64_t ttl_len = 0;
1473 	size_t iov_len;
1474 	int i;
1475 
1476 	req->decomp_iovcnt = 0;
1477 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1478 
1479 	/* If the backing device doesn't support SGL output then we should copy the result of decompression to the user's buffer
1480 	 * if at least one of the conditions below is true:
1481 	 * 1. The user's buffer is fragmented
1482 	 * 2. The length of the user's buffer is less than the chunk
1483 	 * 3. The user's buffer is contiguous, equals chunk_size, but crosses a huge page boundary */
1484 	iov_len = req->iov[0].iov_len;
1485 	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
1486 				     req->iov[0].iov_len < vol->params.chunk_size ||
1487 				     _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
1488 	if (req->copy_after_decompress) {
1489 		req->decomp_iov[0].iov_base = req->decomp_buf;
1490 		req->decomp_iov[0].iov_len = vol->params.chunk_size;
1491 		req->decomp_iovcnt = 1;
1492 		goto decompress;
1493 	}
1494 
1495 		/* first iov points to our scratch buffer for any offset into the chunk */
1496 		/* first iov point to our scratch buffer for any offset into the chunk */
1497 		req->decomp_iov[0].iov_base = req->decomp_buf;
1498 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1499 		ttl_len += req->decomp_iov[0].iov_len;
1500 		req->decomp_iovcnt = 1;
1501 	}
1502 
1503 	/* now the user data iov, direct to the user buffer */
1504 	for (i = 0; i < req->iovcnt; i++) {
1505 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1506 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1507 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1508 	}
1509 	req->decomp_iovcnt += req->iovcnt;
1510 
1511 	/* send the rest of the chunk to our scratch buffer */
1512 	remainder = vol->params.chunk_size - ttl_len;
1513 	if (remainder) {
1514 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1515 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1516 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1517 		req->decomp_iovcnt++;
1518 	}
1519 	assert(ttl_len == vol->params.chunk_size);
1520 
1521 decompress:
1522 	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
1523 	req->backing_cb_args.cb_fn = next_fn;
1524 	req->backing_cb_args.cb_arg = req;
1525 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1526 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1527 	vol->backing_dev->decompress(vol->backing_dev,
1528 				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
1529 				     &req->backing_cb_args);
1530 }
1531 
1532 static inline void
1533 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
1534 {
1535 	struct spdk_reduce_vol *vol = req->vol;
1536 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1537 	uint64_t chunk_offset, ttl_len = 0;
1538 	uint64_t remainder = 0;
1539 	char *copy_offset = NULL;
1540 	uint32_t lbsize = vol->params.logical_block_size;
1541 	int i;
1542 
1543 	req->decomp_iov[0].iov_base = req->decomp_buf;
1544 	req->decomp_iov[0].iov_len = vol->params.chunk_size;
1545 	req->decomp_iovcnt = 1;
1546 	copy_offset = req->decomp_iov[0].iov_base;
1547 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1548 
1549 	if (chunk_offset) {
1550 		ttl_len += chunk_offset * lbsize;
1551 		/* copy_offset already points to padding buffer if zero_paddings=false */
1552 		if (zero_paddings) {
1553 			memcpy(copy_offset, padding_buffer, ttl_len);
1554 		}
1555 		copy_offset += ttl_len;
1556 	}
1557 
1558 	/* now the user data iov, direct from the user buffer */
1559 	for (i = 0; i < req->iovcnt; i++) {
1560 		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
1561 		copy_offset += req->iov[i].iov_len;
1562 		ttl_len += req->iov[i].iov_len;
1563 	}
1564 
1565 	remainder = vol->params.chunk_size - ttl_len;
1566 	if (remainder) {
1567 		/* copy_offset already points to padding buffer if zero_paddings=false */
1568 		if (zero_paddings) {
1569 			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
1570 		}
1571 		ttl_len += remainder;
1572 	}
1573 
1574 	assert(ttl_len == req->vol->params.chunk_size);
1575 }
1576 
1577 /* This function can be called when we are compressing new data or in the read-modify-write case.
1578  * In the first case any padding should be filled with zeroes, in the second case the padding
1579  * should point to the already read and decompressed buffer */
1580 static inline void
1581 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1582 {
1583 	struct spdk_reduce_vol *vol = req->vol;
1584 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1585 	uint64_t chunk_offset, ttl_len = 0;
1586 	uint64_t remainder = 0;
1587 	uint32_t lbsize = vol->params.logical_block_size;
1588 	size_t iov_len;
1589 	int i;
1590 
1591 	/* If the backing device doesn't support SGL input then we should copy the user's buffer into decomp_buf
1592 	 * if at least one of the conditions below is true:
1593 	 * 1. The user's buffer is fragmented
1594 	 * 2. The length of the user's buffer is less than the chunk
1595 	 * 3. The user's buffer is contiguous, equals chunk_size, but crosses a huge page boundary */
1596 	iov_len = req->iov[0].iov_len;
1597 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1598 					  req->iov[0].iov_len < vol->params.chunk_size ||
1599 					  _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
1600 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1601 		return;
1602 	}
1603 
1604 	req->decomp_iovcnt = 0;
1605 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1606 
1607 	if (chunk_offset != 0) {
1608 		ttl_len += chunk_offset * lbsize;
1609 		req->decomp_iov[0].iov_base = padding_buffer;
1610 		req->decomp_iov[0].iov_len = ttl_len;
1611 		req->decomp_iovcnt = 1;
1612 	}
1613 
1614 	/* now the user data iov, direct from the user buffer */
1615 	for (i = 0; i < req->iovcnt; i++) {
1616 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1617 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1618 		ttl_len += req->iov[i].iov_len;
1619 	}
1620 	req->decomp_iovcnt += req->iovcnt;
1621 
1622 	remainder = vol->params.chunk_size - ttl_len;
1623 	if (remainder) {
1624 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1625 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1626 		req->decomp_iovcnt++;
1627 		ttl_len += remainder;
1628 	}
1629 	assert(ttl_len == req->vol->params.chunk_size);
1630 }
1631 
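/* Completion callback for decompressing the existing chunk during a read-modify-write.
 *  On success the full chunk resides in decomp_buf, so merge in the new write data and
 *  compress the resulting chunk.
 */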
1632 static void
1633 _write_decompress_done(void *_req, int reduce_errno)
1634 {
1635 	struct spdk_reduce_vol_request *req = _req;
1636 
1637 	/* Negative reduce_errno indicates failure of the decompression operation. */
1638 	if (reduce_errno < 0) {
1639 		_reduce_vol_complete_req(req, reduce_errno);
1640 		return;
1641 	}
1642 
1643 	/* A non-negative reduce_errno indicates success; backing_cb_args.output_size then
1644 	 * holds the decompressed size, which must equal the chunk size.
1645 	 */
1646 	if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
1647 		_reduce_vol_complete_req(req, -EIO);
1648 		return;
1649 	}
1650 
1651 	_prepare_compress_chunk(req, false);
1652 	_reduce_vol_compress_chunk(req, _write_compress_done);
1653 }
1654 
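/* Completion callback for the backing-device reads of the existing chunk in the
 *  read-modify-write path.  Once all backing ops have completed, decompress the chunk
 *  into the scratch buffer, or skip straight to the decompress-done step if the chunk
 *  was stored uncompressed.
 */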
1655 static void
1656 _write_read_done(void *_req, int reduce_errno)
1657 {
1658 	struct spdk_reduce_vol_request *req = _req;
1659 
1660 	if (reduce_errno != 0) {
1661 		req->reduce_errno = reduce_errno;
1662 	}
1663 
1664 	assert(req->num_backing_ops > 0);
1665 	if (--req->num_backing_ops > 0) {
1666 		return;
1667 	}
1668 
1669 	if (req->reduce_errno != 0) {
1670 		_reduce_vol_complete_req(req, req->reduce_errno);
1671 		return;
1672 	}
1673 
1674 	if (req->chunk_is_compressed) {
1675 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1676 	} else {
1677 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1678 
1679 		_write_decompress_done(req, 0);
1680 	}
1681 }
1682 
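/* Completion callback for decompressing a chunk that is being read.  Verifies that the
 *  decompressed size equals the chunk size and, if the scratch buffer was used instead
 *  of the user's buffers, copies the requested blocks into the user's iovecs.
 */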
1683 static void
1684 _read_decompress_done(void *_req, int reduce_errno)
1685 {
1686 	struct spdk_reduce_vol_request *req = _req;
1687 	struct spdk_reduce_vol *vol = req->vol;
1688 
1689 	/* Negative reduce_errno indicates failure of the decompression operation. */
1690 	if (reduce_errno < 0) {
1691 		_reduce_vol_complete_req(req, reduce_errno);
1692 		return;
1693 	}
1694 
1695 	/* A non-negative reduce_errno indicates success; backing_cb_args.output_size then
1696 	 * holds the decompressed size, which must equal the chunk size.
1697 	 */
1698 	if (req->backing_cb_args.output_size != vol->params.chunk_size) {
1699 		_reduce_vol_complete_req(req, -EIO);
1700 		return;
1701 	}
1702 
1703 	if (req->copy_after_decompress) {
1704 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1705 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1706 		int i;
1707 
1708 		for (i = 0; i < req->iovcnt; i++) {
1709 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1710 			decomp_buffer += req->iov[i].iov_len;
1711 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1712 		}
1713 	}
1714 
1715 	_reduce_vol_complete_req(req, 0);
1716 }
1717 
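/* Completion callback for the backing-device reads of a readv request.  Once all backing
 *  ops have completed, decompress the chunk into the user's buffers, or memcpy the data
 *  from the scratch buffer if the chunk was stored uncompressed.
 */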
1718 static void
1719 _read_read_done(void *_req, int reduce_errno)
1720 {
1721 	struct spdk_reduce_vol_request *req = _req;
1722 	uint64_t chunk_offset;
1723 	uint8_t *buf;
1724 	int i;
1725 
1726 	if (reduce_errno != 0) {
1727 		req->reduce_errno = reduce_errno;
1728 	}
1729 
1730 	assert(req->num_backing_ops > 0);
1731 	if (--req->num_backing_ops > 0) {
1732 		return;
1733 	}
1734 
1735 	if (req->reduce_errno != 0) {
1736 		_reduce_vol_complete_req(req, req->reduce_errno);
1737 		return;
1738 	}
1739 
1740 	if (req->chunk_is_compressed) {
1741 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1742 	} else {
1743 
1744 		/* If the chunk had been compressed, decompression would have written the data directly
1745 		 *  into the host buffers; since it was stored uncompressed, copy it from the scratch buffer here.
1746 		 */
1747 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1748 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1749 		for (i = 0; i < req->iovcnt; i++) {
1750 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1751 			buf += req->iov[i].iov_len;
1752 		}
1753 
1754 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1755 
1756 		_read_decompress_done(req, 0);
1757 	}
1758 }
1759 
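/* Look up the chunk map for the request's logical chunk and issue the backing-device
 *  reads for it.  A chunk occupying fewer io units than backing_io_units_per_chunk is
 *  stored compressed.
 */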
1760 static void
1761 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1762 {
1763 	struct spdk_reduce_vol *vol = req->vol;
1764 
1765 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1766 	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
1767 
1768 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1769 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1770 			    vol->params.backing_io_unit_size);
1771 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1772 
1773 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1774 }
1775 
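/* Validate that the iovec count does not exceed REDUCE_MAX_IOVECS and that the total
 *  iovec length matches 'length' logical blocks.
 */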
1776 static bool
1777 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1778 		    uint64_t length)
1779 {
1780 	uint64_t size = 0;
1781 	int i;
1782 
1783 	if (iovcnt > REDUCE_MAX_IOVECS) {
1784 		return false;
1785 	}
1786 
1787 	for (i = 0; i < iovcnt; i++) {
1788 		size += iov[i].iov_len;
1789 	}
1790 
1791 	return size == (length * vol->params.logical_block_size);
1792 }
1793 
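/* Return true if a request targeting the same logical chunk is already executing. */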
1794 static bool
1795 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1796 {
1797 	struct spdk_reduce_vol_request req;
1798 
1799 	req.logical_map_index = logical_map_index;
1800 
1801 	return (NULL != RB_FIND(executing_req_tree, &vol->executing_requests, &req));
1802 }
1803 
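/* Track the request as executing and read its chunk; completion continues in _read_read_done. */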
1804 static void
1805 _start_readv_request(struct spdk_reduce_vol_request *req)
1806 {
1807 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1808 	_reduce_vol_read_chunk(req, _read_read_done);
1809 }
1810 
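/* Read 'length' logical blocks starting at 'offset' (both in units of the volume's
 *  logical block size) into the caller's iovecs.  Requests that span a chunk boundary,
 *  or whose iovecs do not add up to the requested length, fail with -EINVAL; -ENOMEM is
 *  returned when no free request structure is available.  Reads from unallocated chunks
 *  complete immediately with zeroes.  Requests that overlap an executing request on the
 *  same chunk are queued.
 */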
1811 void
1812 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1813 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1814 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1815 {
1816 	struct spdk_reduce_vol_request *req;
1817 	uint64_t logical_map_index;
1818 	bool overlapped;
1819 	int i;
1820 
1821 	if (length == 0) {
1822 		cb_fn(cb_arg, 0);
1823 		return;
1824 	}
1825 
1826 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1827 		cb_fn(cb_arg, -EINVAL);
1828 		return;
1829 	}
1830 
1831 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1832 		cb_fn(cb_arg, -EINVAL);
1833 		return;
1834 	}
1835 
1836 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1837 	overlapped = _check_overlap(vol, logical_map_index);
1838 
1839 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1840 		/*
1841 		 * This chunk hasn't been allocated.  So treat the data as all
1842 		 * zeroes for this chunk - do the memset and immediately complete
1843 		 * the operation.
1844 		 */
1845 		for (i = 0; i < iovcnt; i++) {
1846 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1847 		}
1848 		cb_fn(cb_arg, 0);
1849 		return;
1850 	}
1851 
1852 	req = TAILQ_FIRST(&vol->free_requests);
1853 	if (req == NULL) {
1854 		cb_fn(cb_arg, -ENOMEM);
1855 		return;
1856 	}
1857 
1858 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1859 	req->type = REDUCE_IO_READV;
1860 	req->vol = vol;
1861 	req->iov = iov;
1862 	req->iovcnt = iovcnt;
1863 	req->offset = offset;
1864 	req->logical_map_index = logical_map_index;
1865 	req->length = length;
1866 	req->copy_after_decompress = false;
1867 	req->cb_fn = cb_fn;
1868 	req->cb_arg = cb_arg;
1869 
1870 	if (!overlapped) {
1871 		_start_readv_request(req);
1872 	} else {
1873 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1874 	}
1875 }
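/* Illustrative usage sketch (not part of the library): a hypothetical caller reading one
 *  logical block from an already-loaded vol.  The names my_read_done, my_buf and my_ctx
 *  are placeholders.
 *
 *	static void
 *	my_read_done(void *cb_arg, int reduce_errno)
 *	{
 *		// reduce_errno is 0 on success, negative errno on failure
 *	}
 *
 *	struct iovec iov = {
 *		.iov_base = my_buf,
 *		.iov_len = spdk_reduce_vol_get_params(vol)->logical_block_size,
 *	};
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 1, my_read_done, my_ctx);
 */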
1876 
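/* Start a write request: if the target chunk already exists and the write covers less
 *  than a full chunk, read (and decompress) the old chunk first and merge in the new
 *  data (read-modify-write); otherwise compress the new data with zero padding and
 *  write it out directly.
 */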
1877 static void
1878 _start_writev_request(struct spdk_reduce_vol_request *req)
1879 {
1880 	struct spdk_reduce_vol *vol = req->vol;
1881 
1882 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1883 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1884 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1885 			/* Read old chunk, then overwrite with data from this write
1886 			 *  operation.
1887 			 */
1888 			req->rmw = true;
1889 			_reduce_vol_read_chunk(req, _write_read_done);
1890 			return;
1891 		}
1892 	}
1893 
1894 	req->rmw = false;
1895 
1896 	_prepare_compress_chunk(req, true);
1897 	_reduce_vol_compress_chunk(req, _write_compress_done);
1898 }
1899 
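/* Write 'length' logical blocks starting at 'offset' (both in units of the volume's
 *  logical block size) from the caller's iovecs.  As with readv, requests must not span
 *  a chunk boundary and the iovecs must add up to the requested length; requests that
 *  overlap an executing request on the same chunk are queued.
 */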
1900 void
1901 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1902 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1903 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1904 {
1905 	struct spdk_reduce_vol_request *req;
1906 	uint64_t logical_map_index;
1907 	bool overlapped;
1908 
1909 	if (length == 0) {
1910 		cb_fn(cb_arg, 0);
1911 		return;
1912 	}
1913 
1914 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1915 		cb_fn(cb_arg, -EINVAL);
1916 		return;
1917 	}
1918 
1919 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1920 		cb_fn(cb_arg, -EINVAL);
1921 		return;
1922 	}
1923 
1924 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1925 	overlapped = _check_overlap(vol, logical_map_index);
1926 
1927 	req = TAILQ_FIRST(&vol->free_requests);
1928 	if (req == NULL) {
1929 		cb_fn(cb_arg, -ENOMEM);
1930 		return;
1931 	}
1932 
1933 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1934 	req->type = REDUCE_IO_WRITEV;
1935 	req->vol = vol;
1936 	req->iov = iov;
1937 	req->iovcnt = iovcnt;
1938 	req->offset = offset;
1939 	req->logical_map_index = logical_map_index;
1940 	req->length = length;
1941 	req->copy_after_decompress = false;
1942 	req->cb_fn = cb_fn;
1943 	req->cb_arg = cb_arg;
1944 
1945 	if (!overlapped) {
1946 		_start_writev_request(req);
1947 	} else {
1948 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1949 	}
1950 }
1951 
1952 const struct spdk_reduce_vol_params *
1953 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1954 {
1955 	return &vol->params;
1956 }
1957 
1958 const char *
1959 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
1960 {
1961 	return vol->pm_file.path;
1962 }
1963 
1964 void
1965 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1966 {
1967 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1968 	uint32_t struct_size;
1969 	uint64_t chunk_map_size;
1970 
1971 	SPDK_NOTICELOG("vol info:\n");
1972 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1973 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1974 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1975 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1976 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1977 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1978 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1979 		       vol->params.vol_size / vol->params.chunk_size);
1980 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1981 			vol->params.backing_io_unit_size);
1982 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1983 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1984 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1985 
1986 	SPDK_NOTICELOG("pmem info:\n");
1987 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1988 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1989 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1990 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1991 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1992 			   vol->params.chunk_size);
1993 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1994 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1995 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1996 			 vol->params.backing_io_unit_size);
1997 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1998 }
1999 
2000 SPDK_LOG_REGISTER_COMPONENT(reduce)
2001