xref: /spdk/lib/reduce/reduce.c (revision 7506a7aa53d239f533af3bc768f0d2af55e735fe)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/reduce.h"
38 #include "spdk/env.h"
39 #include "spdk/string.h"
40 #include "spdk/bit_array.h"
41 #include "spdk/util.h"
42 #include "spdk/log.h"
43 #include "spdk/memory.h"
44 
45 #include "libpmem.h"
46 
47 /* Always round up the size of the PM region to the nearest cacheline. */
48 #define REDUCE_PM_SIZE_ALIGNMENT	64
49 
50 /* Offset into the backing device where the persistent memory file's path is stored. */
51 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
52 
53 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
54 
55 #define REDUCE_NUM_VOL_REQUESTS	256
56 
57 /* Structure written to offset 0 of both the pm file and the backing device. */
58 struct spdk_reduce_vol_superblock {
59 	uint8_t				signature[8];
60 	struct spdk_reduce_vol_params	params;
61 	uint8_t				reserved[4048];
62 };
63 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
64 
65 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
66 /* Subtract 1 because the null terminator is not part of the on-disk signature. */
67 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
68 		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
69 
70 #define REDUCE_PATH_MAX 4096
71 
72 #define REDUCE_ZERO_BUF_SIZE 0x100000
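
/*
 * Illustrative note (not part of the original source): per the definitions above, the
 * metadata region at the start of the backing device is laid out roughly as:
 *
 *   offset 0x0000: struct spdk_reduce_vol_superblock (4096 bytes, "SPDKREDU" signature)
 *   offset 0x1000: persistent memory file path (REDUCE_PATH_MAX = 4096 bytes)
 *   offset 0x2000: start of compressed chunk data (backing io units)
 *
 * The same superblock is also written at offset 0 of the pm file, followed there by
 * the logical map and the chunk maps (see _initialize_vol_pm_pointers() below).
 */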
73 
74 /**
75  * Describes a persistent memory file used to hold metadata associated with a
76  *  compressed volume.
77  */
78 struct spdk_reduce_pm_file {
79 	char			path[REDUCE_PATH_MAX];
80 	void			*pm_buf;
81 	int			pm_is_pmem;
82 	uint64_t		size;
83 };
84 
85 #define REDUCE_IO_READV		1
86 #define REDUCE_IO_WRITEV	2
87 
88 struct spdk_reduce_chunk_map {
89 	uint32_t		compressed_size;
90 	uint32_t		reserved;
91 	uint64_t		io_unit_index[0];
92 };
93 
94 struct spdk_reduce_vol_request {
95 	/**
96 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
97 	 *   1) source buffer for compression operations
98 	 *   2) destination buffer for decompression operations
99 	 *   3) data buffer when writing uncompressed chunk to disk
100 	 *   4) data buffer when reading uncompressed chunk from disk
101 	 */
102 	uint8_t					*decomp_buf;
103 	struct iovec				*decomp_buf_iov;
104 
105 	/**
106 	 * These are used to construct the iovecs that are sent to
107 	 *  the decomp engine; they point to a mix of the scratch buffer
108 	 *  and the user buffer.
109 	 */
110 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
111 	int					decomp_iovcnt;
112 
113 	/**
114 	 *  Scratch buffer used for compressed chunk.  This is used for:
115 	 *   1) destination buffer for compression operations
116 	 *   2) source buffer for decompression operations
117 	 *   3) data buffer when writing compressed chunk to disk
118 	 *   4) data buffer when reading compressed chunk from disk
119 	 */
120 	uint8_t					*comp_buf;
121 	struct iovec				*comp_buf_iov;
122 	struct iovec				*iov;
123 	bool					rmw;
124 	struct spdk_reduce_vol			*vol;
125 	int					type;
126 	int					reduce_errno;
127 	int					iovcnt;
128 	int					num_backing_ops;
129 	uint32_t				num_io_units;
130 	bool					chunk_is_compressed;
131 	bool					copy_after_decompress;
132 	uint64_t				offset;
133 	uint64_t				logical_map_index;
134 	uint64_t				length;
135 	uint64_t				chunk_map_index;
136 	struct spdk_reduce_chunk_map		*chunk;
137 	spdk_reduce_vol_op_complete		cb_fn;
138 	void					*cb_arg;
139 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
140 	struct spdk_reduce_vol_cb_args		backing_cb_args;
141 };
142 
143 struct spdk_reduce_vol {
144 	struct spdk_reduce_vol_params		params;
145 	uint32_t				backing_io_units_per_chunk;
146 	uint32_t				backing_lba_per_io_unit;
147 	uint32_t				logical_blocks_per_chunk;
148 	struct spdk_reduce_pm_file		pm_file;
149 	struct spdk_reduce_backing_dev		*backing_dev;
150 	struct spdk_reduce_vol_superblock	*backing_super;
151 	struct spdk_reduce_vol_superblock	*pm_super;
152 	uint64_t				*pm_logical_map;
153 	uint64_t				*pm_chunk_maps;
154 
155 	struct spdk_bit_array			*allocated_chunk_maps;
156 	struct spdk_bit_array			*allocated_backing_io_units;
157 
158 	struct spdk_reduce_vol_request		*request_mem;
159 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
160 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
161 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
162 
163 	/* Single contiguous buffer used for all request buffers for this volume. */
164 	uint8_t					*buf_mem;
165 	struct iovec				*buf_iov_mem;
166 };
167 
168 static void _start_readv_request(struct spdk_reduce_vol_request *req);
169 static void _start_writev_request(struct spdk_reduce_vol_request *req);
170 static uint8_t *g_zero_buf;
171 static int g_vol_count = 0;
172 
173 /*
174  * Allocate extra metadata chunks and corresponding backing io units to account for
175  *  outstanding IO in the worst case scenario where the logical map is completely
176  *  allocated and no data can be compressed.  We need extra chunks in this case to
177  *  handle in-flight writes, since reduce never writes data in place.
178  */
179 #define REDUCE_NUM_EXTRA_CHUNKS 128
180 
181 static void
182 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
183 {
184 	if (vol->pm_file.pm_is_pmem) {
185 		pmem_persist(addr, len);
186 	} else {
187 		pmem_msync(addr, len);
188 	}
189 }
190 
191 static uint64_t
192 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
193 {
194 	uint64_t chunks_in_logical_map, logical_map_size;
195 
196 	chunks_in_logical_map = vol_size / chunk_size;
197 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
198 
199 	/* Round up to next cacheline. */
200 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
201 	       REDUCE_PM_SIZE_ALIGNMENT;
202 }
203 
204 static uint64_t
205 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
206 {
207 	uint64_t num_chunks;
208 
209 	num_chunks = vol_size / chunk_size;
210 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
211 
212 	return num_chunks;
213 }
214 
215 static inline uint32_t
216 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
217 {
218 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
219 }
220 
221 static uint64_t
222 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
223 {
224 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
225 
226 	num_chunks = _get_total_chunks(vol_size, chunk_size);
227 	io_units_per_chunk = chunk_size / backing_io_unit_size;
228 
229 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
230 
231 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
232 	       REDUCE_PM_SIZE_ALIGNMENT;
233 }
234 
235 static struct spdk_reduce_chunk_map *
236 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
237 {
238 	uintptr_t chunk_map_addr;
239 
240 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
241 
242 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
243 	chunk_map_addr += chunk_map_index *
244 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
245 
246 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
247 }
248 
249 static int
250 _validate_vol_params(struct spdk_reduce_vol_params *params)
251 {
252 	if (params->vol_size > 0) {
253 		/**
254 		 * User does not pass in the vol size - it gets calculated by libreduce from
255 		 *  values in this structure plus the size of the backing device.
256 		 */
257 		return -EINVAL;
258 	}
259 
260 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
261 	    params->logical_block_size == 0) {
262 		return -EINVAL;
263 	}
264 
265 	/* Chunk size must be an even multiple of the backing io unit size. */
266 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
267 		return -EINVAL;
268 	}
269 
270 	/* Chunk size must be an even multiple of the logical block size. */
271 	if ((params->chunk_size % params->logical_block_size) != 0) {
272 		return -EINVAL;
273 	}
274 
275 	return 0;
276 }
277 
278 static uint64_t
279 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
280 {
281 	uint64_t num_chunks;
282 
283 	num_chunks = backing_dev_size / chunk_size;
284 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
285 		return 0;
286 	}
287 
288 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
289 	return num_chunks * chunk_size;
290 }
291 
292 static uint64_t
293 _get_pm_file_size(struct spdk_reduce_vol_params *params)
294 {
295 	uint64_t total_pm_size;
296 
297 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
298 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
299 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
300 			 params->backing_io_unit_size);
301 	return total_pm_size;
302 }
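
/*
 * Worked example (illustrative only, values chosen for this comment): for a 1 GiB
 * volume with chunk_size = 16 KiB and backing_io_unit_size = 4 KiB:
 *
 *   superblock:   4096 bytes
 *   logical map:  (1 GiB / 16 KiB) * 8 bytes              = 524288 bytes (already 64-byte aligned)
 *   chunk maps:   (65536 + 128 extra) * (8 + 4 * 8) bytes = 2626560 bytes (already 64-byte aligned)
 *
 * giving a pm file size of 4096 + 524288 + 2626560 = 3154944 bytes (~3 MiB).
 */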
303 
304 const struct spdk_uuid *
305 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
306 {
307 	return &vol->params.uuid;
308 }
309 
310 static void
311 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
312 {
313 	uint64_t logical_map_size;
314 
315 	/* Superblock is at the beginning of the pm file. */
316 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
317 
318 	/* Logical map immediately follows the super block. */
319 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
320 
321 	/* Chunk maps follow the logical map. */
322 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
323 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
324 }
325 
326 /* We need 2 iovs during load - one for the superblock, another for the path */
327 #define LOAD_IOV_COUNT	2
328 
329 struct reduce_init_load_ctx {
330 	struct spdk_reduce_vol			*vol;
331 	struct spdk_reduce_vol_cb_args		backing_cb_args;
332 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
333 	void					*cb_arg;
334 	struct iovec				iov[LOAD_IOV_COUNT];
335 	void					*path;
336 };
337 
338 static inline bool
339 _addr_crosses_huge_page(const void *addr, size_t *size)
340 {
341 	size_t _size;
342 	uint64_t rc;
343 
344 	assert(size);
345 
346 	_size = *size;
347 	rc = spdk_vtophys(addr, size);
348 
349 	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
350 }
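
/*
 * Illustrative note (assuming the default 2 MiB hugepages): spdk_vtophys() trims *size
 * down to the number of bytes that are physically contiguous starting at addr.  So for
 * a buffer that starts 1 KiB before a hugepage boundary, a requested size of 4 KiB
 * would typically come back as 1 KiB (neighbouring hugepages are generally not
 * physically contiguous) and the helper above returns true; a buffer fully contained
 * in one page keeps its size and the helper returns false.
 */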
351 
352 static inline int
353 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
354 {
355 	uint8_t *addr;
356 	size_t size_tmp = buffer_size;
357 
358 	addr = *_addr;
359 
360 	/* Verify that addr + buffer_size doesn't cross huge page boundary */
361 	if (_addr_crosses_huge_page(addr, &size_tmp)) {
362 		/* Memory start is aligned on 2MiB, so a buffer that crosses a boundary must sit at the
363 		 * end of the page.  Skip the remaining bytes and continue from the beginning of the next page. */
364 		addr += size_tmp;
365 	}
366 
367 	if (addr + buffer_size > addr_range) {
368 		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
369 		return -ERANGE;
370 	}
371 
372 	*vol_buffer = addr;
373 	*_addr = addr + buffer_size;
374 
375 	return 0;
376 }
377 
378 static int
379 _allocate_vol_requests(struct spdk_reduce_vol *vol)
380 {
381 	struct spdk_reduce_vol_request *req;
382 	uint32_t reqs_in_2mb_page, huge_pages_needed;
383 	uint8_t *buffer, *buffer_end;
384 	int i = 0;
385 	int rc = 0;
386 
387 	/* Comp and decomp buffers must be allocated so that they do not cross a physical
388 	 * page boundary.  Assume the system uses the default 2MiB hugepages and that
389 	 * chunk_size is not necessarily a power of 2.
390 	 * Allocate 2x since each request needs buffers for both read/write and
391 	 * compress/decompress intermediate data. */
392 	reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
393 	if (!reqs_in_2mb_page) {
394 		return -EINVAL;
395 	}
396 	huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
397 
398 	vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
399 	if (vol->buf_mem == NULL) {
400 		return -ENOMEM;
401 	}
402 
403 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
404 	if (vol->request_mem == NULL) {
405 		spdk_free(vol->buf_mem);
406 		vol->buf_mem = NULL;
407 		return -ENOMEM;
408 	}
409 
410 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
411 	 *  buffers.
412 	 */
413 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
414 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
415 	if (vol->buf_iov_mem == NULL) {
416 		free(vol->request_mem);
417 		spdk_free(vol->buf_mem);
418 		vol->request_mem = NULL;
419 		vol->buf_mem = NULL;
420 		return -ENOMEM;
421 	}
422 
423 	buffer = vol->buf_mem;
424 	buffer_end = buffer + VALUE_2MB * huge_pages_needed;
425 
426 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
427 		req = &vol->request_mem[i];
428 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
429 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
430 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
431 
432 		rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
433 		if (rc) {
434 			SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
435 				    vol->buf_mem, buffer_end);
436 			break;
437 		}
438 		rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
439 		if (rc) {
440 			SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
441 				    vol->buf_mem, buffer_end);
442 			break;
443 		}
444 	}
445 
446 	if (rc) {
447 		free(vol->buf_iov_mem);
448 		free(vol->request_mem);
449 		spdk_free(vol->buf_mem);
450 		vol->buf_mem = NULL;
451 		vol->buf_iov_mem = NULL;
452 		vol->request_mem = NULL;
453 	}
454 
455 	return rc;
456 }
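
/*
 * Worked example (illustrative only): with chunk_size = 16 KiB, each request needs
 * 2 * 16 KiB = 32 KiB of buffer space (one comp and one decomp buffer), so
 * reqs_in_2mb_page = 2 MiB / 32 KiB = 64 and huge_pages_needed =
 * SPDK_CEIL_DIV(256, 64) = 4, i.e. 8 MiB of hugepage memory per volume.
 */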
457 
458 static void
459 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
460 {
461 	if (ctx != NULL) {
462 		spdk_free(ctx->path);
463 		free(ctx);
464 	}
465 
466 	if (vol != NULL) {
467 		if (vol->pm_file.pm_buf != NULL) {
468 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
469 		}
470 
471 		spdk_free(vol->backing_super);
472 		spdk_bit_array_free(&vol->allocated_chunk_maps);
473 		spdk_bit_array_free(&vol->allocated_backing_io_units);
474 		free(vol->request_mem);
475 		free(vol->buf_iov_mem);
476 		spdk_free(vol->buf_mem);
477 		free(vol);
478 	}
479 }
480 
481 static int
482 _alloc_zero_buff(void)
483 {
484 	int rc = 0;
485 
486 	/* The zero buffer is shared between all volumes and only used
487 	 * for reads, so allocate one global instance here if it was not
488 	 * already allocated when another volume was initialized or loaded.
489 	 */
490 	if (g_vol_count++ == 0) {
491 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
492 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
493 					  SPDK_MALLOC_DMA);
494 		if (g_zero_buf == NULL) {
495 			rc = -ENOMEM;
496 		}
497 	}
498 	return rc;
499 }
500 
501 static void
502 _init_write_super_cpl(void *cb_arg, int reduce_errno)
503 {
504 	struct reduce_init_load_ctx *init_ctx = cb_arg;
505 	int rc;
506 
507 	rc = _allocate_vol_requests(init_ctx->vol);
508 	if (rc != 0) {
509 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
510 		_init_load_cleanup(init_ctx->vol, init_ctx);
511 		return;
512 	}
513 
514 	rc = _alloc_zero_buff();
515 	if (rc != 0) {
516 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
517 		_init_load_cleanup(init_ctx->vol, init_ctx);
518 		return;
519 	}
520 
521 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
522 	/* Only clean up the ctx - the vol has been passed to the application
523 	 *  for use now that initialization was successful.
524 	 */
525 	_init_load_cleanup(NULL, init_ctx);
526 }
527 
528 static void
529 _init_write_path_cpl(void *cb_arg, int reduce_errno)
530 {
531 	struct reduce_init_load_ctx *init_ctx = cb_arg;
532 	struct spdk_reduce_vol *vol = init_ctx->vol;
533 
534 	init_ctx->iov[0].iov_base = vol->backing_super;
535 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
536 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
537 	init_ctx->backing_cb_args.cb_arg = init_ctx;
538 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
539 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
540 				 &init_ctx->backing_cb_args);
541 }
542 
543 static int
544 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
545 {
546 	uint64_t total_chunks, total_backing_io_units;
547 	uint32_t i, num_metadata_io_units;
548 
549 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
550 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
551 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
552 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
553 
554 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
555 		return -ENOMEM;
556 	}
557 
558 	/* Set backing io unit bits associated with metadata. */
559 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
560 				vol->backing_dev->blocklen;
561 	for (i = 0; i < num_metadata_io_units; i++) {
562 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
563 	}
564 
565 	return 0;
566 }
567 
568 void
569 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
570 		     struct spdk_reduce_backing_dev *backing_dev,
571 		     const char *pm_file_dir,
572 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
573 {
574 	struct spdk_reduce_vol *vol;
575 	struct reduce_init_load_ctx *init_ctx;
576 	uint64_t backing_dev_size;
577 	size_t mapped_len;
578 	int dir_len, max_dir_len, rc;
579 
580 	/* We need to append a path separator and the UUID to the supplied
581 	 * path.
582 	 */
583 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
584 	dir_len = strnlen(pm_file_dir, max_dir_len);
585 	/* Strip trailing slash if the user provided one - we will add it back
586 	 * later when appending the filename.
587 	 */
588 	if (pm_file_dir[dir_len - 1] == '/') {
589 		dir_len--;
590 	}
591 	if (dir_len == max_dir_len) {
592 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
593 		cb_fn(cb_arg, NULL, -EINVAL);
594 		return;
595 	}
596 
597 	rc = _validate_vol_params(params);
598 	if (rc != 0) {
599 		SPDK_ERRLOG("invalid vol params\n");
600 		cb_fn(cb_arg, NULL, rc);
601 		return;
602 	}
603 
604 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
605 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
606 	if (params->vol_size == 0) {
607 		SPDK_ERRLOG("backing device is too small\n");
608 		cb_fn(cb_arg, NULL, -EINVAL);
609 		return;
610 	}
611 
612 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
613 	    backing_dev->unmap == NULL) {
614 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
615 		cb_fn(cb_arg, NULL, -EINVAL);
616 		return;
617 	}
618 
619 	vol = calloc(1, sizeof(*vol));
620 	if (vol == NULL) {
621 		cb_fn(cb_arg, NULL, -ENOMEM);
622 		return;
623 	}
624 
625 	TAILQ_INIT(&vol->free_requests);
626 	TAILQ_INIT(&vol->executing_requests);
627 	TAILQ_INIT(&vol->queued_requests);
628 
629 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
630 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
631 	if (vol->backing_super == NULL) {
632 		cb_fn(cb_arg, NULL, -ENOMEM);
633 		_init_load_cleanup(vol, NULL);
634 		return;
635 	}
636 
637 	init_ctx = calloc(1, sizeof(*init_ctx));
638 	if (init_ctx == NULL) {
639 		cb_fn(cb_arg, NULL, -ENOMEM);
640 		_init_load_cleanup(vol, NULL);
641 		return;
642 	}
643 
644 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
645 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
646 	if (init_ctx->path == NULL) {
647 		cb_fn(cb_arg, NULL, -ENOMEM);
648 		_init_load_cleanup(vol, init_ctx);
649 		return;
650 	}
651 
652 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
653 		spdk_uuid_generate(&params->uuid);
654 	}
655 
656 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
657 	vol->pm_file.path[dir_len] = '/';
658 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
659 			    &params->uuid);
660 	vol->pm_file.size = _get_pm_file_size(params);
661 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
662 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
663 					    &mapped_len, &vol->pm_file.pm_is_pmem);
664 	if (vol->pm_file.pm_buf == NULL) {
665 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
666 			    vol->pm_file.path, strerror(errno));
667 		cb_fn(cb_arg, NULL, -errno);
668 		_init_load_cleanup(vol, init_ctx);
669 		return;
670 	}
671 
672 	if (vol->pm_file.size != mapped_len) {
673 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
674 			    vol->pm_file.size, mapped_len);
675 		cb_fn(cb_arg, NULL, -ENOMEM);
676 		_init_load_cleanup(vol, init_ctx);
677 		return;
678 	}
679 
680 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
681 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
682 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
683 	memcpy(&vol->params, params, sizeof(*params));
684 
685 	vol->backing_dev = backing_dev;
686 
687 	rc = _allocate_bit_arrays(vol);
688 	if (rc != 0) {
689 		cb_fn(cb_arg, NULL, rc);
690 		_init_load_cleanup(vol, init_ctx);
691 		return;
692 	}
693 
694 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
695 	       sizeof(vol->backing_super->signature));
696 	memcpy(&vol->backing_super->params, params, sizeof(*params));
697 
698 	_initialize_vol_pm_pointers(vol);
699 
700 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
701 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
702 	 * Note that this writes 0xFF not just to the logical map but to the chunk maps as well.
703 	 */
704 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
705 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
706 
707 	init_ctx->vol = vol;
708 	init_ctx->cb_fn = cb_fn;
709 	init_ctx->cb_arg = cb_arg;
710 
711 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
712 	init_ctx->iov[0].iov_base = init_ctx->path;
713 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
714 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
715 	init_ctx->backing_cb_args.cb_arg = init_ctx;
716 	/* Write the path to offset 4K on the backing device - just after where the super
717 	 *  block will be written.  We wait until this is committed before writing the
718 	 *  super block to guarantee the super block is never written without the path
719 	 *  if the system crashes in the middle of a write operation.
720 	 */
721 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
722 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
723 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
724 				 &init_ctx->backing_cb_args);
725 }
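
/*
 * Minimal usage sketch (illustrative only, not part of the original source).  The
 * backing_dev with its readv/writev/unmap (and compress/decompress) callbacks, and
 * the my_*-prefixed names, are assumptions supplied by the caller, e.g. a compress
 * bdev module:
 *
 *	struct spdk_reduce_vol_params params = {0};
 *
 *	params.chunk_size = 16 * 1024;
 *	params.backing_io_unit_size = 4096;
 *	params.logical_block_size = 512;
 *	// params.vol_size stays 0 - libreduce computes it from the backing device size.
 *	spdk_reduce_vol_init(&params, &my_backing_dev, "/mnt/pmem0",
 *			     my_init_done_cb, my_cb_arg);
 */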
726 
727 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
728 
729 static void
730 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
731 {
732 	struct reduce_init_load_ctx *load_ctx = cb_arg;
733 	struct spdk_reduce_vol *vol = load_ctx->vol;
734 	uint64_t backing_dev_size;
735 	uint64_t i, num_chunks, logical_map_index;
736 	struct spdk_reduce_chunk_map *chunk;
737 	size_t mapped_len;
738 	uint32_t j;
739 	int rc;
740 
741 	rc = _alloc_zero_buff();
742 	if (rc) {
743 		goto error;
744 	}
745 
746 	if (memcmp(vol->backing_super->signature,
747 		   SPDK_REDUCE_SIGNATURE,
748 		   sizeof(vol->backing_super->signature)) != 0) {
749 		/* This backing device isn't a libreduce backing device. */
750 		rc = -EILSEQ;
751 		goto error;
752 	}
753 
754 	/* If the cb_fn is destroy_load_cb, it means we want to destroy this compress bdev.
755 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
756 	 *  so destroy_load_cb can delete the metadata from the block device and delete the
757 	 *  persistent memory file if it exists.
758 	 */
759 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
760 	if (load_ctx->cb_fn == (*destroy_load_cb)) {
761 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
762 		_init_load_cleanup(NULL, load_ctx);
763 		return;
764 	}
765 
766 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
767 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
768 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
769 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
770 
771 	rc = _allocate_bit_arrays(vol);
772 	if (rc != 0) {
773 		goto error;
774 	}
775 
776 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
777 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
778 		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
779 			    backing_dev_size);
780 		rc = -EILSEQ;
781 		goto error;
782 	}
783 
784 	vol->pm_file.size = _get_pm_file_size(&vol->params);
785 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
786 					    &vol->pm_file.pm_is_pmem);
787 	if (vol->pm_file.pm_buf == NULL) {
788 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
789 		rc = -errno;
790 		goto error;
791 	}
792 
793 	if (vol->pm_file.size != mapped_len) {
794 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
795 			    vol->pm_file.size, mapped_len);
796 		rc = -ENOMEM;
797 		goto error;
798 	}
799 
800 	rc = _allocate_vol_requests(vol);
801 	if (rc != 0) {
802 		goto error;
803 	}
804 
805 	_initialize_vol_pm_pointers(vol);
806 
807 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
808 	for (i = 0; i < num_chunks; i++) {
809 		logical_map_index = vol->pm_logical_map[i];
810 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
811 			continue;
812 		}
813 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
814 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
815 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
816 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
817 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
818 			}
819 		}
820 	}
821 
822 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
823 	/* Only clean up the ctx - the vol has been passed to the application
824 	 *  for use now that volume load was successful.
825 	 */
826 	_init_load_cleanup(NULL, load_ctx);
827 	return;
828 
829 error:
830 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
831 	_init_load_cleanup(vol, load_ctx);
832 }
833 
834 void
835 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
836 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
837 {
838 	struct spdk_reduce_vol *vol;
839 	struct reduce_init_load_ctx *load_ctx;
840 
841 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
842 	    backing_dev->unmap == NULL) {
843 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
844 		cb_fn(cb_arg, NULL, -EINVAL);
845 		return;
846 	}
847 
848 	vol = calloc(1, sizeof(*vol));
849 	if (vol == NULL) {
850 		cb_fn(cb_arg, NULL, -ENOMEM);
851 		return;
852 	}
853 
854 	TAILQ_INIT(&vol->free_requests);
855 	TAILQ_INIT(&vol->executing_requests);
856 	TAILQ_INIT(&vol->queued_requests);
857 
858 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
859 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
860 	if (vol->backing_super == NULL) {
861 		_init_load_cleanup(vol, NULL);
862 		cb_fn(cb_arg, NULL, -ENOMEM);
863 		return;
864 	}
865 
866 	vol->backing_dev = backing_dev;
867 
868 	load_ctx = calloc(1, sizeof(*load_ctx));
869 	if (load_ctx == NULL) {
870 		_init_load_cleanup(vol, NULL);
871 		cb_fn(cb_arg, NULL, -ENOMEM);
872 		return;
873 	}
874 
875 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
876 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
877 	if (load_ctx->path == NULL) {
878 		_init_load_cleanup(vol, load_ctx);
879 		cb_fn(cb_arg, NULL, -ENOMEM);
880 		return;
881 	}
882 
883 	load_ctx->vol = vol;
884 	load_ctx->cb_fn = cb_fn;
885 	load_ctx->cb_arg = cb_arg;
886 
887 	load_ctx->iov[0].iov_base = vol->backing_super;
888 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
889 	load_ctx->iov[1].iov_base = load_ctx->path;
890 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
891 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
892 	load_ctx->backing_cb_args.cb_arg = load_ctx;
893 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
894 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
895 				vol->backing_dev->blocklen,
896 				&load_ctx->backing_cb_args);
897 }
898 
899 void
900 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
901 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
902 {
903 	if (vol == NULL) {
904 		/* This indicates a programming error. */
905 		assert(false);
906 		cb_fn(cb_arg, -EINVAL);
907 		return;
908 	}
909 
910 	if (--g_vol_count == 0) {
911 		spdk_free(g_zero_buf);
912 	}
913 	assert(g_vol_count >= 0);
914 	_init_load_cleanup(vol, NULL);
915 	cb_fn(cb_arg, 0);
916 }
917 
918 struct reduce_destroy_ctx {
919 	spdk_reduce_vol_op_complete		cb_fn;
920 	void					*cb_arg;
921 	struct spdk_reduce_vol			*vol;
922 	struct spdk_reduce_vol_superblock	*super;
923 	struct iovec				iov;
924 	struct spdk_reduce_vol_cb_args		backing_cb_args;
925 	int					reduce_errno;
926 	char					pm_path[REDUCE_PATH_MAX];
927 };
928 
929 static void
930 destroy_unload_cpl(void *cb_arg, int reduce_errno)
931 {
932 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
933 
934 	if (destroy_ctx->reduce_errno == 0) {
935 		if (unlink(destroy_ctx->pm_path)) {
936 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
937 				    destroy_ctx->pm_path, strerror(errno));
938 		}
939 	}
940 
941 	/* Even if the unload somehow failed, we still pass the destroy_ctx
942 	 * reduce_errno since that indicates whether or not the volume was
943 	 * actually destroyed.
944 	 */
945 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
946 	spdk_free(destroy_ctx->super);
947 	free(destroy_ctx);
948 }
949 
950 static void
951 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
952 {
953 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
954 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
955 
956 	destroy_ctx->reduce_errno = reduce_errno;
957 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
958 }
959 
960 static void
961 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
962 {
963 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
964 
965 	if (reduce_errno != 0) {
966 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
967 		spdk_free(destroy_ctx->super);
968 		free(destroy_ctx);
969 		return;
970 	}
971 
972 	destroy_ctx->vol = vol;
973 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
974 	destroy_ctx->iov.iov_base = destroy_ctx->super;
975 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
976 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
977 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
978 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
979 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
980 				 &destroy_ctx->backing_cb_args);
981 }
982 
983 void
984 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
985 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
986 {
987 	struct reduce_destroy_ctx *destroy_ctx;
988 
989 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
990 	if (destroy_ctx == NULL) {
991 		cb_fn(cb_arg, -ENOMEM);
992 		return;
993 	}
994 
995 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
996 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
997 	if (destroy_ctx->super == NULL) {
998 		free(destroy_ctx);
999 		cb_fn(cb_arg, -ENOMEM);
1000 		return;
1001 	}
1002 	destroy_ctx->cb_fn = cb_fn;
1003 	destroy_ctx->cb_arg = cb_arg;
1004 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
1005 }
1006 
1007 static bool
1008 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
1009 {
1010 	uint64_t start_chunk, end_chunk;
1011 
1012 	start_chunk = offset / vol->logical_blocks_per_chunk;
1013 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
1014 
1015 	return (start_chunk != end_chunk);
1016 }
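
/*
 * Illustrative note: with logical_blocks_per_chunk = 32, a request at offset 30 with
 * length 4 touches blocks 30..33, i.e. chunks 0 and 1, so it spans a boundary and is
 * rejected by the readv/writev entry points below with -EINVAL.
 */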
1017 
1018 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
1019 
1020 static void
1021 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
1022 {
1023 	struct spdk_reduce_vol_request *next_req;
1024 	struct spdk_reduce_vol *vol = req->vol;
1025 
1026 	req->cb_fn(req->cb_arg, reduce_errno);
1027 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
1028 
1029 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
1030 		if (next_req->logical_map_index == req->logical_map_index) {
1031 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
1032 			if (next_req->type == REDUCE_IO_READV) {
1033 				_start_readv_request(next_req);
1034 			} else {
1035 				assert(next_req->type == REDUCE_IO_WRITEV);
1036 				_start_writev_request(next_req);
1037 			}
1038 			break;
1039 		}
1040 	}
1041 
1042 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
1043 }
1044 
1045 static void
1046 _write_write_done(void *_req, int reduce_errno)
1047 {
1048 	struct spdk_reduce_vol_request *req = _req;
1049 	struct spdk_reduce_vol *vol = req->vol;
1050 	uint64_t old_chunk_map_index;
1051 	struct spdk_reduce_chunk_map *old_chunk;
1052 	uint32_t i;
1053 
1054 	if (reduce_errno != 0) {
1055 		req->reduce_errno = reduce_errno;
1056 	}
1057 
1058 	assert(req->num_backing_ops > 0);
1059 	if (--req->num_backing_ops > 0) {
1060 		return;
1061 	}
1062 
1063 	if (req->reduce_errno != 0) {
1064 		_reduce_vol_complete_req(req, req->reduce_errno);
1065 		return;
1066 	}
1067 
1068 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1069 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1070 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
1071 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
1072 			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
1073 				break;
1074 			}
1075 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
1076 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
1077 			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1078 		}
1079 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
1080 	}
1081 
1082 	/*
1083 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1084 	 * becomes invalid after we update the logical map, since the logical map will no
1085 	 * longer have a reference to the old chunk map.
1086 	 */
1087 
1088 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1089 	_reduce_persist(vol, req->chunk,
1090 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1091 
1092 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1093 
1094 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1095 
1096 	_reduce_vol_complete_req(req, 0);
1097 }
1098 
1099 static void
1100 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1101 		   reduce_request_fn next_fn, bool is_write)
1102 {
1103 	struct iovec *iov;
1104 	uint8_t *buf;
1105 	uint32_t i;
1106 
1107 	if (req->chunk_is_compressed) {
1108 		iov = req->comp_buf_iov;
1109 		buf = req->comp_buf;
1110 	} else {
1111 		iov = req->decomp_buf_iov;
1112 		buf = req->decomp_buf;
1113 	}
1114 
1115 	req->num_backing_ops = req->num_io_units;
1116 	req->backing_cb_args.cb_fn = next_fn;
1117 	req->backing_cb_args.cb_arg = req;
1118 	for (i = 0; i < req->num_io_units; i++) {
1119 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1120 		iov[i].iov_len = vol->params.backing_io_unit_size;
1121 		if (is_write) {
1122 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
1123 						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1124 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
1125 		} else {
1126 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
1127 						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1128 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
1129 		}
1130 	}
1131 }
1132 
1133 static void
1134 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1135 			uint32_t compressed_size)
1136 {
1137 	struct spdk_reduce_vol *vol = req->vol;
1138 	uint32_t i;
1139 	uint64_t chunk_offset, remainder, total_len = 0;
1140 	uint8_t *buf;
1141 	int j;
1142 
1143 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
1144 
1145 	/* TODO: fail if no chunk map found - but really this should not happen if we
1146 	 * size the number of requests similarly to number of extra chunk maps
1147 	 */
1148 	assert(req->chunk_map_index != UINT32_MAX);
1149 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1150 
1151 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1152 	req->num_io_units = spdk_divide_round_up(compressed_size,
1153 			    vol->params.backing_io_unit_size);
1154 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1155 	req->chunk->compressed_size =
1156 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1157 
1158 	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
1159 	if (req->chunk_is_compressed == false) {
1160 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1161 		buf = req->decomp_buf;
1162 		total_len = chunk_offset * vol->params.logical_block_size;
1163 
1164 		/* zero any offset into chunk */
1165 		if (req->rmw == false && chunk_offset) {
1166 			memset(buf, 0, total_len);
1167 		}
1168 		buf += total_len;
1169 
1170 		/* copy the data */
1171 		for (j = 0; j < req->iovcnt; j++) {
1172 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1173 			buf += req->iov[j].iov_len;
1174 			total_len += req->iov[j].iov_len;
1175 		}
1176 
1177 		/* zero any remainder */
1178 		remainder = vol->params.chunk_size - total_len;
1179 		total_len += remainder;
1180 		if (req->rmw == false && remainder) {
1181 			memset(buf, 0, remainder);
1182 		}
1183 		assert(total_len == vol->params.chunk_size);
1184 	}
1185 
1186 	for (i = 0; i < req->num_io_units; i++) {
1187 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1188 		/* TODO: fail if no backing block found - but really this should also not
1189 		 * happen (see comment above).
1190 		 */
1191 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1192 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1193 	}
1194 
1195 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1196 }
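
/*
 * Illustrative note: with chunk_size = 16 KiB and backing_io_unit_size = 4 KiB
 * (backing_io_units_per_chunk = 4), a chunk that compresses to 5000 bytes needs
 * spdk_divide_round_up(5000, 4096) = 2 io units and is stored compressed; a chunk
 * whose compressed size is larger than 12 KiB would still need all 4 io units, so
 * it is stored uncompressed instead (chunk_is_compressed == false).
 */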
1197 
1198 static void
1199 _write_compress_done(void *_req, int reduce_errno)
1200 {
1201 	struct spdk_reduce_vol_request *req = _req;
1202 
1203 	/* Negative reduce_errno indicates failure for compression operations.
1204 	 * Just write the uncompressed data instead.  Force this to happen
1205 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1206 	 * When it sees the data couldn't be compressed, it will just write
1207 	 * the uncompressed buffer to disk.
1208 	 */
1209 	if (reduce_errno < 0) {
1210 		reduce_errno = req->vol->params.chunk_size;
1211 	}
1212 
1213 	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
1214 	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
1215 }
1216 
1217 static void
1218 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1219 {
1220 	struct spdk_reduce_vol *vol = req->vol;
1221 
1222 	req->backing_cb_args.cb_fn = next_fn;
1223 	req->backing_cb_args.cb_arg = req;
1224 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1225 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1226 	vol->backing_dev->compress(vol->backing_dev,
1227 				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
1228 				   &req->backing_cb_args);
1229 }
1230 
1231 static void
1232 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1233 {
1234 	struct spdk_reduce_vol *vol = req->vol;
1235 
1236 	req->backing_cb_args.cb_fn = next_fn;
1237 	req->backing_cb_args.cb_arg = req;
1238 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1239 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1240 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1241 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1242 	vol->backing_dev->decompress(vol->backing_dev,
1243 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1244 				     &req->backing_cb_args);
1245 }
1246 
1247 static void
1248 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1249 {
1250 	struct spdk_reduce_vol *vol = req->vol;
1251 	uint64_t chunk_offset, remainder = 0;
1252 	uint64_t ttl_len = 0;
1253 	size_t iov_len;
1254 	int i;
1255 
1256 	req->decomp_iovcnt = 0;
1257 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1258 
1259 	/* If the backing device doesn't support SGL output, then we must copy the result of
1260 	 * decompression into the user's buffer if at least one of the conditions below is true:
1261 	 * 1. The user's buffer is fragmented (more than one iovec)
1262 	 * 2. The user's buffer is shorter than the chunk
1263 	 * 3. The user's buffer is contiguous and equals chunk_size, but crosses a huge page boundary */
1264 	iov_len = req->iov[0].iov_len;
1265 	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
1266 				     req->iov[0].iov_len < vol->params.chunk_size ||
1267 				     _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
1268 	if (req->copy_after_decompress) {
1269 		req->decomp_iov[0].iov_base = req->decomp_buf;
1270 		req->decomp_iov[0].iov_len = vol->params.chunk_size;
1271 		req->decomp_iovcnt = 1;
1272 		goto decompress;
1273 	}
1274 
1275 	if (chunk_offset) {
1276 		/* first iov points to our scratch buffer for any offset into the chunk */
1277 		req->decomp_iov[0].iov_base = req->decomp_buf;
1278 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1279 		ttl_len += req->decomp_iov[0].iov_len;
1280 		req->decomp_iovcnt = 1;
1281 	}
1282 
1283 	/* now the user data iov, direct to the user buffer */
1284 	for (i = 0; i < req->iovcnt; i++) {
1285 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1286 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1287 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1288 	}
1289 	req->decomp_iovcnt += req->iovcnt;
1290 
1291 	/* send the rest of the chunk to our scratch buffer */
1292 	remainder = vol->params.chunk_size - ttl_len;
1293 	if (remainder) {
1294 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1295 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1296 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1297 		req->decomp_iovcnt++;
1298 	}
1299 	assert(ttl_len == vol->params.chunk_size);
1300 
1301 decompress:
1302 	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
1303 	req->backing_cb_args.cb_fn = next_fn;
1304 	req->backing_cb_args.cb_arg = req;
1305 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1306 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1307 	vol->backing_dev->decompress(vol->backing_dev,
1308 				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
1309 				     &req->backing_cb_args);
1310 }
1311 
1312 static inline void
1313 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
1314 {
1315 	struct spdk_reduce_vol *vol = req->vol;
1316 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1317 	uint64_t chunk_offset, ttl_len = 0;
1318 	uint64_t remainder = 0;
1319 	char *copy_offset = NULL;
1320 	uint32_t lbsize = vol->params.logical_block_size;
1321 	int i;
1322 
1323 	req->decomp_iov[0].iov_base = req->decomp_buf;
1324 	req->decomp_iov[0].iov_len = vol->params.chunk_size;
1325 	req->decomp_iovcnt = 1;
1326 	copy_offset = req->decomp_iov[0].iov_base;
1327 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1328 
1329 	if (chunk_offset) {
1330 		ttl_len += chunk_offset * lbsize;
1331 		/* copy_offset already points to padding buffer if zero_paddings=false */
1332 		if (zero_paddings) {
1333 			memcpy(copy_offset, padding_buffer, ttl_len);
1334 		}
1335 		copy_offset += ttl_len;
1336 	}
1337 
1338 	/* now copy the user data directly from the user buffers */
1339 	for (i = 0; i < req->iovcnt; i++) {
1340 		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
1341 		copy_offset += req->iov[i].iov_len;
1342 		ttl_len += req->iov[i].iov_len;
1343 	}
1344 
1345 	remainder = vol->params.chunk_size - ttl_len;
1346 	if (remainder) {
1347 		/* copy_offset already points to padding buffer if zero_paddings=false */
1348 		if (zero_paddings) {
1349 			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
1350 		}
1351 		ttl_len += remainder;
1352 	}
1353 
1354 	assert(ttl_len == req->vol->params.chunk_size);
1355 }
1356 
1357 /* This function can be called when we are compressing new data or in the read-modify-write case.
1358  * In the first case any padding should be filled with zeroes; in the second case the padding
1359  * should point to the already read and decompressed buffer. */
1360 static inline void
1361 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1362 {
1363 	struct spdk_reduce_vol *vol = req->vol;
1364 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1365 	uint64_t chunk_offset, ttl_len = 0;
1366 	uint64_t remainder = 0;
1367 	uint32_t lbsize = vol->params.logical_block_size;
1368 	size_t iov_len;
1369 	int i;
1370 
1371 	/* If the backing device doesn't support SGL input, then we must copy the user's buffer into
1372 	 * decomp_buf if at least one of the conditions below is true:
1373 	 * 1. The user's buffer is fragmented (more than one iovec)
1374 	 * 2. The user's buffer is shorter than the chunk
1375 	 * 3. The user's buffer is contiguous and equals chunk_size, but crosses a huge page boundary */
1376 	iov_len = req->iov[0].iov_len;
1377 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1378 					  req->iov[0].iov_len < vol->params.chunk_size ||
1379 					  _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
1380 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1381 		return;
1382 	}
1383 
1384 	req->decomp_iovcnt = 0;
1385 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1386 
1387 	if (chunk_offset != 0) {
1388 		ttl_len += chunk_offset * lbsize;
1389 		req->decomp_iov[0].iov_base = padding_buffer;
1390 		req->decomp_iov[0].iov_len = ttl_len;
1391 		req->decomp_iovcnt = 1;
1392 	}
1393 
1394 	/* now the user data iov, direct from the user buffer */
1395 	for (i = 0; i < req->iovcnt; i++) {
1396 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1397 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1398 		ttl_len += req->iov[i].iov_len;
1399 	}
1400 	req->decomp_iovcnt += req->iovcnt;
1401 
1402 	remainder = vol->params.chunk_size - ttl_len;
1403 	if (remainder) {
1404 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1405 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1406 		req->decomp_iovcnt++;
1407 		ttl_len += remainder;
1408 	}
1409 	assert(ttl_len == req->vol->params.chunk_size);
1410 }
1411 
1412 static void
1413 _write_decompress_done(void *_req, int reduce_errno)
1414 {
1415 	struct spdk_reduce_vol_request *req = _req;
1416 
1417 	/* Negative reduce_errno indicates failure for compression operations. */
1418 	if (reduce_errno < 0) {
1419 		_reduce_vol_complete_req(req, reduce_errno);
1420 		return;
1421 	}
1422 
1423 	/* Positive reduce_errno indicates number of bytes in decompressed
1424 	 *  buffer.  This should equal the chunk size - otherwise that's another
1425 	 *  type of failure.
1426 	 */
1427 	if ((uint32_t)reduce_errno != req->vol->params.chunk_size) {
1428 		_reduce_vol_complete_req(req, -EIO);
1429 		return;
1430 	}
1431 
1432 	_prepare_compress_chunk(req, false);
1433 	_reduce_vol_compress_chunk(req, _write_compress_done);
1434 }
1435 
1436 static void
1437 _write_read_done(void *_req, int reduce_errno)
1438 {
1439 	struct spdk_reduce_vol_request *req = _req;
1440 
1441 	if (reduce_errno != 0) {
1442 		req->reduce_errno = reduce_errno;
1443 	}
1444 
1445 	assert(req->num_backing_ops > 0);
1446 	if (--req->num_backing_ops > 0) {
1447 		return;
1448 	}
1449 
1450 	if (req->reduce_errno != 0) {
1451 		_reduce_vol_complete_req(req, req->reduce_errno);
1452 		return;
1453 	}
1454 
1455 	if (req->chunk_is_compressed) {
1456 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1457 	} else {
1458 		_write_decompress_done(req, req->chunk->compressed_size);
1459 	}
1460 }
1461 
1462 static void
1463 _read_decompress_done(void *_req, int reduce_errno)
1464 {
1465 	struct spdk_reduce_vol_request *req = _req;
1466 	struct spdk_reduce_vol *vol = req->vol;
1467 
1468 	/* Negative reduce_errno indicates failure for compression operations. */
1469 	if (reduce_errno < 0) {
1470 		_reduce_vol_complete_req(req, reduce_errno);
1471 		return;
1472 	}
1473 
1474 	/* Positive reduce_errno indicates number of bytes in decompressed
1475 	 *  buffer.  This should equal the chunk size - otherwise that's another
1476 	 *  type of failure.
1477 	 */
1478 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1479 		_reduce_vol_complete_req(req, -EIO);
1480 		return;
1481 	}
1482 
1483 	if (req->copy_after_decompress) {
1484 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1485 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1486 		int i;
1487 
1488 		for (i = 0; i < req->iovcnt; i++) {
1489 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1490 			decomp_buffer += req->iov[i].iov_len;
1491 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1492 		}
1493 	}
1494 
1495 	_reduce_vol_complete_req(req, 0);
1496 }
1497 
1498 static void
1499 _read_read_done(void *_req, int reduce_errno)
1500 {
1501 	struct spdk_reduce_vol_request *req = _req;
1502 	uint64_t chunk_offset;
1503 	uint8_t *buf;
1504 	int i;
1505 
1506 	if (reduce_errno != 0) {
1507 		req->reduce_errno = reduce_errno;
1508 	}
1509 
1510 	assert(req->num_backing_ops > 0);
1511 	if (--req->num_backing_ops > 0) {
1512 		return;
1513 	}
1514 
1515 	if (req->reduce_errno != 0) {
1516 		_reduce_vol_complete_req(req, req->reduce_errno);
1517 		return;
1518 	}
1519 
1520 	if (req->chunk_is_compressed) {
1521 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1522 	} else {
1523 
1524 		/* If the chunk had been compressed, the decompression operation would have placed
1525 		 *  the data directly into the host buffers; since it was not, we need to memcpy here.
1526 		 */
1527 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1528 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1529 		for (i = 0; i < req->iovcnt; i++) {
1530 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1531 			buf += req->iov[i].iov_len;
1532 		}
1533 
1534 		_read_decompress_done(req, req->chunk->compressed_size);
1535 	}
1536 }
1537 
1538 static void
1539 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1540 {
1541 	struct spdk_reduce_vol *vol = req->vol;
1542 
1543 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1544 	assert(req->chunk_map_index != UINT32_MAX);
1545 
1546 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1547 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1548 			    vol->params.backing_io_unit_size);
1549 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1550 
1551 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1552 }
1553 
1554 static bool
1555 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1556 		    uint64_t length)
1557 {
1558 	uint64_t size = 0;
1559 	int i;
1560 
1561 	if (iovcnt > REDUCE_MAX_IOVECS) {
1562 		return false;
1563 	}
1564 
1565 	for (i = 0; i < iovcnt; i++) {
1566 		size += iov[i].iov_len;
1567 	}
1568 
1569 	return size == (length * vol->params.logical_block_size);
1570 }
1571 
1572 static bool
1573 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1574 {
1575 	struct spdk_reduce_vol_request *req;
1576 
1577 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1578 		if (logical_map_index == req->logical_map_index) {
1579 			return true;
1580 		}
1581 	}
1582 
1583 	return false;
1584 }
1585 
1586 static void
1587 _start_readv_request(struct spdk_reduce_vol_request *req)
1588 {
1589 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1590 	_reduce_vol_read_chunk(req, _read_read_done);
1591 }
1592 
1593 void
1594 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1595 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1596 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1597 {
1598 	struct spdk_reduce_vol_request *req;
1599 	uint64_t logical_map_index;
1600 	bool overlapped;
1601 	int i;
1602 
1603 	if (length == 0) {
1604 		cb_fn(cb_arg, 0);
1605 		return;
1606 	}
1607 
1608 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1609 		cb_fn(cb_arg, -EINVAL);
1610 		return;
1611 	}
1612 
1613 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1614 		cb_fn(cb_arg, -EINVAL);
1615 		return;
1616 	}
1617 
1618 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1619 	overlapped = _check_overlap(vol, logical_map_index);
1620 
1621 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1622 		/*
1623 		 * This chunk hasn't been allocated.  So treat the data as all
1624 		 * zeroes for this chunk - do the memset and immediately complete
1625 		 * the operation.
1626 		 */
1627 		for (i = 0; i < iovcnt; i++) {
1628 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1629 		}
1630 		cb_fn(cb_arg, 0);
1631 		return;
1632 	}
1633 
1634 	req = TAILQ_FIRST(&vol->free_requests);
1635 	if (req == NULL) {
1636 		cb_fn(cb_arg, -ENOMEM);
1637 		return;
1638 	}
1639 
1640 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1641 	req->type = REDUCE_IO_READV;
1642 	req->vol = vol;
1643 	req->iov = iov;
1644 	req->iovcnt = iovcnt;
1645 	req->offset = offset;
1646 	req->logical_map_index = logical_map_index;
1647 	req->length = length;
1648 	req->copy_after_decompress = false;
1649 	req->cb_fn = cb_fn;
1650 	req->cb_arg = cb_arg;
1651 
1652 	if (!overlapped) {
1653 		_start_readv_request(req);
1654 	} else {
1655 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1656 	}
1657 }
1658 
1659 static void
1660 _start_writev_request(struct spdk_reduce_vol_request *req)
1661 {
1662 	struct spdk_reduce_vol *vol = req->vol;
1663 
1664 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1665 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1666 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1667 			/* Read old chunk, then overwrite with data from this write
1668 			 *  operation.
1669 			 */
1670 			req->rmw = true;
1671 			_reduce_vol_read_chunk(req, _write_read_done);
1672 			return;
1673 		}
1674 	}
1675 
1676 	req->rmw = false;
1677 
1678 	_prepare_compress_chunk(req, true);
1679 	_reduce_vol_compress_chunk(req, _write_compress_done);
1680 }
1681 
1682 void
1683 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1684 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1685 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1686 {
1687 	struct spdk_reduce_vol_request *req;
1688 	uint64_t logical_map_index;
1689 	bool overlapped;
1690 
1691 	if (length == 0) {
1692 		cb_fn(cb_arg, 0);
1693 		return;
1694 	}
1695 
1696 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1697 		cb_fn(cb_arg, -EINVAL);
1698 		return;
1699 	}
1700 
1701 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1702 		cb_fn(cb_arg, -EINVAL);
1703 		return;
1704 	}
1705 
1706 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1707 	overlapped = _check_overlap(vol, logical_map_index);
1708 
1709 	req = TAILQ_FIRST(&vol->free_requests);
1710 	if (req == NULL) {
1711 		cb_fn(cb_arg, -ENOMEM);
1712 		return;
1713 	}
1714 
1715 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1716 	req->type = REDUCE_IO_WRITEV;
1717 	req->vol = vol;
1718 	req->iov = iov;
1719 	req->iovcnt = iovcnt;
1720 	req->offset = offset;
1721 	req->logical_map_index = logical_map_index;
1722 	req->length = length;
1723 	req->copy_after_decompress = false;
1724 	req->cb_fn = cb_fn;
1725 	req->cb_arg = cb_arg;
1726 
1727 	if (!overlapped) {
1728 		_start_writev_request(req);
1729 	} else {
1730 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1731 	}
1732 }
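
/*
 * Minimal I/O usage sketch (illustrative only; the my_*-prefixed names are
 * placeholders).  Note that offset and length are expressed in logical blocks, not
 * bytes, the iovecs must add up to exactly length * logical_block_size, and a single
 * request must not span a chunk boundary:
 *
 *	struct iovec iov = { .iov_base = my_buf, .iov_len = 8 * 512 };
 *
 *	// write logical blocks 0..7 (assuming logical_block_size = 512)
 *	spdk_reduce_vol_writev(vol, &iov, 1, 0, 8, my_write_done_cb, my_cb_arg);
 *	// later, read them back into the same buffer
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, my_read_done_cb, my_cb_arg);
 */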
1733 
1734 const struct spdk_reduce_vol_params *
1735 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1736 {
1737 	return &vol->params;
1738 }
1739 
1740 void
spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1741 {
1742 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1743 	uint32_t struct_size;
1744 	uint64_t chunk_map_size;
1745 
1746 	SPDK_NOTICELOG("vol info:\n");
1747 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1748 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1749 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1750 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1751 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1752 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1753 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1754 		       vol->params.vol_size / vol->params.chunk_size);
1755 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1756 			vol->params.backing_io_unit_size);
1757 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1758 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1759 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1760 
1761 	SPDK_NOTICELOG("pmem info:\n");
1762 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1763 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1764 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1765 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1766 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1767 			   vol->params.chunk_size);
1768 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1769 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1770 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1771 			 vol->params.backing_io_unit_size);
1772 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1773 }
1774 
1775 SPDK_LOG_REGISTER_COMPONENT(reduce)
1776