xref: /spdk/lib/reduce/reduce.c (revision b69e3ff279affbf4cbc4a09fa1f9c6c2b72397ff)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/env.h"
11 #include "spdk/string.h"
12 #include "spdk/bit_array.h"
13 #include "spdk/util.h"
14 #include "spdk/log.h"
15 #include "spdk/memory.h"
16 
17 #include "libpmem.h"
18 
19 /* Always round up the size of the PM region to the nearest cacheline. */
20 #define REDUCE_PM_SIZE_ALIGNMENT	64
21 
22 /* Offset into the backing device where the persistent memory file's path is stored. */
23 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
24 
25 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
26 
27 #define REDUCE_NUM_VOL_REQUESTS	256
28 
29 /* Structure written to offset 0 of both the pm file and the backing device. */
30 struct spdk_reduce_vol_superblock {
31 	uint8_t				signature[8];
32 	struct spdk_reduce_vol_params	params;
33 	uint8_t				reserved[4048];
34 };
35 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
36 
37 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
38 /* The null terminator counts as one byte of sizeof(), hence the -1 below. */
39 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
40 		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
41 
42 #define REDUCE_PATH_MAX 4096
43 
44 #define REDUCE_ZERO_BUF_SIZE 0x100000
45 
46 /**
47  * Describes a persistent memory file used to hold metadata associated with a
48  *  compressed volume.
49  */
50 struct spdk_reduce_pm_file {
51 	char			path[REDUCE_PATH_MAX];
52 	void			*pm_buf;
53 	int			pm_is_pmem;
54 	uint64_t		size;
55 };
56 
57 #define REDUCE_IO_READV		1
58 #define REDUCE_IO_WRITEV	2
59 
60 struct spdk_reduce_chunk_map {
61 	uint32_t		compressed_size;
62 	uint32_t		reserved;
63 	uint64_t		io_unit_index[0];
64 };
65 
66 struct spdk_reduce_vol_request {
67 	/**
68 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
69 	 *   1) source buffer for compression operations
70 	 *   2) destination buffer for decompression operations
71 	 *   3) data buffer when writing uncompressed chunk to disk
72 	 *   4) data buffer when reading uncompressed chunk from disk
73 	 */
74 	uint8_t					*decomp_buf;
75 	struct iovec				*decomp_buf_iov;
76 
77 	/**
78 	 * These are used to construct the iovecs that are sent to
79 	 *  the decomp engine; they point to a mix of the scratch buffer
80 	 *  and the user buffer.
81 	 */
82 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
83 	int					decomp_iovcnt;
84 
85 	/**
86 	 *  Scratch buffer used for compressed chunk.  This is used for:
87 	 *   1) destination buffer for compression operations
88 	 *   2) source buffer for decompression operations
89 	 *   3) data buffer when writing compressed chunk to disk
90 	 *   4) data buffer when reading compressed chunk from disk
91 	 */
92 	uint8_t					*comp_buf;
93 	struct iovec				*comp_buf_iov;
94 	struct iovec				*iov;
95 	bool					rmw;
96 	struct spdk_reduce_vol			*vol;
97 	int					type;
98 	int					reduce_errno;
99 	int					iovcnt;
100 	int					num_backing_ops;
101 	uint32_t				num_io_units;
102 	bool					chunk_is_compressed;
103 	bool					copy_after_decompress;
104 	uint64_t				offset;
105 	uint64_t				logical_map_index;
106 	uint64_t				length;
107 	uint64_t				chunk_map_index;
108 	struct spdk_reduce_chunk_map		*chunk;
109 	spdk_reduce_vol_op_complete		cb_fn;
110 	void					*cb_arg;
111 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
112 	struct spdk_reduce_vol_cb_args		backing_cb_args;
113 };
114 
115 struct spdk_reduce_vol {
116 	struct spdk_reduce_vol_params		params;
117 	uint32_t				backing_io_units_per_chunk;
118 	uint32_t				backing_lba_per_io_unit;
119 	uint32_t				logical_blocks_per_chunk;
120 	struct spdk_reduce_pm_file		pm_file;
121 	struct spdk_reduce_backing_dev		*backing_dev;
122 	struct spdk_reduce_vol_superblock	*backing_super;
123 	struct spdk_reduce_vol_superblock	*pm_super;
124 	uint64_t				*pm_logical_map;
125 	uint64_t				*pm_chunk_maps;
126 
127 	struct spdk_bit_array			*allocated_chunk_maps;
128 	struct spdk_bit_array			*allocated_backing_io_units;
129 
130 	struct spdk_reduce_vol_request		*request_mem;
131 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
132 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
133 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
134 
135 	/* Single contiguous buffer used for all request buffers for this volume. */
136 	uint8_t					*buf_mem;
137 	struct iovec				*buf_iov_mem;
138 };
139 
140 static void _start_readv_request(struct spdk_reduce_vol_request *req);
141 static void _start_writev_request(struct spdk_reduce_vol_request *req);
142 static uint8_t *g_zero_buf;
143 static int g_vol_count = 0;
144 
145 /*
146  * Allocate extra metadata chunks and corresponding backing io units to account for
147  *  outstanding IO in the worst-case scenario where the logical map is completely allocated
148  *  and no data can be compressed.  We need extra chunks in this case to handle
149  *  in-flight writes since reduce never writes data in place.
150  */
151 #define REDUCE_NUM_EXTRA_CHUNKS 128
152 
153 static void
154 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
155 {
156 	if (vol->pm_file.pm_is_pmem) {
157 		pmem_persist(addr, len);
158 	} else {
159 		pmem_msync(addr, len);
160 	}
161 }
162 
163 static uint64_t
164 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
165 {
166 	uint64_t chunks_in_logical_map, logical_map_size;
167 
168 	chunks_in_logical_map = vol_size / chunk_size;
169 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
170 
171 	/* Round up to next cacheline. */
172 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
173 	       REDUCE_PM_SIZE_ALIGNMENT;
174 }
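
/* Worked example (illustrative only): for a hypothetical 1 GiB volume with
 * 16 KiB chunks, chunks_in_logical_map = 2^30 / 2^14 = 65536 entries, so
 * logical_map_size = 65536 * 8 = 512 KiB, which is already a multiple of the
 * 64-byte REDUCE_PM_SIZE_ALIGNMENT and is returned unchanged.
 */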
175 
176 static uint64_t
177 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
178 {
179 	uint64_t num_chunks;
180 
181 	num_chunks = vol_size / chunk_size;
182 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
183 
184 	return num_chunks;
185 }
186 
187 static inline uint32_t
188 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
189 {
190 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
191 }
192 
193 static uint64_t
194 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
195 {
196 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
197 
198 	num_chunks = _get_total_chunks(vol_size, chunk_size);
199 	io_units_per_chunk = chunk_size / backing_io_unit_size;
200 
201 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
202 
203 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
204 	       REDUCE_PM_SIZE_ALIGNMENT;
205 }
206 
207 static struct spdk_reduce_chunk_map *
208 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
209 {
210 	uintptr_t chunk_map_addr;
211 
212 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
213 
214 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
215 	chunk_map_addr += chunk_map_index *
216 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
217 
218 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
219 }
220 
221 static int
222 _validate_vol_params(struct spdk_reduce_vol_params *params)
223 {
224 	if (params->vol_size > 0) {
225 		/**
226 		 * The user must not pass in the vol size - it gets calculated by libreduce from
227 		 *  the other values in this structure plus the size of the backing device.
228 		 */
229 		return -EINVAL;
230 	}
231 
232 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
233 	    params->logical_block_size == 0) {
234 		return -EINVAL;
235 	}
236 
237 	/* Chunk size must be an even multiple of the backing io unit size. */
238 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
239 		return -EINVAL;
240 	}
241 
242 	/* Chunk size must be an even multiple of the logical block size. */
243 	if ((params->chunk_size % params->logical_block_size) != 0) {
244 		return -EINVAL;
245 	}
246 
247 	return 0;
248 }
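
/* A minimal sketch (hypothetical values, not part of the build) of params
 * that pass the validation above: vol_size must be 0 on entry, and chunk_size
 * must be an even multiple of both backing_io_unit_size and logical_block_size.
 */
#if 0
static void
example_params(void)
{
	struct spdk_reduce_vol_params params = {0};

	params.chunk_size = 16 * 1024;		/* 16 KiB chunks */
	params.backing_io_unit_size = 4 * 1024;	/* 16 KiB % 4 KiB == 0 */
	params.logical_block_size = 512;	/* 16 KiB % 512 == 0 */
	assert(_validate_vol_params(&params) == 0);	/* vol_size stays 0 - set by libreduce */
}
#endif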
249 
250 static uint64_t
251 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
252 {
253 	uint64_t num_chunks;
254 
255 	num_chunks = backing_dev_size / chunk_size;
256 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
257 		return 0;
258 	}
259 
260 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
261 	return num_chunks * chunk_size;
262 }
263 
264 static uint64_t
265 _get_pm_file_size(struct spdk_reduce_vol_params *params)
266 {
267 	uint64_t total_pm_size;
268 
269 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
270 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
271 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
272 			 params->backing_io_unit_size);
273 	return total_pm_size;
274 }
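
/* Worked example (illustrative only), using the same hypothetical 1 GiB
 * volume with 16 KiB chunks and 4 KiB backing io units:
 *   superblock:   4096 bytes
 *   logical map:  (2^30 / 2^14) * 8 bytes = 524288 bytes (512 KiB)
 *   chunk maps:   (65536 + 128 extra) * (8 + 4 * 8) bytes = 2626560 bytes
 *                 (already 64-byte aligned)
 * The pm file size is simply the sum of these three regions.
 */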
275 
276 const struct spdk_uuid *
277 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
278 {
279 	return &vol->params.uuid;
280 }
281 
282 static void
283 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
284 {
285 	uint64_t logical_map_size;
286 
287 	/* Superblock is at the beginning of the pm file. */
288 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
289 
290 	/* Logical map immediately follows the super block. */
291 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
292 
293 	/* Chunk maps follow the logical map. */
294 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
295 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
296 }
297 
298 /* We need 2 iovs during load - one for the superblock, another for the path */
299 #define LOAD_IOV_COUNT	2
300 
301 struct reduce_init_load_ctx {
302 	struct spdk_reduce_vol			*vol;
303 	struct spdk_reduce_vol_cb_args		backing_cb_args;
304 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
305 	void					*cb_arg;
306 	struct iovec				iov[LOAD_IOV_COUNT];
307 	void					*path;
308 };
309 
310 static inline bool
311 _addr_crosses_huge_page(const void *addr, size_t *size)
312 {
313 	size_t _size;
314 	uint64_t rc;
315 
316 	assert(size);
317 
318 	_size = *size;
319 	rc = spdk_vtophys(addr, size);
320 
321 	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
322 }
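
/* A minimal sketch (hypothetical address, not part of the build) of how the
 * helper above is used.  spdk_vtophys() shrinks *size down to the number of
 * bytes that are physically contiguous starting at addr, so a shrunken size
 * (or a translation error) means the range would cross a huge page boundary.
 */
#if 0
static void
example_skip_page_boundary(uint8_t *buf)
{
	size_t size = 16 * 1024;
	uint8_t *addr = buf;	/* hypothetical address inside a 2 MiB huge page */

	if (_addr_crosses_huge_page(addr, &size)) {
		/* size now holds the bytes left in the current huge page */
		addr += size;	/* continue from the start of the next page */
	}
}
#endif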
323 
324 static inline int
325 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
326 {
327 	uint8_t *addr;
328 	size_t size_tmp = buffer_size;
329 
330 	addr = *_addr;
331 
332 	/* Verify that addr + buffer_size doesn't cross huge page boundary */
333 	if (_addr_crosses_huge_page(addr, &size_tmp)) {
334 		/* The memory start is 2MiB-aligned, so the buffer must be located at the end of a page.
335 		 * Skip the remaining bytes and continue from the beginning of the next page. */
336 		addr += size_tmp;
337 	}
338 
339 	if (addr + buffer_size > addr_range) {
340 		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
341 		return -ERANGE;
342 	}
343 
344 	*vol_buffer = addr;
345 	*_addr = addr + buffer_size;
346 
347 	return 0;
348 }
349 
350 static int
351 _allocate_vol_requests(struct spdk_reduce_vol *vol)
352 {
353 	struct spdk_reduce_vol_request *req;
354 	uint32_t reqs_in_2mb_page, huge_pages_needed;
355 	uint8_t *buffer, *buffer_end;
356 	int i = 0;
357 	int rc = 0;
358 
359 	/* The comp and decomp buffers must be allocated so that they do not cross physical
360 	 * page boundaries.  Assume that the system uses the default 2MiB pages and that
361 	 * chunk_size is not necessarily a power of 2.
362 	 * Allocate 2x since we need buffers for both read/write and compress/decompress
363 	 * intermediate buffers. */
364 	reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
365 	if (!reqs_in_2mb_page) {
366 		return -EINVAL;
367 	}
368 	huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
369 
370 	vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
371 	if (vol->buf_mem == NULL) {
372 		return -ENOMEM;
373 	}
374 
375 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
376 	if (vol->request_mem == NULL) {
377 		spdk_free(vol->buf_mem);
378 		vol->buf_mem = NULL;
379 		return -ENOMEM;
380 	}
381 
382 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
383 	 *  buffers.
384 	 */
385 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
386 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
387 	if (vol->buf_iov_mem == NULL) {
388 		free(vol->request_mem);
389 		spdk_free(vol->buf_mem);
390 		vol->request_mem = NULL;
391 		vol->buf_mem = NULL;
392 		return -ENOMEM;
393 	}
394 
395 	buffer = vol->buf_mem;
396 	buffer_end = buffer + VALUE_2MB * huge_pages_needed;
397 
398 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
399 		req = &vol->request_mem[i];
400 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
401 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
402 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
403 
404 		rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
405 		if (rc) {
406 			SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
407 				    vol->buf_mem, buffer_end);
408 			break;
409 		}
410 		rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
411 		if (rc) {
412 			SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
413 				    vol->buf_mem, buffer_end);
414 			break;
415 		}
416 	}
417 
418 	if (rc) {
419 		free(vol->buf_iov_mem);
420 		free(vol->request_mem);
421 		spdk_free(vol->buf_mem);
422 		vol->buf_mem = NULL;
423 		vol->buf_iov_mem = NULL;
424 		vol->request_mem = NULL;
425 	}
426 
427 	return rc;
428 }
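
/* Worked example (illustrative only): with a hypothetical 16 KiB chunk_size,
 * each request needs 32 KiB of buffer space (one comp plus one decomp buffer),
 * so reqs_in_2mb_page = 2 MiB / 32 KiB = 64 requests per huge page and
 * huge_pages_needed = ceil(256 / 64) = 4 huge pages to back all
 * REDUCE_NUM_VOL_REQUESTS requests.
 */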
429 
430 static void
431 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
432 {
433 	if (ctx != NULL) {
434 		spdk_free(ctx->path);
435 		free(ctx);
436 	}
437 
438 	if (vol != NULL) {
439 		if (vol->pm_file.pm_buf != NULL) {
440 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
441 		}
442 
443 		spdk_free(vol->backing_super);
444 		spdk_bit_array_free(&vol->allocated_chunk_maps);
445 		spdk_bit_array_free(&vol->allocated_backing_io_units);
446 		free(vol->request_mem);
447 		free(vol->buf_iov_mem);
448 		spdk_free(vol->buf_mem);
449 		free(vol);
450 	}
451 }
452 
453 static int
454 _alloc_zero_buff(void)
455 {
456 	int rc = 0;
457 
458 	/* The zero buffer is shared between all volumes and is only used
459 	 * for reads, so allocate one global instance here if it was not already
460 	 * allocated when another vol was initialized or loaded.
461 	 */
462 	if (g_vol_count++ == 0) {
463 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
464 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
465 					  SPDK_MALLOC_DMA);
466 		if (g_zero_buf == NULL) {
467 			g_vol_count--;
468 			rc = -ENOMEM;
469 		}
470 	}
471 	return rc;
472 }
473 
474 static void
475 _init_write_super_cpl(void *cb_arg, int reduce_errno)
476 {
477 	struct reduce_init_load_ctx *init_ctx = cb_arg;
478 	int rc;
479 
480 	rc = _allocate_vol_requests(init_ctx->vol);
481 	if (rc != 0) {
482 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
483 		_init_load_cleanup(init_ctx->vol, init_ctx);
484 		return;
485 	}
486 
487 	rc = _alloc_zero_buff();
488 	if (rc != 0) {
489 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
490 		_init_load_cleanup(init_ctx->vol, init_ctx);
491 		return;
492 	}
493 
494 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
495 	/* Only clean up the ctx - the vol has been passed to the application
496 	 *  for use now that initialization was successful.
497 	 */
498 	_init_load_cleanup(NULL, init_ctx);
499 }
500 
501 static void
502 _init_write_path_cpl(void *cb_arg, int reduce_errno)
503 {
504 	struct reduce_init_load_ctx *init_ctx = cb_arg;
505 	struct spdk_reduce_vol *vol = init_ctx->vol;
506 
507 	init_ctx->iov[0].iov_base = vol->backing_super;
508 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
509 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
510 	init_ctx->backing_cb_args.cb_arg = init_ctx;
511 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
512 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
513 				 &init_ctx->backing_cb_args);
514 }
515 
516 static int
517 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
518 {
519 	uint64_t total_chunks, total_backing_io_units;
520 	uint32_t i, num_metadata_io_units;
521 
522 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
523 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
524 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
525 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
526 
527 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
528 		return -ENOMEM;
529 	}
530 
531 	/* Set backing io unit bits associated with metadata. */
532 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
533 				vol->params.backing_io_unit_size;
534 	for (i = 0; i < num_metadata_io_units; i++) {
535 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
536 	}
537 
538 	return 0;
539 }
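
/* Worked example (illustrative only): with a hypothetical 4 KiB backing io
 * unit size, the metadata occupies (4096-byte superblock + 4096-byte path
 * region) / 4096 = 2 io units, so bits 0 and 1 are set above and those io
 * units are never handed out for chunk data.
 */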
540 
541 void
542 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
543 		     struct spdk_reduce_backing_dev *backing_dev,
544 		     const char *pm_file_dir,
545 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
546 {
547 	struct spdk_reduce_vol *vol;
548 	struct reduce_init_load_ctx *init_ctx;
549 	uint64_t backing_dev_size;
550 	size_t mapped_len;
551 	int dir_len, max_dir_len, rc;
552 
553 	/* We need to append a path separator and the UUID to the supplied
554 	 * path.
555 	 */
556 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
557 	dir_len = strnlen(pm_file_dir, max_dir_len);
558 	/* Strip trailing slash if the user provided one - we will add it back
559 	 * later when appending the filename.
560 	 */
561 	if (pm_file_dir[dir_len - 1] == '/') {
562 		dir_len--;
563 	}
564 	if (dir_len == max_dir_len) {
565 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
566 		cb_fn(cb_arg, NULL, -EINVAL);
567 		return;
568 	}
569 
570 	rc = _validate_vol_params(params);
571 	if (rc != 0) {
572 		SPDK_ERRLOG("invalid vol params\n");
573 		cb_fn(cb_arg, NULL, rc);
574 		return;
575 	}
576 
577 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
578 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
579 	if (params->vol_size == 0) {
580 		SPDK_ERRLOG("backing device is too small\n");
581 		cb_fn(cb_arg, NULL, -EINVAL);
582 		return;
583 	}
584 
585 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
586 	    backing_dev->unmap == NULL) {
587 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
588 		cb_fn(cb_arg, NULL, -EINVAL);
589 		return;
590 	}
591 
592 	vol = calloc(1, sizeof(*vol));
593 	if (vol == NULL) {
594 		cb_fn(cb_arg, NULL, -ENOMEM);
595 		return;
596 	}
597 
598 	TAILQ_INIT(&vol->free_requests);
599 	TAILQ_INIT(&vol->executing_requests);
600 	TAILQ_INIT(&vol->queued_requests);
601 
602 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
603 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
604 	if (vol->backing_super == NULL) {
605 		cb_fn(cb_arg, NULL, -ENOMEM);
606 		_init_load_cleanup(vol, NULL);
607 		return;
608 	}
609 
610 	init_ctx = calloc(1, sizeof(*init_ctx));
611 	if (init_ctx == NULL) {
612 		cb_fn(cb_arg, NULL, -ENOMEM);
613 		_init_load_cleanup(vol, NULL);
614 		return;
615 	}
616 
617 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
618 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
619 	if (init_ctx->path == NULL) {
620 		cb_fn(cb_arg, NULL, -ENOMEM);
621 		_init_load_cleanup(vol, init_ctx);
622 		return;
623 	}
624 
625 	if (spdk_uuid_is_null(&params->uuid)) {
626 		spdk_uuid_generate(&params->uuid);
627 	}
628 
629 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
630 	vol->pm_file.path[dir_len] = '/';
631 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
632 			    &params->uuid);
633 	vol->pm_file.size = _get_pm_file_size(params);
634 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
635 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
636 					    &mapped_len, &vol->pm_file.pm_is_pmem);
637 	if (vol->pm_file.pm_buf == NULL) {
638 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
639 			    vol->pm_file.path, strerror(errno));
640 		cb_fn(cb_arg, NULL, -errno);
641 		_init_load_cleanup(vol, init_ctx);
642 		return;
643 	}
644 
645 	if (vol->pm_file.size != mapped_len) {
646 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
647 			    vol->pm_file.size, mapped_len);
648 		cb_fn(cb_arg, NULL, -ENOMEM);
649 		_init_load_cleanup(vol, init_ctx);
650 		return;
651 	}
652 
653 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
654 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
655 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
656 	memcpy(&vol->params, params, sizeof(*params));
657 
658 	vol->backing_dev = backing_dev;
659 
660 	rc = _allocate_bit_arrays(vol);
661 	if (rc != 0) {
662 		cb_fn(cb_arg, NULL, rc);
663 		_init_load_cleanup(vol, init_ctx);
664 		return;
665 	}
666 
667 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
668 	       sizeof(vol->backing_super->signature));
669 	memcpy(&vol->backing_super->params, params, sizeof(*params));
670 
671 	_initialize_vol_pm_pointers(vol);
672 
673 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
674 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
675 	 * Note that this writes 0xFF not just to the logical map but to the chunk maps as well.
676 	 */
677 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
678 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
679 
680 	init_ctx->vol = vol;
681 	init_ctx->cb_fn = cb_fn;
682 	init_ctx->cb_arg = cb_arg;
683 
684 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
685 	init_ctx->iov[0].iov_base = init_ctx->path;
686 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
687 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
688 	init_ctx->backing_cb_args.cb_arg = init_ctx;
689 	/* Write path to offset 4K on backing device - just after where the super
690 	 *  block will be written.  We wait until this is committed before writing the
691 	 *  super block to guarantee we don't end up with the super block written without
692 	 *  the path if the system crashes in the middle of a write operation.
693 	 */
694 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
695 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
696 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
697 				 &init_ctx->backing_cb_args);
698 }
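
/* A minimal usage sketch (hypothetical callback, backing_dev and pm directory;
 * not part of the build).  The caller supplies a backing_dev with its
 * readv/writev/unmap function pointers already set, and receives the vol
 * handle in the completion callback.
 */
#if 0
static void
init_done(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	if (reduce_errno != 0) {
		SPDK_ERRLOG("vol init failed: %d\n", reduce_errno);
		return;
	}
	/* vol is now ready for spdk_reduce_vol_readv()/spdk_reduce_vol_writev() */
}

static void
example_init(struct spdk_reduce_backing_dev *backing_dev)
{
	struct spdk_reduce_vol_params params = {0};

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4 * 1024;
	params.logical_block_size = 512;
	/* "/mnt/pmem" is a hypothetical directory for the pm metadata file */
	spdk_reduce_vol_init(&params, backing_dev, "/mnt/pmem", init_done, NULL);
}
#endif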
699 
700 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
701 
702 static void
703 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
704 {
705 	struct reduce_init_load_ctx *load_ctx = cb_arg;
706 	struct spdk_reduce_vol *vol = load_ctx->vol;
707 	uint64_t backing_dev_size;
708 	uint64_t i, num_chunks, logical_map_index;
709 	struct spdk_reduce_chunk_map *chunk;
710 	size_t mapped_len;
711 	uint32_t j;
712 	int rc;
713 
714 	rc = _alloc_zero_buff();
715 	if (rc) {
716 		goto error;
717 	}
718 
719 	if (memcmp(vol->backing_super->signature,
720 		   SPDK_REDUCE_SIGNATURE,
721 		   sizeof(vol->backing_super->signature)) != 0) {
722 		/* This backing device isn't a libreduce backing device. */
723 		rc = -EILSEQ;
724 		goto error;
725 	}
726 
727 	/* If the cb_fn is destroy_load_cb, it means the caller wants to destroy this compress bdev.
728 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
729 	 *  so destroy_load_cb can delete the metadata off of the block device and delete the
730 	 *  persistent memory file if it exists.
731 	 */
732 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
733 	if (load_ctx->cb_fn == destroy_load_cb) {
734 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
735 		_init_load_cleanup(NULL, load_ctx);
736 		return;
737 	}
738 
739 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
740 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
741 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
742 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
743 
744 	rc = _allocate_bit_arrays(vol);
745 	if (rc != 0) {
746 		goto error;
747 	}
748 
749 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
750 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
751 		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
752 			    backing_dev_size);
753 		rc = -EILSEQ;
754 		goto error;
755 	}
756 
757 	vol->pm_file.size = _get_pm_file_size(&vol->params);
758 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
759 					    &vol->pm_file.pm_is_pmem);
760 	if (vol->pm_file.pm_buf == NULL) {
761 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
762 		rc = -errno;
763 		goto error;
764 	}
765 
766 	if (vol->pm_file.size != mapped_len) {
767 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
768 			    vol->pm_file.size, mapped_len);
769 		rc = -ENOMEM;
770 		goto error;
771 	}
772 
773 	rc = _allocate_vol_requests(vol);
774 	if (rc != 0) {
775 		goto error;
776 	}
777 
778 	_initialize_vol_pm_pointers(vol);
779 
780 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
781 	for (i = 0; i < num_chunks; i++) {
782 		logical_map_index = vol->pm_logical_map[i];
783 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
784 			continue;
785 		}
786 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
787 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
788 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
789 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
790 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
791 			}
792 		}
793 	}
794 
795 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
796 	/* Only clean up the ctx - the vol has been passed to the application
797 	 *  for use now that volume load was successful.
798 	 */
799 	_init_load_cleanup(NULL, load_ctx);
800 	return;
801 
802 error:
803 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
804 	_init_load_cleanup(vol, load_ctx);
805 }
806 
807 void
808 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
809 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
810 {
811 	struct spdk_reduce_vol *vol;
812 	struct reduce_init_load_ctx *load_ctx;
813 
814 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
815 	    backing_dev->unmap == NULL) {
816 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
817 		cb_fn(cb_arg, NULL, -EINVAL);
818 		return;
819 	}
820 
821 	vol = calloc(1, sizeof(*vol));
822 	if (vol == NULL) {
823 		cb_fn(cb_arg, NULL, -ENOMEM);
824 		return;
825 	}
826 
827 	TAILQ_INIT(&vol->free_requests);
828 	TAILQ_INIT(&vol->executing_requests);
829 	TAILQ_INIT(&vol->queued_requests);
830 
831 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
832 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
833 	if (vol->backing_super == NULL) {
834 		_init_load_cleanup(vol, NULL);
835 		cb_fn(cb_arg, NULL, -ENOMEM);
836 		return;
837 	}
838 
839 	vol->backing_dev = backing_dev;
840 
841 	load_ctx = calloc(1, sizeof(*load_ctx));
842 	if (load_ctx == NULL) {
843 		_init_load_cleanup(vol, NULL);
844 		cb_fn(cb_arg, NULL, -ENOMEM);
845 		return;
846 	}
847 
848 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
849 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
850 	if (load_ctx->path == NULL) {
851 		_init_load_cleanup(vol, load_ctx);
852 		cb_fn(cb_arg, NULL, -ENOMEM);
853 		return;
854 	}
855 
856 	load_ctx->vol = vol;
857 	load_ctx->cb_fn = cb_fn;
858 	load_ctx->cb_arg = cb_arg;
859 
860 	load_ctx->iov[0].iov_base = vol->backing_super;
861 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
862 	load_ctx->iov[1].iov_base = load_ctx->path;
863 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
864 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
865 	load_ctx->backing_cb_args.cb_arg = load_ctx;
866 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
867 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
868 				vol->backing_dev->blocklen,
869 				&load_ctx->backing_cb_args);
870 }
871 
872 void
873 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
874 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
875 {
876 	if (vol == NULL) {
877 		/* This indicates a programming error. */
878 		assert(false);
879 		cb_fn(cb_arg, -EINVAL);
880 		return;
881 	}
882 
883 	if (--g_vol_count == 0) {
884 		spdk_free(g_zero_buf);
885 	}
886 	assert(g_vol_count >= 0);
887 	_init_load_cleanup(vol, NULL);
888 	cb_fn(cb_arg, 0);
889 }
890 
891 struct reduce_destroy_ctx {
892 	spdk_reduce_vol_op_complete		cb_fn;
893 	void					*cb_arg;
894 	struct spdk_reduce_vol			*vol;
895 	struct spdk_reduce_vol_superblock	*super;
896 	struct iovec				iov;
897 	struct spdk_reduce_vol_cb_args		backing_cb_args;
898 	int					reduce_errno;
899 	char					pm_path[REDUCE_PATH_MAX];
900 };
901 
902 static void
903 destroy_unload_cpl(void *cb_arg, int reduce_errno)
904 {
905 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
906 
907 	if (destroy_ctx->reduce_errno == 0) {
908 		if (unlink(destroy_ctx->pm_path)) {
909 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
910 				    destroy_ctx->pm_path, strerror(errno));
911 		}
912 	}
913 
914 	/* Even if the unload somehow failed, we still pass the destroy_ctx
915 	 * reduce_errno since that indicates whether or not the volume was
916 	 * actually destroyed.
917 	 */
918 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
919 	spdk_free(destroy_ctx->super);
920 	free(destroy_ctx);
921 }
922 
923 static void
924 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
925 {
926 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
927 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
928 
929 	destroy_ctx->reduce_errno = reduce_errno;
930 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
931 }
932 
933 static void
934 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
935 {
936 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
937 
938 	if (reduce_errno != 0) {
939 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
940 		spdk_free(destroy_ctx->super);
941 		free(destroy_ctx);
942 		return;
943 	}
944 
945 	destroy_ctx->vol = vol;
946 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
947 	destroy_ctx->iov.iov_base = destroy_ctx->super;
948 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
949 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
950 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
951 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
952 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
953 				 &destroy_ctx->backing_cb_args);
954 }
955 
956 void
957 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
958 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
959 {
960 	struct reduce_destroy_ctx *destroy_ctx;
961 
962 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
963 	if (destroy_ctx == NULL) {
964 		cb_fn(cb_arg, -ENOMEM);
965 		return;
966 	}
967 
968 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
969 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
970 	if (destroy_ctx->super == NULL) {
971 		free(destroy_ctx);
972 		cb_fn(cb_arg, -ENOMEM);
973 		return;
974 	}
975 	destroy_ctx->cb_fn = cb_fn;
976 	destroy_ctx->cb_arg = cb_arg;
977 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
978 }
979 
980 static bool
981 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
982 {
983 	uint64_t start_chunk, end_chunk;
984 
985 	start_chunk = offset / vol->logical_blocks_per_chunk;
986 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
987 
988 	return (start_chunk != end_chunk);
989 }
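
/* Worked example (illustrative only): with a hypothetical 32 logical blocks
 * per chunk, offset=30 and length=4 give start_chunk = 30/32 = 0 and
 * end_chunk = 33/32 = 1, so the request spans a chunk boundary and the
 * readv/writev entry points reject it with -EINVAL.
 */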
990 
991 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
992 
993 static void
994 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
995 {
996 	struct spdk_reduce_vol_request *next_req;
997 	struct spdk_reduce_vol *vol = req->vol;
998 
999 	req->cb_fn(req->cb_arg, reduce_errno);
1000 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
1001 
1002 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
1003 		if (next_req->logical_map_index == req->logical_map_index) {
1004 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
1005 			if (next_req->type == REDUCE_IO_READV) {
1006 				_start_readv_request(next_req);
1007 			} else {
1008 				assert(next_req->type == REDUCE_IO_WRITEV);
1009 				_start_writev_request(next_req);
1010 			}
1011 			break;
1012 		}
1013 	}
1014 
1015 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
1016 }
1017 
1018 static void
1019 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
1020 {
1021 	struct spdk_reduce_chunk_map *chunk;
1022 	uint32_t i;
1023 
1024 	chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
1025 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
1026 		if (chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
1027 			break;
1028 		}
1029 		assert(spdk_bit_array_get(vol->allocated_backing_io_units,
1030 					  chunk->io_unit_index[i]) == true);
1031 		spdk_bit_array_clear(vol->allocated_backing_io_units, chunk->io_unit_index[i]);
1032 		chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1033 	}
1034 	spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
1035 }
1036 
1037 static void
1038 _write_write_done(void *_req, int reduce_errno)
1039 {
1040 	struct spdk_reduce_vol_request *req = _req;
1041 	struct spdk_reduce_vol *vol = req->vol;
1042 	uint64_t old_chunk_map_index;
1043 
1044 	if (reduce_errno != 0) {
1045 		req->reduce_errno = reduce_errno;
1046 	}
1047 
1048 	assert(req->num_backing_ops > 0);
1049 	if (--req->num_backing_ops > 0) {
1050 		return;
1051 	}
1052 
1053 	if (req->reduce_errno != 0) {
1054 		_reduce_vol_reset_chunk(vol, req->chunk_map_index);
1055 		_reduce_vol_complete_req(req, req->reduce_errno);
1056 		return;
1057 	}
1058 
1059 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1060 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1061 		_reduce_vol_reset_chunk(vol, old_chunk_map_index);
1062 	}
1063 
1064 	/*
1065 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1066 	 * becomes invalid once we update the logical map, since the logical map will no
1067 	 * longer hold a reference to it.
1068 	 */
1069 
1070 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1071 	_reduce_persist(vol, req->chunk,
1072 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1073 
1074 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1075 
1076 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1077 
1078 	_reduce_vol_complete_req(req, 0);
1079 }
1080 
1081 struct reduce_merged_io_desc {
1082 	uint64_t io_unit_index;
1083 	uint32_t num_io_units;
1084 };
1085 
1086 static void
1087 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1088 				 reduce_request_fn next_fn, bool is_write)
1089 {
1090 	struct iovec *iov;
1091 	uint8_t *buf;
1092 	uint32_t i;
1093 
1094 	if (req->chunk_is_compressed) {
1095 		iov = req->comp_buf_iov;
1096 		buf = req->comp_buf;
1097 	} else {
1098 		iov = req->decomp_buf_iov;
1099 		buf = req->decomp_buf;
1100 	}
1101 
1102 	req->num_backing_ops = req->num_io_units;
1103 	req->backing_cb_args.cb_fn = next_fn;
1104 	req->backing_cb_args.cb_arg = req;
1105 	for (i = 0; i < req->num_io_units; i++) {
1106 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1107 		iov[i].iov_len = vol->params.backing_io_unit_size;
1108 		if (is_write) {
1109 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
1110 						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1111 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
1112 		} else {
1113 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
1114 						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1115 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
1116 		}
1117 	}
1118 }
1119 
1120 static void
1121 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1122 		   reduce_request_fn next_fn, bool is_write)
1123 {
1124 	struct iovec *iov;
1125 	struct reduce_merged_io_desc merged_io_desc[4];
1126 	uint8_t *buf;
1127 	bool merge = false;
1128 	uint32_t num_io = 0;
1129 	uint32_t io_unit_counts = 0;
1130 	uint32_t merged_io_idx = 0;
1131 	uint32_t i;
1132 
1133 	/* The merged_io_desc array is defined to hold four elements, so a chunk
1134 	 * may span at most four backing io units for its IO to be merged.
1135 	 * If the chunk spans more io units than that, don't merge the IO.
1136 	 */
1137 	if (vol->backing_io_units_per_chunk > 4) {
1138 		_issue_backing_ops_without_merge(req, vol, next_fn, is_write);
1139 		return;
1140 	}
1141 
1142 	if (req->chunk_is_compressed) {
1143 		iov = req->comp_buf_iov;
1144 		buf = req->comp_buf;
1145 	} else {
1146 		iov = req->decomp_buf_iov;
1147 		buf = req->decomp_buf;
1148 	}
1149 
1150 	for (i = 0; i < req->num_io_units; i++) {
1151 		if (!merge) {
1152 			merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
1153 			merged_io_desc[merged_io_idx].num_io_units = 1;
1154 			num_io++;
1155 		}
1156 
1157 		if (i + 1 == req->num_io_units) {
1158 			break;
1159 		}
1160 
1161 		if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
1162 			merged_io_desc[merged_io_idx].num_io_units += 1;
1163 			merge = true;
1164 			continue;
1165 		}
1166 		merge = false;
1167 		merged_io_idx++;
1168 	}
1169 
1170 	req->num_backing_ops = num_io;
1171 	req->backing_cb_args.cb_fn = next_fn;
1172 	req->backing_cb_args.cb_arg = req;
1173 	for (i = 0; i < num_io; i++) {
1174 		iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
1175 		iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
1176 		if (is_write) {
1177 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
1178 						 merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit,
1179 						 vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units,
1180 						 &req->backing_cb_args);
1181 		} else {
1182 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
1183 						merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit,
1184 						vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units,
1185 						&req->backing_cb_args);
1186 		}
1187 
1188 		/* Track io units issued so far - this sets the buffer offset for the next merged IO. */
1189 		io_unit_counts += merged_io_desc[i].num_io_units;
1190 	}
1191 }
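
/* Worked example (illustrative only): a compressed chunk stored in io units
 * {7, 8, 9, 12} is merged into two backing operations - one readv/writev of
 * 3 io units starting at index 7 and one of 1 io unit at index 12 - instead
 * of four single-unit operations.
 */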
1192 
1193 static void
1194 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1195 			uint32_t compressed_size)
1196 {
1197 	struct spdk_reduce_vol *vol = req->vol;
1198 	uint32_t i;
1199 	uint64_t chunk_offset, remainder, total_len = 0;
1200 	uint8_t *buf;
1201 	int j;
1202 
1203 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
1204 
1205 	/* TODO: fail if no chunk map found - but really this should not happen if we
1206 	 * size the number of requests similarly to the number of extra chunk maps
1207 	 */
1208 	assert(req->chunk_map_index != UINT32_MAX);
1209 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1210 
1211 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1212 	req->num_io_units = spdk_divide_round_up(compressed_size,
1213 			    vol->params.backing_io_unit_size);
1214 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1215 	req->chunk->compressed_size =
1216 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1217 
1218 	/* If the chunk is stored uncompressed we need to copy the data from the host buffers. */
1219 	if (req->chunk_is_compressed == false) {
1220 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1221 		buf = req->decomp_buf;
1222 		total_len = chunk_offset * vol->params.logical_block_size;
1223 
1224 		/* zero any offset into chunk */
1225 		if (req->rmw == false && chunk_offset) {
1226 			memset(buf, 0, total_len);
1227 		}
1228 		buf += total_len;
1229 
1230 		/* copy the data */
1231 		for (j = 0; j < req->iovcnt; j++) {
1232 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1233 			buf += req->iov[j].iov_len;
1234 			total_len += req->iov[j].iov_len;
1235 		}
1236 
1237 		/* zero any remainder */
1238 		remainder = vol->params.chunk_size - total_len;
1239 		total_len += remainder;
1240 		if (req->rmw == false && remainder) {
1241 			memset(buf, 0, remainder);
1242 		}
1243 		assert(total_len == vol->params.chunk_size);
1244 	}
1245 
1246 	for (i = 0; i < req->num_io_units; i++) {
1247 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1248 		/* TODO: fail if no backing block found - but really this should also not
1249 		 * happen (see comment above).
1250 		 */
1251 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1252 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1253 	}
1254 
1255 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1256 }
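
/* Worked example (illustrative only): with a hypothetical 16 KiB chunk and
 * 4 KiB backing io units, a 5 KiB compressed result needs
 * ceil(5 KiB / 4 KiB) = 2 io units, fewer than the 4 io units per chunk, so
 * the chunk is stored compressed.  A 15 KiB result needs all 4 io units, so
 * the chunk is stored uncompressed and compressed_size records the full
 * chunk_size.
 */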
1257 
1258 static void
1259 _write_compress_done(void *_req, int reduce_errno)
1260 {
1261 	struct spdk_reduce_vol_request *req = _req;
1262 
1263 	/* Negative reduce_errno indicates failure for compression operations.
1264 	 * Just write the uncompressed data instead.  Force this to happen
1265 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1266 	 * When it sees the data couldn't be compressed, it will just write
1267 	 * the uncompressed buffer to disk.
1268 	 */
1269 	if (reduce_errno < 0) {
1270 		req->backing_cb_args.output_size = req->vol->params.chunk_size;
1271 	}
1272 
1273 	_reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
1274 }
1275 
1276 static void
1277 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1278 {
1279 	struct spdk_reduce_vol *vol = req->vol;
1280 
1281 	req->backing_cb_args.cb_fn = next_fn;
1282 	req->backing_cb_args.cb_arg = req;
1283 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1284 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1285 	vol->backing_dev->compress(vol->backing_dev,
1286 				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
1287 				   &req->backing_cb_args);
1288 }
1289 
1290 static void
1291 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1292 {
1293 	struct spdk_reduce_vol *vol = req->vol;
1294 
1295 	req->backing_cb_args.cb_fn = next_fn;
1296 	req->backing_cb_args.cb_arg = req;
1297 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1298 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1299 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1300 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1301 	vol->backing_dev->decompress(vol->backing_dev,
1302 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1303 				     &req->backing_cb_args);
1304 }
1305 
1306 static void
1307 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1308 {
1309 	struct spdk_reduce_vol *vol = req->vol;
1310 	uint64_t chunk_offset, remainder = 0;
1311 	uint64_t ttl_len = 0;
1312 	size_t iov_len;
1313 	int i;
1314 
1315 	req->decomp_iovcnt = 0;
1316 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1317 
1318 	/* If the backing device doesn't support SGL output then we must copy the result of
1319 	 * decompression to the user's buffer if at least one of the conditions below is true:
1320 	 * 1. The user's buffer is fragmented
1321 	 * 2. The user's buffer is shorter than the chunk
1322 	 * 3. The user's buffer is contiguous and equals chunk_size, but crosses a huge page boundary */
1323 	iov_len = req->iov[0].iov_len;
1324 	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
1325 				     req->iov[0].iov_len < vol->params.chunk_size ||
1326 				     _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
1327 	if (req->copy_after_decompress) {
1328 		req->decomp_iov[0].iov_base = req->decomp_buf;
1329 		req->decomp_iov[0].iov_len = vol->params.chunk_size;
1330 		req->decomp_iovcnt = 1;
1331 		goto decompress;
1332 	}
1333 
1334 	if (chunk_offset) {
1335 		/* The first iov points to our scratch buffer for any offset into the chunk. */
1336 		req->decomp_iov[0].iov_base = req->decomp_buf;
1337 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1338 		ttl_len += req->decomp_iov[0].iov_len;
1339 		req->decomp_iovcnt = 1;
1340 	}
1341 
1342 	/* now the user data iov, direct to the user buffer */
1343 	for (i = 0; i < req->iovcnt; i++) {
1344 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1345 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1346 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1347 	}
1348 	req->decomp_iovcnt += req->iovcnt;
1349 
1350 	/* send the rest of the chunk to our scratch buffer */
1351 	remainder = vol->params.chunk_size - ttl_len;
1352 	if (remainder) {
1353 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1354 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1355 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1356 		req->decomp_iovcnt++;
1357 	}
1358 	assert(ttl_len == vol->params.chunk_size);
1359 
1360 decompress:
1361 	assert(!req->copy_after_decompress || req->decomp_iovcnt == 1);
1362 	req->backing_cb_args.cb_fn = next_fn;
1363 	req->backing_cb_args.cb_arg = req;
1364 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1365 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1366 	vol->backing_dev->decompress(vol->backing_dev,
1367 				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
1368 				     &req->backing_cb_args);
1369 }
1370 
1371 static inline void
1372 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
1373 {
1374 	struct spdk_reduce_vol *vol = req->vol;
1375 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1376 	uint64_t chunk_offset, ttl_len = 0;
1377 	uint64_t remainder = 0;
1378 	char *copy_offset = NULL;
1379 	uint32_t lbsize = vol->params.logical_block_size;
1380 	int i;
1381 
1382 	req->decomp_iov[0].iov_base = req->decomp_buf;
1383 	req->decomp_iov[0].iov_len = vol->params.chunk_size;
1384 	req->decomp_iovcnt = 1;
1385 	copy_offset = req->decomp_iov[0].iov_base;
1386 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1387 
1388 	if (chunk_offset) {
1389 		ttl_len += chunk_offset * lbsize;
1390 		/* copy_offset already points to padding buffer if zero_paddings=false */
1391 		if (zero_paddings) {
1392 			memcpy(copy_offset, padding_buffer, ttl_len);
1393 		}
1394 		copy_offset += ttl_len;
1395 	}
1396 
1397 	/* now the user data iov, direct from the user buffer */
1398 	for (i = 0; i < req->iovcnt; i++) {
1399 		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
1400 		copy_offset += req->iov[i].iov_len;
1401 		ttl_len += req->iov[i].iov_len;
1402 	}
1403 
1404 	remainder = vol->params.chunk_size - ttl_len;
1405 	if (remainder) {
1406 		/* copy_offset already points to padding buffer if zero_paddings=false */
1407 		if (zero_paddings) {
1408 			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
1409 		}
1410 		ttl_len += remainder;
1411 	}
1412 
1413 	assert(ttl_len == req->vol->params.chunk_size);
1414 }
1415 
1416 /* This function can be called when we are compressing new data or in a read-modify-write case.
1417  * In the first case any padding must be filled with zeroes; in the second case the padding
1418  * must point to the already read and decompressed buffer. */
1419 static inline void
1420 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1421 {
1422 	struct spdk_reduce_vol *vol = req->vol;
1423 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1424 	uint64_t chunk_offset, ttl_len = 0;
1425 	uint64_t remainder = 0;
1426 	uint32_t lbsize = vol->params.logical_block_size;
1427 	size_t iov_len;
1428 	int i;
1429 
1430 	/* If the backing device doesn't support SGL input then we must copy the user's buffer
1431 	 * into decomp_buf if at least one of the conditions below is true:
1432 	 * 1. The user's buffer is fragmented
1433 	 * 2. The user's buffer is shorter than the chunk
1434 	 * 3. The user's buffer is contiguous and equals chunk_size, but crosses a huge page boundary */
1435 	iov_len = req->iov[0].iov_len;
1436 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1437 					  req->iov[0].iov_len < vol->params.chunk_size ||
1438 					  _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
1439 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1440 		return;
1441 	}
1442 
1443 	req->decomp_iovcnt = 0;
1444 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1445 
1446 	if (chunk_offset != 0) {
1447 		ttl_len += chunk_offset * lbsize;
1448 		req->decomp_iov[0].iov_base = padding_buffer;
1449 		req->decomp_iov[0].iov_len = ttl_len;
1450 		req->decomp_iovcnt = 1;
1451 	}
1452 
1453 	/* now the user data iov, direct from the user buffer */
1454 	for (i = 0; i < req->iovcnt; i++) {
1455 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1456 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1457 		ttl_len += req->iov[i].iov_len;
1458 	}
1459 	req->decomp_iovcnt += req->iovcnt;
1460 
1461 	remainder = vol->params.chunk_size - ttl_len;
1462 	if (remainder) {
1463 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1464 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1465 		req->decomp_iovcnt++;
1466 		ttl_len += remainder;
1467 	}
1468 	assert(ttl_len == req->vol->params.chunk_size);
1469 }
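
/* Worked example (illustrative only): a hypothetical write of 8 KiB at a
 * 4 KiB offset into a 16 KiB chunk builds three decomp_iov entries:
 *   [0] 4 KiB of leading padding (zeroes for new data, prior chunk contents for RMW)
 *   [1] 8 KiB straight from the user's buffer
 *   [2] 4 KiB of trailing padding
 * so the compressor always sees exactly chunk_size bytes of input.
 */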
1470 
1471 static void
1472 _write_decompress_done(void *_req, int reduce_errno)
1473 {
1474 	struct spdk_reduce_vol_request *req = _req;
1475 
1476 	/* Negative reduce_errno indicates failure for decompression operations. */
1477 	if (reduce_errno < 0) {
1478 		_reduce_vol_complete_req(req, reduce_errno);
1479 		return;
1480 	}
1481 
1482 	/* On success, the output_size field in the backing_cb_args holds the decompressed
1483 	 * size, which must equal the full chunk size.
1484 	 */
1485 	if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
1486 		_reduce_vol_complete_req(req, -EIO);
1487 		return;
1488 	}
1489 
1490 	_prepare_compress_chunk(req, false);
1491 	_reduce_vol_compress_chunk(req, _write_compress_done);
1492 }
1493 
1494 static void
1495 _write_read_done(void *_req, int reduce_errno)
1496 {
1497 	struct spdk_reduce_vol_request *req = _req;
1498 
1499 	if (reduce_errno != 0) {
1500 		req->reduce_errno = reduce_errno;
1501 	}
1502 
1503 	assert(req->num_backing_ops > 0);
1504 	if (--req->num_backing_ops > 0) {
1505 		return;
1506 	}
1507 
1508 	if (req->reduce_errno != 0) {
1509 		_reduce_vol_complete_req(req, req->reduce_errno);
1510 		return;
1511 	}
1512 
1513 	if (req->chunk_is_compressed) {
1514 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1515 	} else {
1516 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1517 
1518 		_write_decompress_done(req, 0);
1519 	}
1520 }
1521 
1522 static void
1523 _read_decompress_done(void *_req, int reduce_errno)
1524 {
1525 	struct spdk_reduce_vol_request *req = _req;
1526 	struct spdk_reduce_vol *vol = req->vol;
1527 
1528 	/* Negative reduce_errno indicates failure for decompression operations. */
1529 	if (reduce_errno < 0) {
1530 		_reduce_vol_complete_req(req, reduce_errno);
1531 		return;
1532 	}
1533 
1534 	/* On success, the output_size field in the backing_cb_args holds the decompressed
1535 	 * size, which must equal the full chunk size.
1536 	 */
1537 	if (req->backing_cb_args.output_size != vol->params.chunk_size) {
1538 		_reduce_vol_complete_req(req, -EIO);
1539 		return;
1540 	}
1541 
1542 	if (req->copy_after_decompress) {
1543 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1544 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1545 		int i;
1546 
1547 		for (i = 0; i < req->iovcnt; i++) {
1548 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1549 			decomp_buffer += req->iov[i].iov_len;
1550 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1551 		}
1552 	}
1553 
1554 	_reduce_vol_complete_req(req, 0);
1555 }
1556 
1557 static void
1558 _read_read_done(void *_req, int reduce_errno)
1559 {
1560 	struct spdk_reduce_vol_request *req = _req;
1561 	uint64_t chunk_offset;
1562 	uint8_t *buf;
1563 	int i;
1564 
1565 	if (reduce_errno != 0) {
1566 		req->reduce_errno = reduce_errno;
1567 	}
1568 
1569 	assert(req->num_backing_ops > 0);
1570 	if (--req->num_backing_ops > 0) {
1571 		return;
1572 	}
1573 
1574 	if (req->reduce_errno != 0) {
1575 		_reduce_vol_complete_req(req, req->reduce_errno);
1576 		return;
1577 	}
1578 
1579 	if (req->chunk_is_compressed) {
1580 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1581 	} else {
1582 		/* The chunk was not compressed, so the backing read landed in our scratch
1583 		 *  buffer and we need to memcpy into the host buffers here.  (For compressed
1584 		 *  chunks, the decompression operation writes directly to the host buffers.)
1585 		 */
1586 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1587 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1588 		for (i = 0; i < req->iovcnt; i++) {
1589 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1590 			buf += req->iov[i].iov_len;
1591 		}
1592 
1593 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1594 
1595 		_read_decompress_done(req, 0);
1596 	}
1597 }
1598 
1599 static void
1600 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1601 {
1602 	struct spdk_reduce_vol *vol = req->vol;
1603 
1604 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1605 	assert(req->chunk_map_index != UINT32_MAX);
1606 
1607 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1608 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1609 			    vol->params.backing_io_unit_size);
1610 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1611 
1612 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1613 }
1614 
1615 static bool
1616 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1617 		    uint64_t length)
1618 {
1619 	uint64_t size = 0;
1620 	int i;
1621 
1622 	if (iovcnt > REDUCE_MAX_IOVECS) {
1623 		return false;
1624 	}
1625 
1626 	for (i = 0; i < iovcnt; i++) {
1627 		size += iov[i].iov_len;
1628 	}
1629 
1630 	return size == (length * vol->params.logical_block_size);
1631 }
1632 
1633 static bool
1634 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1635 {
1636 	struct spdk_reduce_vol_request *req;
1637 
1638 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1639 		if (logical_map_index == req->logical_map_index) {
1640 			return true;
1641 		}
1642 	}
1643 
1644 	return false;
1645 }
1646 
1647 static void
1648 _start_readv_request(struct spdk_reduce_vol_request *req)
1649 {
1650 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1651 	_reduce_vol_read_chunk(req, _read_read_done);
1652 }
1653 
1654 void
1655 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1656 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1657 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1658 {
1659 	struct spdk_reduce_vol_request *req;
1660 	uint64_t logical_map_index;
1661 	bool overlapped;
1662 	int i;
1663 
1664 	if (length == 0) {
1665 		cb_fn(cb_arg, 0);
1666 		return;
1667 	}
1668 
1669 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1670 		cb_fn(cb_arg, -EINVAL);
1671 		return;
1672 	}
1673 
1674 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1675 		cb_fn(cb_arg, -EINVAL);
1676 		return;
1677 	}
1678 
1679 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1680 	overlapped = _check_overlap(vol, logical_map_index);
1681 
1682 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1683 		/*
1684 		 * This chunk hasn't been allocated.  So treat the data as all
1685 		 * zeroes for this chunk - do the memset and immediately complete
1686 		 * the operation.
1687 		 */
1688 		for (i = 0; i < iovcnt; i++) {
1689 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1690 		}
1691 		cb_fn(cb_arg, 0);
1692 		return;
1693 	}
1694 
1695 	req = TAILQ_FIRST(&vol->free_requests);
1696 	if (req == NULL) {
1697 		cb_fn(cb_arg, -ENOMEM);
1698 		return;
1699 	}
1700 
1701 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1702 	req->type = REDUCE_IO_READV;
1703 	req->vol = vol;
1704 	req->iov = iov;
1705 	req->iovcnt = iovcnt;
1706 	req->offset = offset;
1707 	req->logical_map_index = logical_map_index;
1708 	req->length = length;
1709 	req->copy_after_decompress = false;
1710 	req->cb_fn = cb_fn;
1711 	req->cb_arg = cb_arg;
1712 
1713 	if (!overlapped) {
1714 		_start_readv_request(req);
1715 	} else {
1716 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1717 	}
1718 }
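
/* A minimal usage sketch (hypothetical buffer and callback; not part of the
 * build), assuming a 512-byte logical block size.  offset and length are in
 * logical blocks and must stay within a single chunk.
 */
#if 0
static void
read_done(void *cb_arg, int reduce_errno)
{
	/* reduce_errno == 0 means iov now holds the (decompressed) data */
}

static void
example_read(struct spdk_reduce_vol *vol, void *buf)
{
	struct iovec iov = { .iov_base = buf, .iov_len = 8 * 512 };

	/* read 8 logical blocks (512 bytes each) starting at block 0 */
	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, read_done, NULL);
}
#endif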
1719 
1720 static void
1721 _start_writev_request(struct spdk_reduce_vol_request *req)
1722 {
1723 	struct spdk_reduce_vol *vol = req->vol;
1724 
1725 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1726 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1727 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1728 			/* Read old chunk, then overwrite with data from this write
1729 			 *  operation.
1730 			 */
1731 			req->rmw = true;
1732 			_reduce_vol_read_chunk(req, _write_read_done);
1733 			return;
1734 		}
1735 	}
1736 
1737 	req->rmw = false;
1738 
1739 	_prepare_compress_chunk(req, true);
1740 	_reduce_vol_compress_chunk(req, _write_compress_done);
1741 }
1742 
1743 void
1744 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1745 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1746 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1747 {
1748 	struct spdk_reduce_vol_request *req;
1749 	uint64_t logical_map_index;
1750 	bool overlapped;
1751 
1752 	if (length == 0) {
1753 		cb_fn(cb_arg, 0);
1754 		return;
1755 	}
1756 
1757 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1758 		cb_fn(cb_arg, -EINVAL);
1759 		return;
1760 	}
1761 
1762 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1763 		cb_fn(cb_arg, -EINVAL);
1764 		return;
1765 	}
1766 
1767 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1768 	overlapped = _check_overlap(vol, logical_map_index);
1769 
1770 	req = TAILQ_FIRST(&vol->free_requests);
1771 	if (req == NULL) {
1772 		cb_fn(cb_arg, -ENOMEM);
1773 		return;
1774 	}
1775 
1776 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1777 	req->type = REDUCE_IO_WRITEV;
1778 	req->vol = vol;
1779 	req->iov = iov;
1780 	req->iovcnt = iovcnt;
1781 	req->offset = offset;
1782 	req->logical_map_index = logical_map_index;
1783 	req->length = length;
1784 	req->copy_after_decompress = false;
1785 	req->cb_fn = cb_fn;
1786 	req->cb_arg = cb_arg;
1787 
1788 	if (!overlapped) {
1789 		_start_writev_request(req);
1790 	} else {
1791 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1792 	}
1793 }
1794 
1795 const struct spdk_reduce_vol_params *
1796 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1797 {
1798 	return &vol->params;
1799 }
1800 
1801 const char *
1802 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
1803 {
1804 	return vol->pm_file.path;
1805 }
1806 
1807 void
1808 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1809 {
1810 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1811 	uint32_t struct_size;
1812 	uint64_t chunk_map_size;
1813 
1814 	SPDK_NOTICELOG("vol info:\n");
1815 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1816 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1817 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1818 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1819 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1820 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1821 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1822 		       vol->params.vol_size / vol->params.chunk_size);
1823 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1824 			vol->params.backing_io_unit_size);
1825 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1826 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1827 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1828 
1829 	SPDK_NOTICELOG("pmem info:\n");
1830 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1831 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1832 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1833 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1834 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1835 			   vol->params.chunk_size);
1836 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1837 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1838 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1839 			 vol->params.backing_io_unit_size);
1840 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1841 }
1842 
1843 SPDK_LOG_REGISTER_COMPONENT(reduce)
1844