xref: /spdk/lib/reduce/reduce.c (revision 48b83bb7b3b1ea42dfb6709a8389c6f2ff52f405)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/env.h"
11 #include "spdk/string.h"
12 #include "spdk/bit_array.h"
13 #include "spdk/util.h"
14 #include "spdk/log.h"
15 #include "spdk/memory.h"
16 
17 #include "libpmem.h"
18 
19 /* Always round up the size of the PM region to the nearest cacheline. */
20 #define REDUCE_PM_SIZE_ALIGNMENT	64
21 
22 /* Offset into the backing device where the persistent memory file's path is stored. */
23 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
24 
25 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
26 
27 #define REDUCE_NUM_VOL_REQUESTS	256
28 
29 /* Structure written to offset 0 of both the pm file and the backing device. */
30 struct spdk_reduce_vol_superblock {
31 	uint8_t				signature[8];
32 	struct spdk_reduce_vol_params	params;
33 	uint8_t				reserved[4040];
34 };
35 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
36 
37 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
38 /* sizeof() includes the null terminator, hence the - 1. */
39 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
40 		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
41 
42 #define REDUCE_PATH_MAX 4096
43 
44 #define REDUCE_ZERO_BUF_SIZE 0x100000
45 
46 /**
47  * Describes a persistent memory file used to hold metadata associated with a
48  *  compressed volume.
49  */
50 struct spdk_reduce_pm_file {
51 	char			path[REDUCE_PATH_MAX];
52 	void			*pm_buf;
53 	int			pm_is_pmem;
54 	uint64_t		size;
55 };
56 
57 #define REDUCE_IO_READV		1
58 #define REDUCE_IO_WRITEV	2
59 
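/*
 * Per-chunk metadata stored in the pm file: the compressed size of the chunk's
 * data and a flexible array of backing io unit indices (one entry per backing
 * io unit in the chunk) describing where that data lives on the backing device.
 */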
60 struct spdk_reduce_chunk_map {
61 	uint32_t		compressed_size;
62 	uint32_t		reserved;
63 	uint64_t		io_unit_index[0];
64 };
65 
66 struct spdk_reduce_vol_request {
67 	/**
68 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
69 	 *   1) source buffer for compression operations
70 	 *   2) destination buffer for decompression operations
71 	 *   3) data buffer when writing uncompressed chunk to disk
72 	 *   4) data buffer when reading uncompressed chunk from disk
73 	 */
74 	uint8_t					*decomp_buf;
75 	struct iovec				*decomp_buf_iov;
76 
77 	/**
78 	 * These are used to construct the iovecs that are sent to
79 	 *  the decomp engine, they point to a mix of the scratch buffer
80 	 *  and user buffer
81 	 */
82 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
83 	int					decomp_iovcnt;
84 
85 	/**
86 	 *  Scratch buffer used for compressed chunk.  This is used for:
87 	 *   1) destination buffer for compression operations
88 	 *   2) source buffer for decompression operations
89 	 *   3) data buffer when writing compressed chunk to disk
90 	 *   4) data buffer when reading compressed chunk from disk
91 	 */
92 	uint8_t					*comp_buf;
93 	struct iovec				*comp_buf_iov;
94 	struct iovec				*iov;
95 	bool					rmw;
96 	struct spdk_reduce_vol			*vol;
97 	int					type;
98 	int					reduce_errno;
99 	int					iovcnt;
100 	int					num_backing_ops;
101 	uint32_t				num_io_units;
102 	struct spdk_reduce_backing_io           *backing_io;
103 	bool					chunk_is_compressed;
104 	bool					copy_after_decompress;
105 	uint64_t				offset;
106 	uint64_t				logical_map_index;
107 	uint64_t				length;
108 	uint64_t				chunk_map_index;
109 	struct spdk_reduce_chunk_map		*chunk;
110 	spdk_reduce_vol_op_complete		cb_fn;
111 	void					*cb_arg;
112 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
113 	struct spdk_reduce_vol_cb_args		backing_cb_args;
114 };
115 
116 struct spdk_reduce_vol {
117 	struct spdk_reduce_vol_params		params;
118 	uint32_t				backing_io_units_per_chunk;
119 	uint32_t				backing_lba_per_io_unit;
120 	uint32_t				logical_blocks_per_chunk;
121 	struct spdk_reduce_pm_file		pm_file;
122 	struct spdk_reduce_backing_dev		*backing_dev;
123 	struct spdk_reduce_vol_superblock	*backing_super;
124 	struct spdk_reduce_vol_superblock	*pm_super;
125 	uint64_t				*pm_logical_map;
126 	uint64_t				*pm_chunk_maps;
127 
128 	struct spdk_bit_array			*allocated_chunk_maps;
129 	struct spdk_bit_array			*allocated_backing_io_units;
130 
131 	struct spdk_reduce_vol_request		*request_mem;
132 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
133 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
134 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
135 
136 	/* Single contiguous buffer used for all request buffers for this volume. */
137 	uint8_t					*buf_mem;
138 	struct iovec                            *buf_iov_mem;
139 	/* Single contiguous buffer used for backing io buffers for this volume. */
140 	uint8_t                                 *buf_backing_io_mem;
141 };
142 
143 static void _start_readv_request(struct spdk_reduce_vol_request *req);
144 static void _start_writev_request(struct spdk_reduce_vol_request *req);
145 static uint8_t *g_zero_buf;
146 static int g_vol_count = 0;
147 
148 /*
149  * Allocate extra metadata chunks and corresponding backing io units to account for
150  *  outstanding IO in the worst case scenario where the logical map is completely allocated
151  *  and no data can be compressed.  We need extra chunks in this case to handle
152  *  in-flight writes since reduce never writes data in place.
153  */
154 #define REDUCE_NUM_EXTRA_CHUNKS 128
155 
156 static void
157 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
158 {
159 	if (vol->pm_file.pm_is_pmem) {
160 		pmem_persist(addr, len);
161 	} else {
162 		pmem_msync(addr, len);
163 	}
164 }
165 
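/* Size in bytes of the logical map region in the pm file: one 64-bit chunk map
 * index per logical chunk, rounded up to a cacheline.
 */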
166 static uint64_t
167 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
168 {
169 	uint64_t chunks_in_logical_map, logical_map_size;
170 
171 	chunks_in_logical_map = vol_size / chunk_size;
172 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
173 
174 	/* Round up to next cacheline. */
175 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
176 	       REDUCE_PM_SIZE_ALIGNMENT;
177 }
178 
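/* Total number of chunk maps to provision: one per logical chunk plus the extra
 * chunks reserved for in-flight writes.
 */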
179 static uint64_t
180 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
181 {
182 	uint64_t num_chunks;
183 
184 	num_chunks = vol_size / chunk_size;
185 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
186 
187 	return num_chunks;
188 }
189 
190 static inline uint32_t
191 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
192 {
193 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
194 }
195 
196 static uint64_t
197 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
198 {
199 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
200 
201 	num_chunks = _get_total_chunks(vol_size, chunk_size);
202 	io_units_per_chunk = chunk_size / backing_io_unit_size;
203 
204 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
205 
206 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
207 	       REDUCE_PM_SIZE_ALIGNMENT;
208 }
209 
210 static struct spdk_reduce_chunk_map *
211 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
212 {
213 	uintptr_t chunk_map_addr;
214 
215 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
216 
217 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
218 	chunk_map_addr += chunk_map_index *
219 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
220 
221 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
222 }
223 
224 static int
225 _validate_vol_params(struct spdk_reduce_vol_params *params)
226 {
227 	if (params->vol_size > 0) {
228 		/**
229 		 * User does not pass in the vol size - it gets calculated by libreduce from
230 		 *  values in this structure plus the size of the backing device.
231 		 */
232 		return -EINVAL;
233 	}
234 
235 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
236 	    params->logical_block_size == 0) {
237 		return -EINVAL;
238 	}
239 
240 	/* Chunk size must be an even multiple of the backing io unit size. */
241 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
242 		return -EINVAL;
243 	}
244 
245 	/* Chunk size must be an even multiple of the logical block size. */
246 	if ((params->chunk_size % params->logical_block_size) != 0) {
247		return -EINVAL;
248 	}
249 
250 	return 0;
251 }
252 
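/* Usable volume size in bytes: the number of whole chunks that fit on the backing
 * device, minus the reserved extra chunks, times the chunk size.
 */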
253 static uint64_t
254 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
255 {
256 	uint64_t num_chunks;
257 
258 	num_chunks = backing_dev_size / chunk_size;
259 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
260 		return 0;
261 	}
262 
263 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
264 	return num_chunks * chunk_size;
265 }
266 
267 static uint64_t
268 _get_pm_file_size(struct spdk_reduce_vol_params *params)
269 {
270 	uint64_t total_pm_size;
271 
272 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
273 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
274 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
275 			 params->backing_io_unit_size);
276 	return total_pm_size;
277 }
278 
279 const struct spdk_uuid *
280 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
281 {
282 	return &vol->params.uuid;
283 }
284 
285 static void
286 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
287 {
288 	uint64_t logical_map_size;
289 
290 	/* Superblock is at the beginning of the pm file. */
291 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
292 
293 	/* Logical map immediately follows the super block. */
294 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
295 
296 	/* Chunks maps follow the logical map. */
297 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
298 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
299 }
300 
301 /* We need 2 iovs during load - one for the superblock, another for the path */
302 #define LOAD_IOV_COUNT	2
303 
304 struct reduce_init_load_ctx {
305 	struct spdk_reduce_vol			*vol;
306 	struct spdk_reduce_vol_cb_args		backing_cb_args;
307 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
308 	void					*cb_arg;
309 	struct iovec				iov[LOAD_IOV_COUNT];
310 	void					*path;
311 	struct spdk_reduce_backing_io           *backing_io;
312 };
313 
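/* Returns true if the buffer starting at addr is not physically contiguous for the
 * requested length (i.e. it crosses a huge page boundary).  On return, *size is
 * updated by spdk_vtophys() to the contiguous length available at addr.
 */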
314 static inline bool
315 _addr_crosses_huge_page(const void *addr, size_t *size)
316 {
317 	size_t _size;
318 	uint64_t rc;
319 
320 	assert(size);
321 
322 	_size = *size;
323 	rc = spdk_vtophys(addr, size);
324 
325 	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
326 }
327 
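/* Carve the next buffer of buffer_size bytes out of the volume's buffer memory at
 * *_addr, skipping ahead to the next huge page if the buffer would otherwise cross
 * a page boundary, and advance *_addr past the returned buffer.
 */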
328 static inline int
329 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
330 {
331 	uint8_t *addr;
332 	size_t size_tmp = buffer_size;
333 
334 	addr = *_addr;
335 
336 	/* Verify that addr + buffer_size doesn't cross huge page boundary */
337 	if (_addr_crosses_huge_page(addr, &size_tmp)) {
338 		/* The memory start is aligned on 2MiB, so the buffer must be at the end of a page.
339 		 * Skip the remaining bytes and continue from the beginning of the next page. */
340 		addr += size_tmp;
341 	}
342 
343 	if (addr + buffer_size > addr_range) {
344 		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
345 		return -ERANGE;
346 	}
347 
348 	*vol_buffer = addr;
349 	*_addr = addr + buffer_size;
350 
351 	return 0;
352 }
353 
354 static int
355 _allocate_vol_requests(struct spdk_reduce_vol *vol)
356 {
357 	struct spdk_reduce_vol_request *req;
358 	struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
359 	uint32_t reqs_in_2mb_page, huge_pages_needed;
360 	uint8_t *buffer, *buffer_end;
361 	int i = 0;
362 	int rc = 0;
363 
364 	/* Comp and decomp buffers must be allocated so that they do not cross a physical
365 	 * page boundary. Assume that the system uses the default 2MiB pages and that
366 	 * chunk_size is not necessarily a power of 2.
367 	 * Allocate 2x since we need buffers for both read/write and compress/decompress
368 	 * intermediate buffers. */
369 	reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
370 	if (!reqs_in_2mb_page) {
371 		return -EINVAL;
372 	}
373 	huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
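	/* For example, assuming a 16KiB chunk_size: each request consumes 32KiB of buffer
	 * space (comp + decomp), so 64 requests fit in one 2MiB huge page and the 256
	 * requests per volume need 4 huge pages.
	 */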
374 
375 	vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
376 	if (vol->buf_mem == NULL) {
377 		return -ENOMEM;
378 	}
379 
380 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
381 	if (vol->request_mem == NULL) {
382 		spdk_free(vol->buf_mem);
383 		vol->buf_mem = NULL;
384 		return -ENOMEM;
385 	}
386 
387 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
388 	 *  buffers.
389 	 */
390 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
391 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
392 	if (vol->buf_iov_mem == NULL) {
393 		free(vol->request_mem);
394 		spdk_free(vol->buf_mem);
395 		vol->request_mem = NULL;
396 		vol->buf_mem = NULL;
397 		return -ENOMEM;
398 	}
399 
400 	vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) +
401 					 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk);
402 	if (vol->buf_backing_io_mem == NULL) {
403 		free(vol->request_mem);
404 		free(vol->buf_iov_mem);
405 		spdk_free(vol->buf_mem);
406 		vol->request_mem = NULL;
407 		vol->buf_iov_mem = NULL;
408 		vol->buf_mem = NULL;
409 		return -ENOMEM;
410 	}
411 
412 	buffer = vol->buf_mem;
413 	buffer_end = buffer + VALUE_2MB * huge_pages_needed;
414 
415 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
416 		req = &vol->request_mem[i];
417 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
418 		req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i *
419 				  (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) *
420 				  vol->backing_io_units_per_chunk);
421 
422 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
423 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
424 
425 		rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
426 		if (rc) {
427 			SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
428 				    vol->buf_mem, buffer_end);
429 			break;
430 		}
431 		rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
432 		if (rc) {
433 			SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
434 				    vol->buf_mem, buffer_end);
435 			break;
436 		}
437 	}
438 
439 	if (rc) {
440 		free(vol->buf_backing_io_mem);
441 		free(vol->buf_iov_mem);
442 		free(vol->request_mem);
443 		spdk_free(vol->buf_mem);
444 		vol->buf_mem = NULL;
445 		vol->buf_backing_io_mem = NULL;
446 		vol->buf_iov_mem = NULL;
447 		vol->request_mem = NULL;
448 	}
449 
450 	return rc;
451 }
452 
453 static void
454 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
455 {
456 	if (ctx != NULL) {
457 		spdk_free(ctx->path);
458 		free(ctx->backing_io);
459 		free(ctx);
460 	}
461 
462 	if (vol != NULL) {
463 		if (vol->pm_file.pm_buf != NULL) {
464 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
465 		}
466 
467 		spdk_free(vol->backing_super);
468 		spdk_bit_array_free(&vol->allocated_chunk_maps);
469 		spdk_bit_array_free(&vol->allocated_backing_io_units);
470 		free(vol->request_mem);
471 		free(vol->buf_backing_io_mem);
472 		free(vol->buf_iov_mem);
473 		spdk_free(vol->buf_mem);
474 		free(vol);
475 	}
476 }
477 
478 static int
479 _alloc_zero_buff(void)
480 {
481 	int rc = 0;
482 
483 	/* The zero buffer is shared between all volumes and is only used for
484 	 * reads, so allocate one global instance here if it was not already
485 	 * allocated when another volume was initialized or loaded.
486 	 */
487 	if (g_vol_count++ == 0) {
488 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
489 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
490 					  SPDK_MALLOC_DMA);
491 		if (g_zero_buf == NULL) {
492 			g_vol_count--;
493 			rc = -ENOMEM;
494 		}
495 	}
496 	return rc;
497 }
498 
499 static void
500 _init_write_super_cpl(void *cb_arg, int reduce_errno)
501 {
502 	struct reduce_init_load_ctx *init_ctx = cb_arg;
503 	int rc;
504 
505 	rc = _allocate_vol_requests(init_ctx->vol);
506 	if (rc != 0) {
507 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
508 		_init_load_cleanup(init_ctx->vol, init_ctx);
509 		return;
510 	}
511 
512 	rc = _alloc_zero_buff();
513 	if (rc != 0) {
514 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
515 		_init_load_cleanup(init_ctx->vol, init_ctx);
516 		return;
517 	}
518 
519 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
520 	/* Only clean up the ctx - the vol has been passed to the application
521 	 *  for use now that initialization was successful.
522 	 */
523 	_init_load_cleanup(NULL, init_ctx);
524 }
525 
526 static void
527 _init_write_path_cpl(void *cb_arg, int reduce_errno)
528 {
529 	struct reduce_init_load_ctx *init_ctx = cb_arg;
530 	struct spdk_reduce_vol *vol = init_ctx->vol;
531 	struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io;
532 
533 	init_ctx->iov[0].iov_base = vol->backing_super;
534 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
535 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
536 	init_ctx->backing_cb_args.cb_arg = init_ctx;
537 
538 	backing_io->dev = vol->backing_dev;
539 	backing_io->iov = init_ctx->iov;
540 	backing_io->iovcnt = 1;
541 	backing_io->lba = 0;
542 	backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen;
543 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
544 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
545 
546 	vol->backing_dev->submit_backing_io(backing_io);
547 }
548 
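/* Create the bit arrays that track which chunk maps and which backing io units are
 * in use, and mark the io units occupied by the on-disk superblock and pm file path
 * as allocated so they are never handed out for data.
 */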
549 static int
550 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
551 {
552 	uint64_t total_chunks, total_backing_io_units;
553 	uint32_t i, num_metadata_io_units;
554 
555 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
556 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
557 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
558 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
559 
560 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
561 		return -ENOMEM;
562 	}
563 
564 	/* Set backing io unit bits associated with metadata. */
565 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
566 				vol->params.backing_io_unit_size;
567 	for (i = 0; i < num_metadata_io_units; i++) {
568 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
569 	}
570 
571 	return 0;
572 }
573 
574 void
575 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
576 		     struct spdk_reduce_backing_dev *backing_dev,
577 		     const char *pm_file_dir,
578 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
579 {
580 	struct spdk_reduce_vol *vol;
581 	struct reduce_init_load_ctx *init_ctx;
582 	struct spdk_reduce_backing_io *backing_io;
583 	uint64_t backing_dev_size;
584 	size_t mapped_len;
585 	int dir_len, max_dir_len, rc;
586 
587 	/* We need to append a path separator and the UUID to the supplied
588 	 * path.
589 	 */
590 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
591 	dir_len = strnlen(pm_file_dir, max_dir_len);
592 	/* Strip trailing slash if the user provided one - we will add it back
593 	 * later when appending the filename.
594 	 */
595 	if (pm_file_dir[dir_len - 1] == '/') {
596 		dir_len--;
597 	}
598 	if (dir_len == max_dir_len) {
599 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
600 		cb_fn(cb_arg, NULL, -EINVAL);
601 		return;
602 	}
603 
604 	rc = _validate_vol_params(params);
605 	if (rc != 0) {
606 		SPDK_ERRLOG("invalid vol params\n");
607 		cb_fn(cb_arg, NULL, rc);
608 		return;
609 	}
610 
611 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
612 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
613 	if (params->vol_size == 0) {
614 		SPDK_ERRLOG("backing device is too small\n");
615 		cb_fn(cb_arg, NULL, -EINVAL);
616 		return;
617 	}
618 
619 	if (backing_dev->submit_backing_io == NULL) {
620 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
621 		cb_fn(cb_arg, NULL, -EINVAL);
622 		return;
623 	}
624 
625 	vol = calloc(1, sizeof(*vol));
626 	if (vol == NULL) {
627 		cb_fn(cb_arg, NULL, -ENOMEM);
628 		return;
629 	}
630 
631 	TAILQ_INIT(&vol->free_requests);
632 	TAILQ_INIT(&vol->executing_requests);
633 	TAILQ_INIT(&vol->queued_requests);
634 
635 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
636 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
637 	if (vol->backing_super == NULL) {
638 		cb_fn(cb_arg, NULL, -ENOMEM);
639 		_init_load_cleanup(vol, NULL);
640 		return;
641 	}
642 
643 	init_ctx = calloc(1, sizeof(*init_ctx));
644 	if (init_ctx == NULL) {
645 		cb_fn(cb_arg, NULL, -ENOMEM);
646 		_init_load_cleanup(vol, NULL);
647 		return;
648 	}
649 
650 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
651 	if (backing_io == NULL) {
652 		cb_fn(cb_arg, NULL, -ENOMEM);
653 		_init_load_cleanup(vol, init_ctx);
654 		return;
655 	}
656 	init_ctx->backing_io = backing_io;
657 
658 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
659 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
660 	if (init_ctx->path == NULL) {
661 		cb_fn(cb_arg, NULL, -ENOMEM);
662 		_init_load_cleanup(vol, init_ctx);
663 		return;
664 	}
665 
666 	if (spdk_uuid_is_null(&params->uuid)) {
667 		spdk_uuid_generate(&params->uuid);
668 	}
669 
670 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
671 	vol->pm_file.path[dir_len] = '/';
672 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
673 			    &params->uuid);
674 	vol->pm_file.size = _get_pm_file_size(params);
675 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
676 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
677 					    &mapped_len, &vol->pm_file.pm_is_pmem);
678 	if (vol->pm_file.pm_buf == NULL) {
679 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
680 			    vol->pm_file.path, strerror(errno));
681 		cb_fn(cb_arg, NULL, -errno);
682 		_init_load_cleanup(vol, init_ctx);
683 		return;
684 	}
685 
686 	if (vol->pm_file.size != mapped_len) {
687 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
688 			    vol->pm_file.size, mapped_len);
689 		cb_fn(cb_arg, NULL, -ENOMEM);
690 		_init_load_cleanup(vol, init_ctx);
691 		return;
692 	}
693 
694 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
695 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
696 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
697 	memcpy(&vol->params, params, sizeof(*params));
698 
699 	vol->backing_dev = backing_dev;
700 
701 	rc = _allocate_bit_arrays(vol);
702 	if (rc != 0) {
703 		cb_fn(cb_arg, NULL, rc);
704 		_init_load_cleanup(vol, init_ctx);
705 		return;
706 	}
707 
708 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
709 	       sizeof(vol->backing_super->signature));
710 	memcpy(&vol->backing_super->params, params, sizeof(*params));
711 
712 	_initialize_vol_pm_pointers(vol);
713 
714 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
715 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
716 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
717 	 */
718 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
719 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
720 
721 	init_ctx->vol = vol;
722 	init_ctx->cb_fn = cb_fn;
723 	init_ctx->cb_arg = cb_arg;
724 
725 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
726 	init_ctx->iov[0].iov_base = init_ctx->path;
727 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
728 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
729 	init_ctx->backing_cb_args.cb_arg = init_ctx;
730 	/* Write path to offset 4K on backing device - just after where the super
731 	 *  block will be written.  We wait until this is committed before writing the
732 	 *  super block to guarantee we don't get the super block written without
733 	 *  the path if the system crashes in the middle of a write operation.
734 	 */
735 	backing_io->dev = vol->backing_dev;
736 	backing_io->iov = init_ctx->iov;
737 	backing_io->iovcnt = 1;
738 	backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen;
739 	backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen;
740 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
741 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
742 
743 	vol->backing_dev->submit_backing_io(backing_io);
744 }
745 
746 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
747 
748 static void
749 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
750 {
751 	struct reduce_init_load_ctx *load_ctx = cb_arg;
752 	struct spdk_reduce_vol *vol = load_ctx->vol;
753 	uint64_t backing_dev_size;
754 	uint64_t i, num_chunks, logical_map_index;
755 	struct spdk_reduce_chunk_map *chunk;
756 	size_t mapped_len;
757 	uint32_t j;
758 	int rc;
759 
760 	rc = _alloc_zero_buff();
761 	if (rc) {
762 		goto error;
763 	}
764 
765 	if (memcmp(vol->backing_super->signature,
766 		   SPDK_REDUCE_SIGNATURE,
767 		   sizeof(vol->backing_super->signature)) != 0) {
768 		/* This backing device isn't a libreduce backing device. */
769 		rc = -EILSEQ;
770 		goto error;
771 	}
772 
773 	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
774 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
775 	 *  so destroy_load_cb can delete the metadata from the block device and delete the
776 	 *  persistent memory file if it exists.
777 	 */
778 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
779 	if (load_ctx->cb_fn == destroy_load_cb) {
780 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
781 		_init_load_cleanup(NULL, load_ctx);
782 		return;
783 	}
784 
785 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
786 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
787 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
788 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
789 
790 	rc = _allocate_bit_arrays(vol);
791 	if (rc != 0) {
792 		goto error;
793 	}
794 
795 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
796 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
797 		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
798 			    backing_dev_size);
799 		rc = -EILSEQ;
800 		goto error;
801 	}
802 
803 	vol->pm_file.size = _get_pm_file_size(&vol->params);
804 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
805 					    &vol->pm_file.pm_is_pmem);
806 	if (vol->pm_file.pm_buf == NULL) {
807 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
808 		rc = -errno;
809 		goto error;
810 	}
811 
812 	if (vol->pm_file.size != mapped_len) {
813 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
814 			    vol->pm_file.size, mapped_len);
815 		rc = -ENOMEM;
816 		goto error;
817 	}
818 
819 	rc = _allocate_vol_requests(vol);
820 	if (rc != 0) {
821 		goto error;
822 	}
823 
824 	_initialize_vol_pm_pointers(vol);
825 
826 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
827 	for (i = 0; i < num_chunks; i++) {
828 		logical_map_index = vol->pm_logical_map[i];
829 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
830 			continue;
831 		}
832 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
833 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
834 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
835 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
836 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
837 			}
838 		}
839 	}
840 
841 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
842 	/* Only clean up the ctx - the vol has been passed to the application
843 	 *  for use now that volume load was successful.
844 	 */
845 	_init_load_cleanup(NULL, load_ctx);
846 	return;
847 
848 error:
849 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
850 	_init_load_cleanup(vol, load_ctx);
851 }
852 
853 void
854 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
855 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
856 {
857 	struct spdk_reduce_vol *vol;
858 	struct reduce_init_load_ctx *load_ctx;
859 	struct spdk_reduce_backing_io *backing_io;
860 
861 	if (backing_dev->submit_backing_io == NULL) {
862 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
863 		cb_fn(cb_arg, NULL, -EINVAL);
864 		return;
865 	}
866 
867 	vol = calloc(1, sizeof(*vol));
868 	if (vol == NULL) {
869 		cb_fn(cb_arg, NULL, -ENOMEM);
870 		return;
871 	}
872 
873 	TAILQ_INIT(&vol->free_requests);
874 	TAILQ_INIT(&vol->executing_requests);
875 	TAILQ_INIT(&vol->queued_requests);
876 
877 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
878 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
879 	if (vol->backing_super == NULL) {
880 		_init_load_cleanup(vol, NULL);
881 		cb_fn(cb_arg, NULL, -ENOMEM);
882 		return;
883 	}
884 
885 	vol->backing_dev = backing_dev;
886 
887 	load_ctx = calloc(1, sizeof(*load_ctx));
888 	if (load_ctx == NULL) {
889 		_init_load_cleanup(vol, NULL);
890 		cb_fn(cb_arg, NULL, -ENOMEM);
891 		return;
892 	}
893 
894 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
895 	if (backing_io == NULL) {
896 		_init_load_cleanup(vol, load_ctx);
897 		cb_fn(cb_arg, NULL, -ENOMEM);
898 		return;
899 	}
900 
901 	load_ctx->backing_io = backing_io;
902 
903 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
904 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
905 	if (load_ctx->path == NULL) {
906 		_init_load_cleanup(vol, load_ctx);
907 		cb_fn(cb_arg, NULL, -ENOMEM);
908 		return;
909 	}
910 
911 	load_ctx->vol = vol;
912 	load_ctx->cb_fn = cb_fn;
913 	load_ctx->cb_arg = cb_arg;
914 
915 	load_ctx->iov[0].iov_base = vol->backing_super;
916 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
917 	load_ctx->iov[1].iov_base = load_ctx->path;
918 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
919 	backing_io->dev = vol->backing_dev;
920 	backing_io->iov = load_ctx->iov;
921 	backing_io->iovcnt = LOAD_IOV_COUNT;
922 	backing_io->lba = 0;
923 	backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
924 				vol->backing_dev->blocklen;
925 	backing_io->backing_cb_args = &load_ctx->backing_cb_args;
926 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
927 
928 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
929 	load_ctx->backing_cb_args.cb_arg = load_ctx;
930 	vol->backing_dev->submit_backing_io(backing_io);
931 }
932 
933 void
934 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
935 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
936 {
937 	if (vol == NULL) {
938 		/* This indicates a programming error. */
939 		assert(false);
940 		cb_fn(cb_arg, -EINVAL);
941 		return;
942 	}
943 
944 	if (--g_vol_count == 0) {
945 		spdk_free(g_zero_buf);
946 	}
947 	assert(g_vol_count >= 0);
948 	_init_load_cleanup(vol, NULL);
949 	cb_fn(cb_arg, 0);
950 }
951 
952 struct reduce_destroy_ctx {
953 	spdk_reduce_vol_op_complete		cb_fn;
954 	void					*cb_arg;
955 	struct spdk_reduce_vol			*vol;
956 	struct spdk_reduce_vol_superblock	*super;
957 	struct iovec				iov;
958 	struct spdk_reduce_vol_cb_args		backing_cb_args;
959 	int					reduce_errno;
960 	char					pm_path[REDUCE_PATH_MAX];
961 	struct spdk_reduce_backing_io           *backing_io;
962 };
963 
964 static void
965 destroy_unload_cpl(void *cb_arg, int reduce_errno)
966 {
967 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
968 
969 	if (destroy_ctx->reduce_errno == 0) {
970 		if (unlink(destroy_ctx->pm_path)) {
971 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
972 				    destroy_ctx->pm_path, strerror(errno));
973 		}
974 	}
975 
976 	/* Even if the unload somehow failed, we still pass the destroy_ctx
977 	 * reduce_errno since that indicates whether or not the volume was
978 	 * actually destroyed.
979 	 */
980 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
981 	spdk_free(destroy_ctx->super);
982 	free(destroy_ctx->backing_io);
983 	free(destroy_ctx);
984 }
985 
986 static void
987 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
988 {
989 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
990 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
991 
992 	destroy_ctx->reduce_errno = reduce_errno;
993 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
994 }
995 
996 static void
997 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
998 {
999 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1000 	struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io;
1001 
1002 	if (reduce_errno != 0) {
1003 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
1004 		spdk_free(destroy_ctx->super);
1005 		free(destroy_ctx);
1006 		return;
1007 	}
1008 
1009 	destroy_ctx->vol = vol;
1010 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
1011 	destroy_ctx->iov.iov_base = destroy_ctx->super;
1012 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
1013 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
1014 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
1015 
1016 	backing_io->dev = vol->backing_dev;
1017 	backing_io->iov = &destroy_ctx->iov;
1018 	backing_io->iovcnt = 1;
1019 	backing_io->lba = 0;
1020 	backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen;
1021 	backing_io->backing_cb_args = &destroy_ctx->backing_cb_args;
1022 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1023 
1024 	vol->backing_dev->submit_backing_io(backing_io);
1025 }
1026 
1027 void
1028 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
1029 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1030 {
1031 	struct reduce_destroy_ctx *destroy_ctx;
1032 	struct spdk_reduce_backing_io *backing_io;
1033 
1034 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
1035 	if (destroy_ctx == NULL) {
1036 		cb_fn(cb_arg, -ENOMEM);
1037 		return;
1038 	}
1039 
1040 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
1041 	if (backing_io == NULL) {
1042 		free(destroy_ctx);
1043 		cb_fn(cb_arg, -ENOMEM);
1044 		return;
1045 	}
1046 
1047 	destroy_ctx->backing_io = backing_io;
1048 
1049 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
1050 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1051 	if (destroy_ctx->super == NULL) {
1052 		free(destroy_ctx);
1053 		free(backing_io);
1054 		cb_fn(cb_arg, -ENOMEM);
1055 		return;
1056 	}
1057 	destroy_ctx->cb_fn = cb_fn;
1058 	destroy_ctx->cb_arg = cb_arg;
1059 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
1060 }
1061 
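/* Returns true if the logical range [offset, offset + length) crosses a chunk
 * boundary.
 */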
1062 static bool
1063 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
1064 {
1065 	uint64_t start_chunk, end_chunk;
1066 
1067 	start_chunk = offset / vol->logical_blocks_per_chunk;
1068 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
1069 
1070 	return (start_chunk != end_chunk);
1071 }
1072 
1073 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
1074 
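/* Complete a request back to its caller, then start at most one queued request that
 * was waiting on the same logical chunk.
 */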
1075 static void
1076 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
1077 {
1078 	struct spdk_reduce_vol_request *next_req;
1079 	struct spdk_reduce_vol *vol = req->vol;
1080 
1081 	req->cb_fn(req->cb_arg, reduce_errno);
1082 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
1083 
1084 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
1085 		if (next_req->logical_map_index == req->logical_map_index) {
1086 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
1087 			if (next_req->type == REDUCE_IO_READV) {
1088 				_start_readv_request(next_req);
1089 			} else {
1090 				assert(next_req->type == REDUCE_IO_WRITEV);
1091 				_start_writev_request(next_req);
1092 			}
1093 			break;
1094 		}
1095 	}
1096 
1097 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
1098 }
1099 
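/* Release the backing io units referenced by a chunk map, then mark the chunk map
 * itself as free.
 */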
1100 static void
1101 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
1102 {
1103 	struct spdk_reduce_chunk_map *chunk;
1104 	uint32_t i;
1105 
1106 	chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
1107 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
1108 		if (chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
1109 			break;
1110 		}
1111 		assert(spdk_bit_array_get(vol->allocated_backing_io_units,
1112 					  chunk->io_unit_index[i]) == true);
1113 		spdk_bit_array_clear(vol->allocated_backing_io_units, chunk->io_unit_index[i]);
1114 		chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1115 	}
1116 	spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
1117 }
1118 
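/* Completion callback for the backing writes of a chunk.  Once all backing ops have
 * finished, persist the new chunk map, point the logical map at it, and release the
 * old chunk map (if any).
 */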
1119 static void
1120 _write_write_done(void *_req, int reduce_errno)
1121 {
1122 	struct spdk_reduce_vol_request *req = _req;
1123 	struct spdk_reduce_vol *vol = req->vol;
1124 	uint64_t old_chunk_map_index;
1125 
1126 	if (reduce_errno != 0) {
1127 		req->reduce_errno = reduce_errno;
1128 	}
1129 
1130 	assert(req->num_backing_ops > 0);
1131 	if (--req->num_backing_ops > 0) {
1132 		return;
1133 	}
1134 
1135 	if (req->reduce_errno != 0) {
1136 		_reduce_vol_reset_chunk(vol, req->chunk_map_index);
1137 		_reduce_vol_complete_req(req, req->reduce_errno);
1138 		return;
1139 	}
1140 
1141 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1142 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1143 		_reduce_vol_reset_chunk(vol, old_chunk_map_index);
1144 	}
1145 
1146 	/*
1147 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1148 	 * becomes invalid after we update the logical map, since the old chunk map will no
1149 	 * longer have a reference to it in the logical map.
1150 	 */
1151 
1152 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1153 	_reduce_persist(vol, req->chunk,
1154 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1155 
1156 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1157 
1158 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1159 
1160 	_reduce_vol_complete_req(req, 0);
1161 }
1162 
1163 static struct spdk_reduce_backing_io *
1164 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index)
1165 {
1166 	struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev;
1167 	struct spdk_reduce_backing_io *backing_io;
1168 
1169 	backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io +
1170 			(sizeof(*backing_io) + backing_dev->user_ctx_size) * index);
1171 
1172 	return backing_io;
1173 
1174 }
1175 
1176 struct reduce_merged_io_desc {
1177 	uint64_t io_unit_index;
1178 	uint32_t num_io_units;
1179 };
1180 
1181 static void
1182 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1183 				 reduce_request_fn next_fn, bool is_write)
1184 {
1185 	struct iovec *iov;
1186 	struct spdk_reduce_backing_io *backing_io;
1187 	uint8_t *buf;
1188 	uint32_t i;
1189 
1190 	if (req->chunk_is_compressed) {
1191 		iov = req->comp_buf_iov;
1192 		buf = req->comp_buf;
1193 	} else {
1194 		iov = req->decomp_buf_iov;
1195 		buf = req->decomp_buf;
1196 	}
1197 
1198 	req->num_backing_ops = req->num_io_units;
1199 	req->backing_cb_args.cb_fn = next_fn;
1200 	req->backing_cb_args.cb_arg = req;
1201 	for (i = 0; i < req->num_io_units; i++) {
1202 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1203 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1204 		iov[i].iov_len = vol->params.backing_io_unit_size;
1205 		backing_io->dev  = vol->backing_dev;
1206 		backing_io->iov = &iov[i];
1207 		backing_io->iovcnt = 1;
1208 		backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit;
1209 		backing_io->lba_count = vol->backing_lba_per_io_unit;
1210 		backing_io->backing_cb_args = &req->backing_cb_args;
1211 		if (is_write) {
1212 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1213 		} else {
1214 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1215 		}
1216 		vol->backing_dev->submit_backing_io(backing_io);
1217 	}
1218 }
1219 
1220 static void
1221 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1222 		   reduce_request_fn next_fn, bool is_write)
1223 {
1224 	struct iovec *iov;
1225 	struct spdk_reduce_backing_io *backing_io;
1226 	struct reduce_merged_io_desc merged_io_desc[4];
1227 	uint8_t *buf;
1228 	bool merge = false;
1229 	uint32_t num_io = 0;
1230 	uint32_t io_unit_counts = 0;
1231 	uint32_t merged_io_idx = 0;
1232 	uint32_t i;
1233 
1234 	/* The merged_io_desc array is sized for four elements, so merging is only
1235 	 * possible when a chunk contains at most four backing io units.  If the
1236 	 * chunk is larger than that, issue the backing IO without merging.
1237 	 */
1238 	if (vol->backing_io_units_per_chunk > 4) {
1239 		_issue_backing_ops_without_merge(req, vol, next_fn, is_write);
1240 		return;
1241 	}
1242 
1243 	if (req->chunk_is_compressed) {
1244 		iov = req->comp_buf_iov;
1245 		buf = req->comp_buf;
1246 	} else {
1247 		iov = req->decomp_buf_iov;
1248 		buf = req->decomp_buf;
1249 	}
1250 
1251 	for (i = 0; i < req->num_io_units; i++) {
1252 		if (!merge) {
1253 			merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
1254 			merged_io_desc[merged_io_idx].num_io_units = 1;
1255 			num_io++;
1256 		}
1257 
1258 		if (i + 1 == req->num_io_units) {
1259 			break;
1260 		}
1261 
1262 		if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
1263 			merged_io_desc[merged_io_idx].num_io_units += 1;
1264 			merge = true;
1265 			continue;
1266 		}
1267 		merge = false;
1268 		merged_io_idx++;
1269 	}
1270 
1271 	req->num_backing_ops = num_io;
1272 	req->backing_cb_args.cb_fn = next_fn;
1273 	req->backing_cb_args.cb_arg = req;
1274 	for (i = 0; i < num_io; i++) {
1275 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1276 		iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
1277 		iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
1278 		backing_io->dev  = vol->backing_dev;
1279 		backing_io->iov = &iov[i];
1280 		backing_io->iovcnt = 1;
1281 		backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit;
1282 		backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units;
1283 		backing_io->backing_cb_args = &req->backing_cb_args;
1284 		if (is_write) {
1285 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1286 		} else {
1287 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1288 		}
1289 		vol->backing_dev->submit_backing_io(backing_io);
1290 
1291 		/* Accumulate the io units processed so far to offset the next iov into the buffer. */
1292 		io_unit_counts += merged_io_desc[i].num_io_units;
1293 	}
1294 }
1295 
1296 static void
1297 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1298 			uint32_t compressed_size)
1299 {
1300 	struct spdk_reduce_vol *vol = req->vol;
1301 	uint32_t i;
1302 	uint64_t chunk_offset, remainder, total_len = 0;
1303 	uint8_t *buf;
1304 	int j;
1305 
1306 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
1307 
1308 	/* TODO: fail if no chunk map found - but really this should not happen if we
1309 	 * size the number of requests similarly to number of extra chunk maps
1310 	 */
1311 	assert(req->chunk_map_index != UINT32_MAX);
1312 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1313 
1314 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1315 	req->num_io_units = spdk_divide_round_up(compressed_size,
1316 			    vol->params.backing_io_unit_size);
1317 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1318 	req->chunk->compressed_size =
1319 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1320 
1321 	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
1322 	if (req->chunk_is_compressed == false) {
1323 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1324 		buf = req->decomp_buf;
1325 		total_len = chunk_offset * vol->params.logical_block_size;
1326 
1327 		/* zero any offset into chunk */
1328 		if (req->rmw == false && chunk_offset) {
1329 			memset(buf, 0, total_len);
1330 		}
1331 		buf += total_len;
1332 
1333 		/* copy the data */
1334 		for (j = 0; j < req->iovcnt; j++) {
1335 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1336 			buf += req->iov[j].iov_len;
1337 			total_len += req->iov[j].iov_len;
1338 		}
1339 
1340 		/* zero any remainder */
1341 		remainder = vol->params.chunk_size - total_len;
1342 		total_len += remainder;
1343 		if (req->rmw == false && remainder) {
1344 			memset(buf, 0, remainder);
1345 		}
1346 		assert(total_len == vol->params.chunk_size);
1347 	}
1348 
1349 	for (i = 0; i < req->num_io_units; i++) {
1350 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1351 		/* TODO: fail if no backing block found - but really this should also not
1352 		 * happen (see comment above).
1353 		 */
1354 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1355 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1356 	}
1357 
1358 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1359 }
1360 
1361 static void
1362 _write_compress_done(void *_req, int reduce_errno)
1363 {
1364 	struct spdk_reduce_vol_request *req = _req;
1365 
1366 	/* Negative reduce_errno indicates that the compression operation failed.
1367 	 * Write the uncompressed data instead.  Force this to happen by passing
1368 	 * the full chunk size to _reduce_vol_write_chunk.  When it sees the data
1369 	 * couldn't be compressed, it will write the uncompressed buffer to disk
1370 	 * as-is.
1371 	 */
1372 	if (reduce_errno < 0) {
1373 		req->backing_cb_args.output_size = req->vol->params.chunk_size;
1374 	}
1375 
1376 	_reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
1377 }
1378 
1379 static void
1380 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1381 {
1382 	struct spdk_reduce_vol *vol = req->vol;
1383 
1384 	req->backing_cb_args.cb_fn = next_fn;
1385 	req->backing_cb_args.cb_arg = req;
1386 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1387 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1388 	vol->backing_dev->compress(vol->backing_dev,
1389 				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
1390 				   &req->backing_cb_args);
1391 }
1392 
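/* Decompress a chunk into the request's uncompressed scratch buffer instead of the
 * caller's iovs - used by the read phase of a read-modify-write.
 */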
1393 static void
1394 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1395 {
1396 	struct spdk_reduce_vol *vol = req->vol;
1397 
1398 	req->backing_cb_args.cb_fn = next_fn;
1399 	req->backing_cb_args.cb_arg = req;
1400 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1401 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1402 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1403 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1404 	vol->backing_dev->decompress(vol->backing_dev,
1405 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1406 				     &req->backing_cb_args);
1407 }
1408 
1409 static void
1410 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1411 {
1412 	struct spdk_reduce_vol *vol = req->vol;
1413 	uint64_t chunk_offset, remainder = 0;
1414 	uint64_t ttl_len = 0;
1415 	size_t iov_len;
1416 	int i;
1417 
1418 	req->decomp_iovcnt = 0;
1419 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1420 
1421 	/* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer
1422 	 * if at least one of the conditions below is true:
1423 	 * 1. User's buffer is fragmented
1424 	 * 2. Length of the user's buffer is less than the chunk
1425 	 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */
1426 	iov_len = req->iov[0].iov_len;
1427 	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
1428 				     req->iov[0].iov_len < vol->params.chunk_size ||
1429 				     _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
1430 	if (req->copy_after_decompress) {
1431 		req->decomp_iov[0].iov_base = req->decomp_buf;
1432 		req->decomp_iov[0].iov_len = vol->params.chunk_size;
1433 		req->decomp_iovcnt = 1;
1434 		goto decompress;
1435 	}
1436 
1437 	if (chunk_offset) {
1438 		/* first iov point to our scratch buffer for any offset into the chunk */
1439 		req->decomp_iov[0].iov_base = req->decomp_buf;
1440 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1441 		ttl_len += req->decomp_iov[0].iov_len;
1442 		req->decomp_iovcnt = 1;
1443 	}
1444 
1445 	/* now the user data iov, direct to the user buffer */
1446 	for (i = 0; i < req->iovcnt; i++) {
1447 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1448 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1449 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1450 	}
1451 	req->decomp_iovcnt += req->iovcnt;
1452 
1453 	/* send the rest of the chunk to our scratch buffer */
1454 	remainder = vol->params.chunk_size - ttl_len;
1455 	if (remainder) {
1456 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1457 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1458 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1459 		req->decomp_iovcnt++;
1460 	}
1461 	assert(ttl_len == vol->params.chunk_size);
1462 
1463 decompress:
1464 	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
1465 	req->backing_cb_args.cb_fn = next_fn;
1466 	req->backing_cb_args.cb_arg = req;
1467 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1468 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1469 	vol->backing_dev->decompress(vol->backing_dev,
1470 				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
1471 				     &req->backing_cb_args);
1472 }
1473 
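/* Build a single decomp_iov covering the scratch buffer and copy the user's data
 * (plus any leading/trailing padding) into it.  Used when the backing device cannot
 * take SGL input directly from the user's buffers.
 */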
1474 static inline void
1475 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
1476 {
1477 	struct spdk_reduce_vol *vol = req->vol;
1478 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1479 	uint64_t chunk_offset, ttl_len = 0;
1480 	uint64_t remainder = 0;
1481 	char *copy_offset = NULL;
1482 	uint32_t lbsize = vol->params.logical_block_size;
1483 	int i;
1484 
1485 	req->decomp_iov[0].iov_base = req->decomp_buf;
1486 	req->decomp_iov[0].iov_len = vol->params.chunk_size;
1487 	req->decomp_iovcnt = 1;
1488 	copy_offset = req->decomp_iov[0].iov_base;
1489 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1490 
1491 	if (chunk_offset) {
1492 		ttl_len += chunk_offset * lbsize;
1493 		/* copy_offset already points to padding buffer if zero_paddings=false */
1494 		if (zero_paddings) {
1495 			memcpy(copy_offset, padding_buffer, ttl_len);
1496 		}
1497 		copy_offset += ttl_len;
1498 	}
1499 
1500 	/* now the user data iov, direct from the user buffer */
1501 	for (i = 0; i < req->iovcnt; i++) {
1502 		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
1503 		copy_offset += req->iov[i].iov_len;
1504 		ttl_len += req->iov[i].iov_len;
1505 	}
1506 
1507 	remainder = vol->params.chunk_size - ttl_len;
1508 	if (remainder) {
1509 		/* copy_offset already points to padding buffer if zero_paddings=false */
1510 		if (zero_paddings) {
1511 			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
1512 		}
1513 		ttl_len += remainder;
1514 	}
1515 
1516 	assert(ttl_len == req->vol->params.chunk_size);
1517 }
1518 
1519 /* This function can be called when we are compressing new data or performing a read-modify-write.
1520  * In the first case any padding should be filled with zeroes; in the second case the padding
1521  * should point to the already read and decompressed buffer. */
1522 static inline void
1523 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1524 {
1525 	struct spdk_reduce_vol *vol = req->vol;
1526 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1527 	uint64_t chunk_offset, ttl_len = 0;
1528 	uint64_t remainder = 0;
1529 	uint32_t lbsize = vol->params.logical_block_size;
1530 	size_t iov_len;
1531 	int i;
1532 
1533 	/* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf
1534 	 * if at least one of the conditions below is true:
1535 	 * 1. User's buffer is fragmented
1536 	 * 2. Length of the user's buffer is less than the chunk
1537 	 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */
1538 	iov_len = req->iov[0].iov_len;
1539 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1540 					  req->iov[0].iov_len < vol->params.chunk_size ||
1541 					  _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
1542 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1543 		return;
1544 	}
1545 
1546 	req->decomp_iovcnt = 0;
1547 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1548 
1549 	if (chunk_offset != 0) {
1550 		ttl_len += chunk_offset * lbsize;
1551 		req->decomp_iov[0].iov_base = padding_buffer;
1552 		req->decomp_iov[0].iov_len = ttl_len;
1553 		req->decomp_iovcnt = 1;
1554 	}
1555 
1556 	/* now the user data iov, direct from the user buffer */
1557 	for (i = 0; i < req->iovcnt; i++) {
1558 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1559 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1560 		ttl_len += req->iov[i].iov_len;
1561 	}
1562 	req->decomp_iovcnt += req->iovcnt;
1563 
1564 	remainder = vol->params.chunk_size - ttl_len;
1565 	if (remainder) {
1566 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1567 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1568 		req->decomp_iovcnt++;
1569 		ttl_len += remainder;
1570 	}
1571 	assert(ttl_len == req->vol->params.chunk_size);
1572 }
1573 
1574 static void
1575 _write_decompress_done(void *_req, int reduce_errno)
1576 {
1577 	struct spdk_reduce_vol_request *req = _req;
1578 
1579 	/* Negative reduce_errno indicates failure for the decompression operation. */
1580 	if (reduce_errno < 0) {
1581 		_reduce_vol_complete_req(req, reduce_errno);
1582 		return;
1583 	}
1584 
1585 	/* A non-negative reduce_errno means decompression succeeded and that
1586 	 * backing_cb_args.output_size holds the decompressed length.
1587 	 */
1588 	if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
1589 		_reduce_vol_complete_req(req, -EIO);
1590 		return;
1591 	}
1592 
1593 	_prepare_compress_chunk(req, false);
1594 	_reduce_vol_compress_chunk(req, _write_compress_done);
1595 }
1596 
1597 static void
1598 _write_read_done(void *_req, int reduce_errno)
1599 {
1600 	struct spdk_reduce_vol_request *req = _req;
1601 
1602 	if (reduce_errno != 0) {
1603 		req->reduce_errno = reduce_errno;
1604 	}
1605 
1606 	assert(req->num_backing_ops > 0);
1607 	if (--req->num_backing_ops > 0) {
1608 		return;
1609 	}
1610 
1611 	if (req->reduce_errno != 0) {
1612 		_reduce_vol_complete_req(req, req->reduce_errno);
1613 		return;
1614 	}
1615 
1616 	if (req->chunk_is_compressed) {
1617 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1618 	} else {
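     		/* The chunk was stored uncompressed, so its data is already in decomp_buf; report the
     		 *  stored size as the decompressed size and continue on the common completion path. */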
1619 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1620 
1621 		_write_decompress_done(req, 0);
1622 	}
1623 }
1624 
1625 static void
1626 _read_decompress_done(void *_req, int reduce_errno)
1627 {
1628 	struct spdk_reduce_vol_request *req = _req;
1629 	struct spdk_reduce_vol *vol = req->vol;
1630 
1631 	/* Negative reduce_errno indicates that the decompression operation failed. */
1632 	if (reduce_errno < 0) {
1633 		_reduce_vol_complete_req(req, reduce_errno);
1634 		return;
1635 	}
1636 
1637 	/* A non-negative reduce_errno indicates success; the output_size field in backing_cb_args
1638 	 *  holds the size of the decompressed data.
1639 	 */
1640 	if (req->backing_cb_args.output_size != vol->params.chunk_size) {
1641 		_reduce_vol_complete_req(req, -EIO);
1642 		return;
1643 	}
1644 
1645 	if (req->copy_after_decompress) {
1646 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1647 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1648 		int i;
1649 
1650 		for (i = 0; i < req->iovcnt; i++) {
1651 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1652 			decomp_buffer += req->iov[i].iov_len;
1653 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1654 		}
1655 	}
1656 
1657 	_reduce_vol_complete_req(req, 0);
1658 }
1659 
1660 static void
1661 _read_read_done(void *_req, int reduce_errno)
1662 {
1663 	struct spdk_reduce_vol_request *req = _req;
1664 	uint64_t chunk_offset;
1665 	uint8_t *buf;
1666 	int i;
1667 
1668 	if (reduce_errno != 0) {
1669 		req->reduce_errno = reduce_errno;
1670 	}
1671 
1672 	assert(req->num_backing_ops > 0);
1673 	if (--req->num_backing_ops > 0) {
1674 		return;
1675 	}
1676 
1677 	if (req->reduce_errno != 0) {
1678 		_reduce_vol_complete_req(req, req->reduce_errno);
1679 		return;
1680 	}
1681 
1682 	if (req->chunk_is_compressed) {
1683 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1684 	} else {
1685 
1686 		/* The chunk was stored uncompressed, so the backing device read it straight into decomp_buf
1687 		 *  rather than into the host buffers; copy it into the host buffers here.  For compressed
1688 		 *  chunks, the decompression operation delivers the data to the host buffers instead. */
1689 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1690 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1691 		for (i = 0; i < req->iovcnt; i++) {
1692 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1693 			buf += req->iov[i].iov_len;
1694 		}
1695 
1696 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1697 
1698 		_read_decompress_done(req, 0);
1699 	}
1700 }
1701 
1702 static void
1703 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1704 {
1705 	struct spdk_reduce_vol *vol = req->vol;
1706 
1707 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1708 	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
1709 
1710 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1711 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1712 			    vol->params.backing_io_unit_size);
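     	/* A chunk that consumes all of its backing io units was stored uncompressed; anything
     	 *  smaller was stored compressed and will need a decompression step. */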
1713 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1714 
1715 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1716 }
1717 
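     /* Validate that the iovec array does not exceed REDUCE_MAX_IOVECS entries and that its total
      *  size in bytes exactly matches 'length' logical blocks. */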
1718 static bool
1719 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1720 		    uint64_t length)
1721 {
1722 	uint64_t size = 0;
1723 	int i;
1724 
1725 	if (iovcnt > REDUCE_MAX_IOVECS) {
1726 		return false;
1727 	}
1728 
1729 	for (i = 0; i < iovcnt; i++) {
1730 		size += iov[i].iov_len;
1731 	}
1732 
1733 	return size == (length * vol->params.logical_block_size);
1734 }
1735 
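     /* Return true if another in-flight request targets the same logical map index.  Requests to
      *  the same chunk must be serialized, since each may rewrite the entire chunk. */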
1736 static bool
1737 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1738 {
1739 	struct spdk_reduce_vol_request *req;
1740 
1741 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1742 		if (logical_map_index == req->logical_map_index) {
1743 			return true;
1744 		}
1745 	}
1746 
1747 	return false;
1748 }
1749 
1750 static void
1751 _start_readv_request(struct spdk_reduce_vol_request *req)
1752 {
1753 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1754 	_reduce_vol_read_chunk(req, _read_read_done);
1755 }
1756 
1757 void
1758 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1759 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1760 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1761 {
1762 	struct spdk_reduce_vol_request *req;
1763 	uint64_t logical_map_index;
1764 	bool overlapped;
1765 	int i;
1766 
1767 	if (length == 0) {
1768 		cb_fn(cb_arg, 0);
1769 		return;
1770 	}
1771 
1772 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1773 		cb_fn(cb_arg, -EINVAL);
1774 		return;
1775 	}
1776 
1777 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1778 		cb_fn(cb_arg, -EINVAL);
1779 		return;
1780 	}
1781 
1782 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1783 	overlapped = _check_overlap(vol, logical_map_index);
1784 
1785 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1786 		/*
1787 		 * This chunk hasn't been allocated.  So treat the data as all
1788 		 * zeroes for this chunk - do the memset and immediately complete
1789 		 * the operation.
1790 		 */
1791 		for (i = 0; i < iovcnt; i++) {
1792 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1793 		}
1794 		cb_fn(cb_arg, 0);
1795 		return;
1796 	}
1797 
1798 	req = TAILQ_FIRST(&vol->free_requests);
1799 	if (req == NULL) {
1800 		cb_fn(cb_arg, -ENOMEM);
1801 		return;
1802 	}
1803 
1804 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1805 	req->type = REDUCE_IO_READV;
1806 	req->vol = vol;
1807 	req->iov = iov;
1808 	req->iovcnt = iovcnt;
1809 	req->offset = offset;
1810 	req->logical_map_index = logical_map_index;
1811 	req->length = length;
1812 	req->copy_after_decompress = false;
1813 	req->cb_fn = cb_fn;
1814 	req->cb_arg = cb_arg;
1815 
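     	/* If another request to the same chunk is already in flight, queue this one; it is started
     	 *  later, after the conflicting request completes. */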
1816 	if (!overlapped) {
1817 		_start_readv_request(req);
1818 	} else {
1819 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1820 	}
1821 }
1822 
1823 static void
1824 _start_writev_request(struct spdk_reduce_vol_request *req)
1825 {
1826 	struct spdk_reduce_vol *vol = req->vol;
1827 
1828 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1829 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1830 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1831 			/* Read old chunk, then overwrite with data from this write
1832 			 *  operation.
1833 			 */
1834 			req->rmw = true;
1835 			_reduce_vol_read_chunk(req, _write_read_done);
1836 			return;
1837 		}
1838 	}
1839 
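     	/* Full-chunk write, or a write to a chunk that has not been allocated yet: no read is
     	 *  needed and any padding is filled with zeroes. */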
1840 	req->rmw = false;
1841 
1842 	_prepare_compress_chunk(req, true);
1843 	_reduce_vol_compress_chunk(req, _write_compress_done);
1844 }
1845 
1846 void
1847 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1848 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1849 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1850 {
1851 	struct spdk_reduce_vol_request *req;
1852 	uint64_t logical_map_index;
1853 	bool overlapped;
1854 
1855 	if (length == 0) {
1856 		cb_fn(cb_arg, 0);
1857 		return;
1858 	}
1859 
1860 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1861 		cb_fn(cb_arg, -EINVAL);
1862 		return;
1863 	}
1864 
1865 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1866 		cb_fn(cb_arg, -EINVAL);
1867 		return;
1868 	}
1869 
1870 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1871 	overlapped = _check_overlap(vol, logical_map_index);
1872 
1873 	req = TAILQ_FIRST(&vol->free_requests);
1874 	if (req == NULL) {
1875 		cb_fn(cb_arg, -ENOMEM);
1876 		return;
1877 	}
1878 
1879 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1880 	req->type = REDUCE_IO_WRITEV;
1881 	req->vol = vol;
1882 	req->iov = iov;
1883 	req->iovcnt = iovcnt;
1884 	req->offset = offset;
1885 	req->logical_map_index = logical_map_index;
1886 	req->length = length;
1887 	req->copy_after_decompress = false;
1888 	req->cb_fn = cb_fn;
1889 	req->cb_arg = cb_arg;
1890 
1891 	if (!overlapped) {
1892 		_start_writev_request(req);
1893 	} else {
1894 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1895 	}
1896 }
1897 
1898 const struct spdk_reduce_vol_params *
1899 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1900 {
1901 	return &vol->params;
1902 }
1903 
1904 const char *
1905 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
1906 {
1907 	return vol->pm_file.path;
1908 }
1909 
1910 void
1911 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1912 {
1913 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1914 	uint32_t struct_size;
1915 	uint64_t chunk_map_size;
1916 
1917 	SPDK_NOTICELOG("vol info:\n");
1918 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1919 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1920 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1921 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1922 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1923 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1924 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1925 		       vol->params.vol_size / vol->params.chunk_size);
1926 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1927 			vol->params.backing_io_unit_size);
1928 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1929 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1930 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1931 
1932 	SPDK_NOTICELOG("pmem info:\n");
1933 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1934 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1935 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1936 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1937 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1938 			   vol->params.chunk_size);
1939 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1940 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1941 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1942 			 vol->params.backing_io_unit_size);
1943 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1944 }
1945 
1946 SPDK_LOG_REGISTER_COMPONENT(reduce)
1947