/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "queue_internal.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk/log.h"
#include "spdk/memory.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY	-1ULL

#define REDUCE_NUM_VOL_REQUESTS	256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4040];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* Subtract one - the signature's null terminator is not stored on disk. */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

#define REDUCE_ZERO_BUF_SIZE 0x100000

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char			path[REDUCE_PATH_MAX];
	void			*pm_buf;
	int			pm_is_pmem;
	uint64_t		size;
};

#define REDUCE_IO_READV		1
#define REDUCE_IO_WRITEV	2

struct spdk_reduce_chunk_map {
	uint32_t		compressed_size;
	uint32_t		reserved;
	uint64_t		io_unit_index[0];
};

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for uncompressed chunk.  This is used for:
	 *   1) source buffer for compression operations
	 *   2) destination buffer for decompression operations
	 *   3) data buffer when writing uncompressed chunk to disk
	 *   4) data buffer when reading uncompressed chunk from disk
	 */
	uint8_t					*decomp_buf;
	struct iovec				*decomp_buf_iov;
	/**
	 * These are used to construct the iovecs that are sent to
	 *  the decomp engine; they point to a mix of the scratch buffer
	 *  and the user buffer.
	 */
	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
	int					decomp_iovcnt;

	/**
	 *  Scratch buffer used for compressed chunk.  This is used for:
	 *   1) destination buffer for compression operations
	 *   2) source buffer for decompression operations
	 *   3) data buffer when writing compressed chunk to disk
	 *   4) data buffer when reading compressed chunk from disk
	 */
	uint8_t					*comp_buf;
	struct iovec				*comp_buf_iov;
	struct iovec				*iov;
	bool					rmw;
	struct spdk_reduce_vol			*vol;
	int					type;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint32_t				num_io_units;
	struct spdk_reduce_backing_io           *backing_io;
	bool					chunk_is_compressed;
	bool					copy_after_decompress;
	uint64_t				offset;
	uint64_t				logical_map_index;
	uint64_t				length;
	uint64_t				chunk_map_index;
	struct spdk_reduce_chunk_map		*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	/* The starting position when looking for a block from allocated_chunk_maps */
	uint64_t				find_chunk_offset;
	/* Cache free chunks to speed up lookup of free chunk. */
	struct reduce_queue			free_chunks_queue;
	struct spdk_bit_array			*allocated_backing_io_units;
	/* The starting position when looking for a block from allocated_backing_io_units */
	uint64_t				find_block_offset;
	/* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */
	struct reduce_queue			free_backing_blocks_queue;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*buf_mem;
	struct iovec				*buf_iov_mem;
	/* Single contiguous buffer used for backing io buffers for this volume. */
	uint8_t					*buf_backing_io_mem;
};

static void _start_readv_request(struct spdk_reduce_vol_request *req);
static void _start_writev_request(struct spdk_reduce_vol_request *req);
static uint8_t *g_zero_buf;
static int g_vol_count = 0;
/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in the worst-case scenario where the logical map is completely allocated
 *  and no data can be compressed.  We need extra chunks in this case to handle
 *  in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}
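
/*
 * Worked example (numbers assumed for illustration, not from any particular
 * config): a 1 GiB volume with 16 KiB chunks has 65536 logical map entries,
 * so logical_map_size = 65536 * sizeof(uint64_t) = 512 KiB, which is already
 * a multiple of the 64-byte REDUCE_PM_SIZE_ALIGNMENT and is returned as-is.
 */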

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

static inline uint32_t
_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
{
	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
}
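
/*
 * Example (assumed sizes): with a 16 KiB chunk and a 4 KiB backing io unit,
 * backing_io_units_per_chunk = 4, so each chunk map occupies
 * 8 (compressed_size + reserved) + 4 * 8 (io_unit_index entries) = 40 bytes.
 */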

static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;

	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static struct spdk_reduce_chunk_map *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	uintptr_t chunk_map_addr;

	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
	chunk_map_addr += chunk_map_index *
			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);

	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * The caller must not set the vol size - libreduce calculates it from
		 *  the other values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}
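
/*
 * Example (assumed sizes): a 1 GiB backing device with 16 KiB chunks yields
 * 65536 chunks; reserving REDUCE_NUM_EXTRA_CHUNKS (128) for in-flight writes
 * leaves 65408 chunks, i.e. a logical volume size of 65408 * 16 KiB.
 */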

static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	uint64_t logical_map_size;

	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
}
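
/*
 * Resulting pm file layout, as derived from the pointer math above:
 *
 *   offset 0:             superblock (4 KiB)
 *   sizeof(superblock):   logical map, one uint64_t entry per chunk
 *   + logical_map_size:   chunk maps, one fixed-size struct per chunk
 */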

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
	struct spdk_reduce_backing_io           *backing_io;
};

static inline bool
_addr_crosses_huge_page(const void *addr, size_t *size)
{
	size_t _size;
	uint64_t rc;

	assert(size);

	_size = *size;
	rc = spdk_vtophys(addr, size);

	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
}
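
/*
 * spdk_vtophys() shrinks *size down to the number of bytes that are physically
 * contiguous starting at addr.  So, assuming 2 MiB hugepages for illustration,
 * a 4 KiB check starting 512 bytes before a page boundary comes back with
 * *size == 512, and the comparison above reports a crossing.
 */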

static inline int
_set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
{
	uint8_t *addr;
	size_t size_tmp = buffer_size;

	addr = *_addr;

	/* Verify that addr + buffer_size doesn't cross a huge page boundary */
	if (_addr_crosses_huge_page(addr, &size_tmp)) {
		/* The allocation is 2 MiB aligned, so a crossing buffer must sit at the end
		 * of the current page.  Skip the remaining bytes and continue from the
		 * beginning of the next page. */
		addr += size_tmp;
	}

	if (addr + buffer_size > addr_range) {
		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
		return -ERANGE;
	}

	*vol_buffer = addr;
	*_addr = addr + buffer_size;

	return 0;
}

static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
	uint32_t reqs_in_2mb_page, huge_pages_needed;
	uint8_t *buffer, *buffer_end;
	int i = 0;
	int rc = 0;

	/* The comp and decomp buffers must not cross physical page boundaries.
	 * Assume the system uses the default 2 MiB pages and that chunk_size is not
	 * necessarily a power of 2.
	 * Allocate 2x since we need buffers for both read/write and compress/decompress
	 * intermediate buffers. */
	reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
	if (!reqs_in_2mb_page) {
		return -EINVAL;
	}
	huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
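
	/*
	 * Example of this sizing (assumed chunk_size of 16 KiB): each request
	 * consumes 2 * 16 KiB = 32 KiB, so reqs_in_2mb_page = 2 MiB / 32 KiB = 64
	 * and the 256 requests need SPDK_CEIL_DIV(256, 64) = 4 hugepages.
	 */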

	vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
	if (vol->buf_mem == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_free(vol->buf_mem);
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
	 *  buffers.
	 */
	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) +
					 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk);
	if (vol->buf_backing_io_mem == NULL) {
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_free(vol->buf_mem);
		vol->request_mem = NULL;
		vol->buf_iov_mem = NULL;
		vol->buf_mem = NULL;
		return -ENOMEM;
	}

	buffer = vol->buf_mem;
	buffer_end = buffer + VALUE_2MB * huge_pages_needed;

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i *
				  (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) *
				  vol->backing_io_units_per_chunk);

		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];

		rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
		if (rc) {
			SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
				    vol->buf_mem, buffer_end);
			break;
		}
		rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
		if (rc) {
			SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
				    vol->buf_mem, buffer_end);
			break;
		}
	}

	if (rc) {
		free(vol->buf_backing_io_mem);
		free(vol->buf_iov_mem);
		free(vol->request_mem);
		spdk_free(vol->buf_mem);
		vol->buf_mem = NULL;
		vol->buf_backing_io_mem = NULL;
		vol->buf_iov_mem = NULL;
		vol->request_mem = NULL;
	}

	return rc;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_free(ctx->path);
		free(ctx->backing_io);
		free(ctx);
	}

	if (vol != NULL) {
		if (vol->pm_file.pm_buf != NULL) {
			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		}

		spdk_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_backing_io_mem);
		free(vol->buf_iov_mem);
		spdk_free(vol->buf_mem);
		free(vol);
	}
}
static int
_alloc_zero_buff(void)
{
	int rc = 0;

	/* The zero buffer is shared between all volumes and only used for reads,
	 * so allocate one global instance here if it was not already allocated
	 * when another vol was initialized or loaded.
	 */
	if (g_vol_count++ == 0) {
		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
					  SPDK_MALLOC_DMA);
		if (g_zero_buf == NULL) {
			g_vol_count--;
			rc = -ENOMEM;
		}
	}
	return rc;
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	rc = _alloc_zero_buff();
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;
	struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;

	backing_io->dev = vol->backing_dev;
	backing_io->iov = init_ctx->iov;
	backing_io->iovcnt = 1;
	backing_io->lba = 0;
	backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen;
	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;

	vol->backing_dev->submit_backing_io(backing_io);
}

static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	vol->find_chunk_offset = 0;
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
	vol->find_block_offset = 0;

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->params.backing_io_unit_size;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	struct spdk_reduce_backing_io *backing_io;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (dir_len > 0 && pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->submit_backing_io == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);
	queue_init(&vol->free_chunks_queue);
	queue_init(&vol->free_backing_blocks_queue);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
	if (backing_io == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}
	init_ctx->backing_io = backing_io;

	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_uuid_is_null(&params->uuid)) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 *  block will be written.  We wait until this is committed before writing the
	 *  super block to guarantee we don't get the super block written without
	 *  the path if the system crashed in the middle of a write operation.
	 */
	backing_io->dev = vol->backing_dev;
	backing_io->iov = init_ctx->iov;
	backing_io->iovcnt = 1;
	backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen;
	backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen;
	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;

	vol->backing_dev->submit_backing_io(backing_io);
}

static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);

static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks, logical_map_index;
	struct spdk_reduce_chunk_map *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	rc = _alloc_zero_buff();
	if (rc) {
		goto error;
	}

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	/* If the cb_fn is destroy_load_cb, the caller wants to destroy this compress bdev.
	 *  So don't bother getting the volume ready to use - invoke the callback immediately
	 *  so destroy_load_cb can delete the metadata off of the block device and delete the
	 *  persistent memory file if it exists.
	 */
	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	if (load_ctx->cb_fn == destroy_load_cb) {
		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
		_init_load_cleanup(NULL, load_ctx);
		return;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		logical_map_index = vol->pm_logical_map[i];
		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;
	struct spdk_reduce_backing_io *backing_io;

	if (backing_dev->submit_backing_io == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	TAILQ_INIT(&vol->free_requests);
	TAILQ_INIT(&vol->executing_requests);
	TAILQ_INIT(&vol->queued_requests);
	queue_init(&vol->free_chunks_queue);
	queue_init(&vol->free_backing_blocks_queue);

	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
	if (backing_io == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->backing_io = backing_io;

	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	backing_io->dev = vol->backing_dev;
	backing_io->iov = load_ctx->iov;
	backing_io->iovcnt = LOAD_IOV_COUNT;
	backing_io->lba = 0;
	backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	backing_io->backing_cb_args = &load_ctx->backing_cb_args;
	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;

	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->submit_backing_io(backing_io);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (--g_vol_count == 0) {
		spdk_free(g_zero_buf);
	}
	assert(g_vol_count >= 0);
	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
	struct spdk_reduce_backing_io           *backing_io;
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_free(destroy_ctx->super);
	free(destroy_ctx->backing_io);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;

	backing_io->dev = vol->backing_dev;
	backing_io->iov = &destroy_ctx->iov;
	backing_io->iovcnt = 1;
	backing_io->lba = 0;
	backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen;
	backing_io->backing_cb_args = &destroy_ctx->backing_cb_args;
	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;

	vol->backing_dev->submit_backing_io(backing_io);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;
	struct spdk_reduce_backing_io *backing_io;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
	if (backing_io == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->backing_io = backing_io;

	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		free(backing_io);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}
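
/*
 * Example (assumed geometry): with logical_blocks_per_chunk == 32, a request
 * at offset 30 with length 4 touches blocks 30..33, so start_chunk == 0 and
 * end_chunk == 1, and the function reports a spanning request.
 */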

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	struct spdk_reduce_vol_request *next_req;
	struct spdk_reduce_vol *vol = req->vol;

	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_REMOVE(&vol->executing_requests, req, tailq);

	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
		if (next_req->logical_map_index == req->logical_map_index) {
			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
			if (next_req->type == REDUCE_IO_READV) {
				_start_readv_request(next_req);
			} else {
				assert(next_req->type == REDUCE_IO_WRITEV);
				_start_writev_request(next_req);
			}
			break;
		}
	}

	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
}

static void
_reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	struct spdk_reduce_chunk_map *chunk;
	uint64_t index;
	bool success;
	uint32_t i;

	chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		index = chunk->io_unit_index[i];
		if (index == REDUCE_EMPTY_MAP_ENTRY) {
			break;
		}
		assert(spdk_bit_array_get(vol->allocated_backing_io_units,
					  index) == true);
		spdk_bit_array_clear(vol->allocated_backing_io_units, index);
		success = queue_enqueue(&vol->free_backing_blocks_queue, index);
		if (!success && index < vol->find_block_offset) {
			vol->find_block_offset = index;
		}
		chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
	}
	success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index);
	if (!success && chunk_map_index < vol->find_chunk_offset) {
		vol->find_chunk_offset = chunk_map_index;
	}
	spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
}

static void
_write_write_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t old_chunk_map_index;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_reset_chunk(vol, req->chunk_map_index);
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		_reduce_vol_reset_chunk(vol, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid after we update the logical map, since the logical map will no
	 * longer have a reference to it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk,
			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));

	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

static struct spdk_reduce_backing_io *
_reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index)
{
	struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev;
	struct spdk_reduce_backing_io *backing_io;

	backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io +
			(sizeof(*backing_io) + backing_dev->user_ctx_size) * index);

	return backing_io;
}
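
/*
 * The per-request backing_io array is laid out as repeating
 * [struct spdk_reduce_backing_io][user_ctx_size bytes] elements (see the
 * allocation in _allocate_vol_requests), so indexing must use the combined
 * stride rather than sizeof(*backing_io) alone.
 */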

struct reduce_merged_io_desc {
	uint64_t io_unit_index;
	uint32_t num_io_units;
};

static void
_issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
				 reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	struct spdk_reduce_backing_io *backing_io;
	uint8_t *buf;
	uint32_t i;

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

	req->num_backing_ops = req->num_io_units;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < req->num_io_units; i++) {
		backing_io = _reduce_vol_req_get_backing_io(req, i);
		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size;
		backing_io->dev  = vol->backing_dev;
		backing_io->iov = &iov[i];
		backing_io->iovcnt = 1;
		backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit;
		backing_io->lba_count = vol->backing_lba_per_io_unit;
		backing_io->backing_cb_args = &req->backing_cb_args;
		if (is_write) {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
		} else {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
		}
		vol->backing_dev->submit_backing_io(backing_io);
	}
}

static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	struct iovec *iov;
	struct spdk_reduce_backing_io *backing_io;
	struct reduce_merged_io_desc merged_io_desc[4];
	uint8_t *buf;
	bool merge = false;
	uint32_t num_io = 0;
	uint32_t io_unit_counts = 0;
	uint32_t merged_io_idx = 0;
	uint32_t i;

	/* merged_io_desc has four elements, so IO merging is only supported when a
	 * chunk consists of at most four backing io units.  If the chunk spans more
	 * io units than that, issue the IOs without merging.
	 */
	if (vol->backing_io_units_per_chunk > 4) {
		_issue_backing_ops_without_merge(req, vol, next_fn, is_write);
		return;
	}

	if (req->chunk_is_compressed) {
		iov = req->comp_buf_iov;
		buf = req->comp_buf;
	} else {
		iov = req->decomp_buf_iov;
		buf = req->decomp_buf;
	}

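	/*
	 * Example (assumed indices): io_unit_index = {8, 9, 10, 14} yields two
	 * merged descriptors, {index 8, 3 io units} and {index 14, 1 io unit},
	 * so num_io ends up as 2 backing operations instead of 4.
	 */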
	for (i = 0; i < req->num_io_units; i++) {
		if (!merge) {
			merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
			merged_io_desc[merged_io_idx].num_io_units = 1;
			num_io++;
		}

		if (i + 1 == req->num_io_units) {
			break;
		}

		if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
			merged_io_desc[merged_io_idx].num_io_units += 1;
			merge = true;
			continue;
		}
		merge = false;
		merged_io_idx++;
	}

	req->num_backing_ops = num_io;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < num_io; i++) {
		backing_io = _reduce_vol_req_get_backing_io(req, i);
		iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
		iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
		backing_io->dev  = vol->backing_dev;
		backing_io->iov = &iov[i];
		backing_io->iovcnt = 1;
		backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit;
		backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units;
		backing_io->backing_cb_args = &req->backing_cb_args;
		if (is_write) {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
		} else {
			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
		}
		vol->backing_dev->submit_backing_io(backing_io);

		/* Accumulate the number of io units issued so far. */
		io_unit_counts += merged_io_desc[i].num_io_units;
	}
}

static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
			uint32_t compressed_size)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;
	uint64_t chunk_offset, remainder, free_index, total_len = 0;
	uint8_t *buf;
	bool success;
	int j;

	success = queue_dequeue(&vol->free_chunks_queue, &free_index);
	if (success) {
		req->chunk_map_index = free_index;
	} else {
		req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps,
				       vol->find_chunk_offset);
		vol->find_chunk_offset = req->chunk_map_index + 1;
	}

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	req->num_io_units = spdk_divide_round_up(compressed_size,
			    vol->params.backing_io_unit_size);
	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
	req->chunk->compressed_size =
		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;

	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
	if (req->chunk_is_compressed == false) {
		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
		buf = req->decomp_buf;
		total_len = chunk_offset * vol->params.logical_block_size;

		/* zero any offset into chunk */
		if (req->rmw == false && chunk_offset) {
			memset(buf, 0, total_len);
		}
		buf += total_len;

		/* copy the data */
		for (j = 0; j < req->iovcnt; j++) {
			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
			buf += req->iov[j].iov_len;
			total_len += req->iov[j].iov_len;
		}

		/* zero any remainder */
		remainder = vol->params.chunk_size - total_len;
		total_len += remainder;
		if (req->rmw == false && remainder) {
			memset(buf, 0, remainder);
		}
		assert(total_len == vol->params.chunk_size);
	}

	for (i = 0; i < req->num_io_units; i++) {
		success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index);
		if (success) {
			req->chunk->io_unit_index[i] = free_index;
		} else {
			req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units,
						       vol->find_block_offset);
			vol->find_block_offset = req->chunk->io_unit_index[i] + 1;
		}
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_compress_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;

	/* Negative reduce_errno indicates failure for compression operations.
	 * Just write the uncompressed data instead.  Force this to happen
	 * by just passing the full chunk size to _reduce_vol_write_chunk.
	 * When it sees the data couldn't be compressed, it will just write
	 * the uncompressed buffer to disk.
	 */
	if (reduce_errno < 0) {
		req->backing_cb_args.output_size = req->vol->params.chunk_size;
	}

	_reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
}

static void
_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->compress(vol->backing_dev,
				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
				   &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;

	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
				     &req->backing_cb_args);
}

static void
_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk_offset, remainder = 0;
	uint64_t ttl_len = 0;
	size_t iov_len;
	int i;

	req->decomp_iovcnt = 0;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	/* If the backing device doesn't support SGL output, we should copy the result of
	 * decompression to the user's buffer if at least one of the conditions below is true:
	 * 1. The user's buffer is fragmented
	 * 2. The length of the user's buffer is less than the chunk
	 * 3. The user's buffer is contiguous and equals chunk_size but crosses a huge page boundary */
	iov_len = req->iov[0].iov_len;
	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
				     req->iov[0].iov_len < vol->params.chunk_size ||
				     _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
	if (req->copy_after_decompress) {
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = vol->params.chunk_size;
		req->decomp_iovcnt = 1;
		goto decompress;
	}

	if (chunk_offset) {
		/* first iov points to our scratch buffer for any offset into the chunk */
		req->decomp_iov[0].iov_base = req->decomp_buf;
		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
		ttl_len += req->decomp_iov[0].iov_len;
		req->decomp_iovcnt = 1;
	}

	/* now the user data iov, direct to the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
	}
	req->decomp_iovcnt += req->iovcnt;

	/* send the rest of the chunk to our scratch buffer */
	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
		req->decomp_iovcnt++;
	}
	assert(ttl_len == vol->params.chunk_size);

decompress:
	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	req->comp_buf_iov[0].iov_base = req->comp_buf;
	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
	vol->backing_dev->decompress(vol->backing_dev,
				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
				     &req->backing_cb_args);
}
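
/*
 * Example of the SGL path above (assumed geometry: 16 KiB chunk, 512-byte
 * logical blocks, a read of 8 blocks at chunk_offset 8): decomp_iov[0] covers
 * the first 4 KiB of scratch, decomp_iov[1] is the 4 KiB user buffer, and
 * decomp_iov[2] covers the remaining 8 KiB of scratch, summing to the full
 * chunk_size as the assert requires.
 */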

static inline void
_prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
{
	struct spdk_reduce_vol *vol = req->vol;
	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
	uint64_t chunk_offset, ttl_len = 0;
	uint64_t remainder = 0;
	char *copy_offset = NULL;
	uint32_t lbsize = vol->params.logical_block_size;
	int i;

	req->decomp_iov[0].iov_base = req->decomp_buf;
	req->decomp_iov[0].iov_len = vol->params.chunk_size;
	req->decomp_iovcnt = 1;
	copy_offset = req->decomp_iov[0].iov_base;
	chunk_offset = req->offset % vol->logical_blocks_per_chunk;

	if (chunk_offset) {
		ttl_len += chunk_offset * lbsize;
		/* When zero_paddings is false the destination already holds the decompressed
		 * padding data, so no copy is needed. */
		if (zero_paddings) {
			memcpy(copy_offset, padding_buffer, ttl_len);
		}
		copy_offset += ttl_len;
	}

	/* now the user data iov, direct from the user buffer */
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
		copy_offset += req->iov[i].iov_len;
		ttl_len += req->iov[i].iov_len;
	}

	remainder = vol->params.chunk_size - ttl_len;
	if (remainder) {
		/* When zero_paddings is false the destination already holds the decompressed
		 * padding data, so no copy is needed. */
		if (zero_paddings) {
			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
		}
		ttl_len += remainder;
	}

	assert(ttl_len == req->vol->params.chunk_size);
}
1560 
1561 /* This function can be called when we are compressing a new data or in case of read-modify-write
1562  * In the first case possible paddings should be filled with zeroes, in the second case the paddings
1563  * should point to already read and decompressed buffer */
1564 static inline void
1565 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1566 {
1567 	struct spdk_reduce_vol *vol = req->vol;
1568 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1569 	uint64_t chunk_offset, ttl_len = 0;
1570 	uint64_t remainder = 0;
1571 	uint32_t lbsize = vol->params.logical_block_size;
1572 	size_t iov_len;
1573 	int i;
1574 
1575 	/* If backing device doesn't support SGL input then we should copy user's buffer into decomp_buf
1576 	 * if at least one of the conditions below is true:
1577 	 * 1. User's buffer is fragmented
1578 	 * 2. Length of the user's buffer is less than the chunk
1579 	 * 3. User's buffer is contig, equals chunk_size but crosses huge page boundary */
1580 	iov_len = req->iov[0].iov_len;
1581 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1582 					  req->iov[0].iov_len < vol->params.chunk_size ||
1583 					  _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
1584 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1585 		return;
1586 	}
1587 
1588 	req->decomp_iovcnt = 0;
1589 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1590 
1591 	if (chunk_offset != 0) {
1592 		ttl_len += chunk_offset * lbsize;
1593 		req->decomp_iov[0].iov_base = padding_buffer;
1594 		req->decomp_iov[0].iov_len = ttl_len;
1595 		req->decomp_iovcnt = 1;
1596 	}
1597 
1598 	/* now the user data iov, direct from the user buffer */
1599 	for (i = 0; i < req->iovcnt; i++) {
1600 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1601 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1602 		ttl_len += req->iov[i].iov_len;
1603 	}
1604 	req->decomp_iovcnt += req->iovcnt;
1605 
1606 	remainder = vol->params.chunk_size - ttl_len;
1607 	if (remainder) {
1608 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1609 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1610 		req->decomp_iovcnt++;
1611 		ttl_len += remainder;
1612 	}
1613 	assert(ttl_len == req->vol->params.chunk_size);
1614 }
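
/*
 * For contrast with the copy path, the same hypothetical request
 * (16 KiB chunk, 4 KiB blocks, chunk_offset = 1, one 8 KiB user iovec)
 * on an SGL-capable backing device produces three iovecs and no copies:
 *
 *   decomp_iov[0] = { padding_buffer,          4096 }  // front padding
 *   decomp_iov[1] = { req->iov[0].iov_base,    8192 }  // user data
 *   decomp_iov[2] = { padding_buffer + 12288,  4096 }  // tail padding
 *
 * The compression engine then reads the full 16 KiB chunk through the
 * scatter-gather list.
 */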
1615 
1616 static void
1617 _write_decompress_done(void *_req, int reduce_errno)
1618 {
1619 	struct spdk_reduce_vol_request *req = _req;
1620 
1621 	/* Negative reduce_errno indicates failure for decompression operations. */
1622 	if (reduce_errno < 0) {
1623 		_reduce_vol_complete_req(req, reduce_errno);
1624 		return;
1625 	}
1626 
1627 	/* A non-negative reduce_errno indicates success; backing_cb_args.output_size holds the
1628 	 * size of the decompressed data, which must be exactly one full chunk.
1629 	 */
1630 	if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
1631 		_reduce_vol_complete_req(req, -EIO);
1632 		return;
1633 	}
1634 
1635 	_prepare_compress_chunk(req, false);
1636 	_reduce_vol_compress_chunk(req, _write_compress_done);
1637 }
1638 
1639 static void
1640 _write_read_done(void *_req, int reduce_errno)
1641 {
1642 	struct spdk_reduce_vol_request *req = _req;
1643 
1644 	if (reduce_errno != 0) {
1645 		req->reduce_errno = reduce_errno;
1646 	}
1647 
1648 	assert(req->num_backing_ops > 0);
1649 	if (--req->num_backing_ops > 0) {
1650 		return;
1651 	}
1652 
1653 	if (req->reduce_errno != 0) {
1654 		_reduce_vol_complete_req(req, req->reduce_errno);
1655 		return;
1656 	}
1657 
1658 	if (req->chunk_is_compressed) {
1659 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1660 	} else {
1661 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1662 
1663 		_write_decompress_done(req, 0);
1664 	}
1665 }
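
/*
 * Read-modify-write flow, for reference: _start_writev_request() issues
 * the read of the old chunk with _write_read_done() as its completion.
 * A compressed chunk is decompressed into the scratch buffer, then
 * _write_decompress_done() overlays the user data on the old data via
 * _prepare_compress_chunk(req, false) and recompresses the merged chunk,
 * completing through _write_compress_done() (defined earlier in this
 * file).
 */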
1666 
1667 static void
1668 _read_decompress_done(void *_req, int reduce_errno)
1669 {
1670 	struct spdk_reduce_vol_request *req = _req;
1671 	struct spdk_reduce_vol *vol = req->vol;
1672 
1673 	/* Negative reduce_errno indicates failure for decompression operations. */
1674 	if (reduce_errno < 0) {
1675 		_reduce_vol_complete_req(req, reduce_errno);
1676 		return;
1677 	}
1678 
1679 	/* A non-negative reduce_errno indicates success; backing_cb_args.output_size holds the
1680 	 * size of the decompressed data, which must be exactly one full chunk.
1681 	 */
1682 	if (req->backing_cb_args.output_size != vol->params.chunk_size) {
1683 		_reduce_vol_complete_req(req, -EIO);
1684 		return;
1685 	}
1686 
1687 	if (req->copy_after_decompress) {
1688 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1689 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1690 		int i;
1691 
1692 		for (i = 0; i < req->iovcnt; i++) {
1693 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1694 			decomp_buffer += req->iov[i].iov_len;
1695 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1696 		}
1697 	}
1698 
1699 	_reduce_vol_complete_req(req, 0);
1700 }
1701 
1702 static void
1703 _read_read_done(void *_req, int reduce_errno)
1704 {
1705 	struct spdk_reduce_vol_request *req = _req;
1706 	uint64_t chunk_offset;
1707 	uint8_t *buf;
1708 	int i;
1709 
1710 	if (reduce_errno != 0) {
1711 		req->reduce_errno = reduce_errno;
1712 	}
1713 
1714 	assert(req->num_backing_ops > 0);
1715 	if (--req->num_backing_ops > 0) {
1716 		return;
1717 	}
1718 
1719 	if (req->reduce_errno != 0) {
1720 		_reduce_vol_complete_req(req, req->reduce_errno);
1721 		return;
1722 	}
1723 
1724 	if (req->chunk_is_compressed) {
1725 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1726 	} else {
1727 		/* The chunk was not compressed, so the backing reads landed in
1728 		 *  decomp_buf and the data must be memcpy'd into the host buffers
1729 		 *  here.  For a compressed chunk, the decompression operation
1730 		 *  delivers the data to the host buffers directly. */
1731 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1732 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1733 		for (i = 0; i < req->iovcnt; i++) {
1734 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1735 			buf += req->iov[i].iov_len;
1736 		}
1737 
1738 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1739 
1740 		_read_decompress_done(req, 0);
1741 	}
1742 }
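
/*
 * Read flow, for reference: _start_readv_request() issues the backing
 * reads via _reduce_vol_read_chunk() with _read_read_done() as the
 * completion.  A compressed chunk is decompressed into the host buffers
 * (or into decomp_buf and copied afterwards when copy_after_decompress
 * is set), while an uncompressed chunk takes the memcpy path above;
 * either way _read_decompress_done() finishes the request.
 */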
1743 
1744 static void
1745 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1746 {
1747 	struct spdk_reduce_vol *vol = req->vol;
1748 
1749 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1750 	/* The logical map entries are 64-bit, so check against the 64-bit empty sentinel. */
1751 	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
1751 
1752 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1753 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1754 			    vol->params.backing_io_unit_size);
1755 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1756 
1757 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1758 }
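
/*
 * Example of the compression test above, with hypothetical sizes: a
 * 16 KiB chunk over a 4 KiB backing_io_unit_size gives
 * backing_io_units_per_chunk = 4.  A chunk stored with
 * compressed_size = 5000 needs spdk_divide_round_up(5000, 4096) = 2 io
 * units, so it is treated as compressed; an incompressible chunk stored
 * at 16384 bytes occupies all 4 io units and is read back without a
 * decompression step.
 */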
1759 
1760 static bool
1761 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1762 		    uint64_t length)
1763 {
1764 	uint64_t size = 0;
1765 	int i;
1766 
1767 	if (iovcnt > REDUCE_MAX_IOVECS) {
1768 		return false;
1769 	}
1770 
1771 	for (i = 0; i < iovcnt; i++) {
1772 		size += iov[i].iov_len;
1773 	}
1774 
1775 	return size == (length * vol->params.logical_block_size);
1776 }
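
/*
 * For example, with hypothetical 512-byte logical blocks, a request of
 * length = 8 blocks passes this check only if iovcnt is at most
 * REDUCE_MAX_IOVECS and the iov_len fields sum to 8 * 512 = 4096 bytes.
 */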
1777 
1778 static bool
1779 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1780 {
1781 	struct spdk_reduce_vol_request *req;
1782 
1783 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1784 		if (logical_map_index == req->logical_map_index) {
1785 			return true;
1786 		}
1787 	}
1788 
1789 	return false;
1790 }
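
/*
 * Requests are serialized per chunk: two in-flight requests touching the
 * same logical_map_index could race on the logical map and chunk map
 * (especially during read-modify-write), so a request that overlaps an
 * executing one waits on queued_requests instead of starting.
 */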
1791 
1792 static void
1793 _start_readv_request(struct spdk_reduce_vol_request *req)
1794 {
1795 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1796 	_reduce_vol_read_chunk(req, _read_read_done);
1797 }
1798 
1799 void
1800 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1801 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1802 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1803 {
1804 	struct spdk_reduce_vol_request *req;
1805 	uint64_t logical_map_index;
1806 	bool overlapped;
1807 	int i;
1808 
1809 	if (length == 0) {
1810 		cb_fn(cb_arg, 0);
1811 		return;
1812 	}
1813 
1814 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1815 		cb_fn(cb_arg, -EINVAL);
1816 		return;
1817 	}
1818 
1819 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1820 		cb_fn(cb_arg, -EINVAL);
1821 		return;
1822 	}
1823 
1824 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1825 	overlapped = _check_overlap(vol, logical_map_index);
1826 
1827 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1828 		/*
1829 		 * This chunk hasn't been allocated.  So treat the data as all
1830 		 * zeroes for this chunk - do the memset and immediately complete
1831 		 * the operation.
1832 		 */
1833 		for (i = 0; i < iovcnt; i++) {
1834 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1835 		}
1836 		cb_fn(cb_arg, 0);
1837 		return;
1838 	}
1839 
1840 	req = TAILQ_FIRST(&vol->free_requests);
1841 	if (req == NULL) {
1842 		cb_fn(cb_arg, -ENOMEM);
1843 		return;
1844 	}
1845 
1846 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1847 	req->type = REDUCE_IO_READV;
1848 	req->vol = vol;
1849 	req->iov = iov;
1850 	req->iovcnt = iovcnt;
1851 	req->offset = offset;
1852 	req->logical_map_index = logical_map_index;
1853 	req->length = length;
1854 	req->copy_after_decompress = false;
1855 	req->cb_fn = cb_fn;
1856 	req->cb_arg = cb_arg;
1857 
1858 	if (!overlapped) {
1859 		_start_readv_request(req);
1860 	} else {
1861 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1862 	}
1863 }
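
/*
 * Minimal usage sketch, assuming a volume with 512-byte logical blocks
 * that was already loaded; the names my_vol, my_buf and my_read_done are
 * hypothetical:
 *
 *	static void
 *	my_read_done(void *cb_arg, int reduce_errno)
 *	{
 *		if (reduce_errno != 0) {
 *			SPDK_ERRLOG("readv failed: %d\n", reduce_errno);
 *		}
 *	}
 *
 *	struct iovec iov = { .iov_base = my_buf, .iov_len = 8 * 512 };
 *
 *	// offset and length are in logical blocks and must not span a
 *	// chunk boundary
 *	spdk_reduce_vol_readv(my_vol, &iov, 1, 0, 8, my_read_done, NULL);
 */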
1864 
1865 static void
1866 _start_writev_request(struct spdk_reduce_vol_request *req)
1867 {
1868 	struct spdk_reduce_vol *vol = req->vol;
1869 
1870 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1871 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1872 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1873 			/* Read old chunk, then overwrite with data from this write
1874 			 *  operation.
1875 			 */
1876 			req->rmw = true;
1877 			_reduce_vol_read_chunk(req, _write_read_done);
1878 			return;
1879 		}
1880 	}
1881 
1882 	req->rmw = false;
1883 
1884 	_prepare_compress_chunk(req, true);
1885 	_reduce_vol_compress_chunk(req, _write_compress_done);
1886 }
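
/*
 * Example of the RMW decision above (hypothetical 16 KiB chunk, 4 KiB
 * logical blocks): a writev of length = 2 blocks to an allocated chunk
 * rewrites only 8 KiB of it, so rmw is set and the old chunk is read
 * first.  A writev of length = 4 blocks covers the whole chunk, so the
 * read is skipped and the data is compressed and written directly.
 */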
1887 
1888 void
1889 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1890 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1891 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1892 {
1893 	struct spdk_reduce_vol_request *req;
1894 	uint64_t logical_map_index;
1895 	bool overlapped;
1896 
1897 	if (length == 0) {
1898 		cb_fn(cb_arg, 0);
1899 		return;
1900 	}
1901 
1902 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1903 		cb_fn(cb_arg, -EINVAL);
1904 		return;
1905 	}
1906 
1907 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1908 		cb_fn(cb_arg, -EINVAL);
1909 		return;
1910 	}
1911 
1912 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1913 	overlapped = _check_overlap(vol, logical_map_index);
1914 
1915 	req = TAILQ_FIRST(&vol->free_requests);
1916 	if (req == NULL) {
1917 		cb_fn(cb_arg, -ENOMEM);
1918 		return;
1919 	}
1920 
1921 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1922 	req->type = REDUCE_IO_WRITEV;
1923 	req->vol = vol;
1924 	req->iov = iov;
1925 	req->iovcnt = iovcnt;
1926 	req->offset = offset;
1927 	req->logical_map_index = logical_map_index;
1928 	req->length = length;
1929 	req->copy_after_decompress = false;
1930 	req->cb_fn = cb_fn;
1931 	req->cb_arg = cb_arg;
1932 
1933 	if (!overlapped) {
1934 		_start_writev_request(req);
1935 	} else {
1936 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1937 	}
1938 }
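
/*
 * Queued overlapped requests are not dropped: when the executing request
 * for the same chunk completes, _reduce_vol_complete_req() (earlier in
 * this file) scans queued_requests for a matching logical_map_index and
 * restarts the waiting request, which is why readv and writev only check
 * executing_requests for conflicts here.
 */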
1939 
1940 const struct spdk_reduce_vol_params *
1941 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1942 {
1943 	return &vol->params;
1944 }
1945 
1946 const char *
1947 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
1948 {
1949 	return vol->pm_file.path;
1950 }
1951 
1952 void
1953 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1954 {
1955 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1956 	uint32_t struct_size;
1957 	uint64_t chunk_map_size;
1958 
1959 	SPDK_NOTICELOG("vol info:\n");
1960 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1961 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1962 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1963 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1964 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1965 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1966 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1967 		       vol->params.vol_size / vol->params.chunk_size);
1968 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1969 			vol->params.backing_io_unit_size);
1970 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1971 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1972 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1973 
1974 	SPDK_NOTICELOG("pmem info:\n");
1975 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1976 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1977 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1978 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1979 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1980 			   vol->params.chunk_size);
1981 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1982 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1983 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1984 			 vol->params.backing_io_unit_size);
1985 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1986 }
1987 
1988 SPDK_LOG_REGISTER_COMPONENT(reduce)
1989