1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "queue_internal.h"
10 
11 #include "spdk/reduce.h"
12 #include "spdk/env.h"
13 #include "spdk/string.h"
14 #include "spdk/bit_array.h"
15 #include "spdk/util.h"
16 #include "spdk/log.h"
17 #include "spdk/memory.h"
18 #include "spdk/tree.h"
19 
20 #include "libpmem.h"
21 
22 /* Always round up the size of the PM region to the nearest cacheline. */
23 #define REDUCE_PM_SIZE_ALIGNMENT	64
24 
25 /* Offset into the backing device where the persistent memory file's path is stored. */
26 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
27 
28 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
29 
30 #define REDUCE_NUM_VOL_REQUESTS	256
31 
32 /* Structure written to offset 0 of both the pm file and the backing device. */
33 struct spdk_reduce_vol_superblock {
34 	uint8_t				signature[8];
35 	struct spdk_reduce_vol_params	params;
36 	uint8_t				reserved[4040];
37 };
38 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
39 
40 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
41 /* sizeof() includes the null terminator, hence the -1. */
42 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
43 		   SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
44 
45 #define REDUCE_PATH_MAX 4096
46 
47 #define REDUCE_ZERO_BUF_SIZE 0x100000
48 
49 /**
50  * Describes a persistent memory file used to hold metadata associated with a
51  *  compressed volume.
52  */
53 struct spdk_reduce_pm_file {
54 	char			path[REDUCE_PATH_MAX];
55 	void			*pm_buf;
56 	int			pm_is_pmem;
57 	uint64_t		size;
58 };
59 
60 #define REDUCE_IO_READV		1
61 #define REDUCE_IO_WRITEV	2
62 #define	REDUCE_IO_UNMAP		3
63 
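/* Per-chunk metadata: the compressed size of the chunk followed by a flexible array
 * of backing io unit indices, one for each backing io unit the chunk occupies.
 */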
64 struct spdk_reduce_chunk_map {
65 	uint32_t		compressed_size;
66 	uint32_t		reserved;
67 	uint64_t		io_unit_index[0];
68 };
69 
70 struct spdk_reduce_vol_request {
71 	/**
72 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
73 	 *   1) source buffer for compression operations
74 	 *   2) destination buffer for decompression operations
75 	 *   3) data buffer when writing uncompressed chunk to disk
76 	 *   4) data buffer when reading uncompressed chunk from disk
77 	 */
78 	uint8_t					*decomp_buf;
79 	struct iovec				*decomp_buf_iov;
80 
81 	/**
82 	 * These are used to construct the iovecs that are sent to
83 	 *  the decomp engine; they point to a mix of the scratch buffer
84 	 *  and the user buffer.
85 	 */
86 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
87 	int					decomp_iovcnt;
88 
89 	/**
90 	 *  Scratch buffer used for compressed chunk.  This is used for:
91 	 *   1) destination buffer for compression operations
92 	 *   2) source buffer for decompression operations
93 	 *   3) data buffer when writing compressed chunk to disk
94 	 *   4) data buffer when reading compressed chunk from disk
95 	 */
96 	uint8_t					*comp_buf;
97 	struct iovec				*comp_buf_iov;
98 	struct iovec				*iov;
99 	bool					rmw;
100 	struct spdk_reduce_vol			*vol;
101 	int					type;
102 	int					reduce_errno;
103 	int					iovcnt;
104 	int					num_backing_ops;
105 	uint32_t				num_io_units;
106 	struct spdk_reduce_backing_io           *backing_io;
107 	bool					chunk_is_compressed;
108 	bool					copy_after_decompress;
109 	uint64_t				offset;
110 	uint64_t				logical_map_index;
111 	uint64_t				length;
112 	uint64_t				chunk_map_index;
113 	struct spdk_reduce_chunk_map		*chunk;
114 	spdk_reduce_vol_op_complete		cb_fn;
115 	void					*cb_arg;
116 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
117 	RB_ENTRY(spdk_reduce_vol_request)	rbnode;
118 	struct spdk_reduce_vol_cb_args		backing_cb_args;
119 };
120 
121 struct spdk_reduce_vol {
122 	struct spdk_reduce_vol_params		params;
123 	struct spdk_reduce_vol_info		info;
124 	uint32_t				backing_io_units_per_chunk;
125 	uint32_t				backing_lba_per_io_unit;
126 	uint32_t				logical_blocks_per_chunk;
127 	struct spdk_reduce_pm_file		pm_file;
128 	struct spdk_reduce_backing_dev		*backing_dev;
129 	struct spdk_reduce_vol_superblock	*backing_super;
130 	struct spdk_reduce_vol_superblock	*pm_super;
131 	uint64_t				*pm_logical_map;
132 	uint64_t				*pm_chunk_maps;
133 
134 	struct spdk_bit_array			*allocated_chunk_maps;
135 	/* The starting position when looking for a free chunk map in allocated_chunk_maps */
136 	uint64_t				find_chunk_offset;
137 	/* Cache free chunks to speed up lookup of free chunk. */
138 	struct reduce_queue			free_chunks_queue;
139 	struct spdk_bit_array			*allocated_backing_io_units;
140 	/* The starting position when looking for a block from allocated_backing_io_units */
141 	uint64_t				find_block_offset;
142 	/* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */
143 	struct reduce_queue			free_backing_blocks_queue;
144 
145 	struct spdk_reduce_vol_request		*request_mem;
146 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
147 	RB_HEAD(executing_req_tree, spdk_reduce_vol_request) executing_requests;
148 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
149 
150 	/* Single contiguous buffer used for all request buffers for this volume. */
151 	uint8_t					*buf_mem;
152 	struct iovec				*buf_iov_mem;
153 	/* Single contiguous buffer used for backing io buffers for this volume. */
154 	uint8_t					*buf_backing_io_mem;
155 };
156 
157 static void _start_readv_request(struct spdk_reduce_vol_request *req);
158 static void _start_writev_request(struct spdk_reduce_vol_request *req);
159 static uint8_t *g_zero_buf;
160 static int g_vol_count = 0;
161 
162 /*
163  * Allocate extra metadata chunks and corresponding backing io units to account for
164  *  outstanding IO in worst case scenario where logical map is completely allocated
165  *  and no data can be compressed.  We need extra chunks in this case to handle
166  *  in-flight writes since reduce never writes data in place.
167  */
168 #define REDUCE_NUM_EXTRA_CHUNKS 128
169 
170 static void
171 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
172 {
173 	if (vol->pm_file.pm_is_pmem) {
174 		pmem_persist(addr, len);
175 	} else {
176 		pmem_msync(addr, len);
177 	}
178 }
179 
180 static uint64_t
181 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
182 {
183 	uint64_t chunks_in_logical_map, logical_map_size;
184 
185 	chunks_in_logical_map = vol_size / chunk_size;
186 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
187 
188 	/* Round up to next cacheline. */
189 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
190 	       REDUCE_PM_SIZE_ALIGNMENT;
191 }
192 
193 static uint64_t
194 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
195 {
196 	uint64_t num_chunks;
197 
198 	num_chunks = vol_size / chunk_size;
199 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
200 
201 	return num_chunks;
202 }
203 
204 static inline uint32_t
205 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
206 {
207 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
208 }
209 
210 static uint64_t
211 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
212 {
213 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
214 
215 	num_chunks = _get_total_chunks(vol_size, chunk_size);
216 	io_units_per_chunk = chunk_size / backing_io_unit_size;
217 
218 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
219 
220 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
221 	       REDUCE_PM_SIZE_ALIGNMENT;
222 }
223 
224 static struct spdk_reduce_chunk_map *
225 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
226 {
227 	uintptr_t chunk_map_addr;
228 
229 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
230 
231 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
232 	chunk_map_addr += chunk_map_index *
233 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
234 
235 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
236 }
237 
238 static int
239 _validate_vol_params(struct spdk_reduce_vol_params *params)
240 {
241 	if (params->vol_size > 0) {
242 		/**
243 		 * The user must not pass in the vol size - it gets calculated by libreduce from
244 		 *  the other values in this structure plus the size of the backing device.
245 		 */
246 		return -EINVAL;
247 	}
248 
249 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
250 	    params->logical_block_size == 0) {
251 		return -EINVAL;
252 	}
253 
254 	/* Chunk size must be an even multiple of the backing io unit size. */
255 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
256 		return -EINVAL;
257 	}
258 
259 	/* Chunk size must be an even multiple of the logical block size. */
260 	if ((params->chunk_size % params->logical_block_size) != 0) {
261 		return -EINVAL;
262 	}
263 
264 	return 0;
265 }
266 
267 static uint64_t
268 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
269 {
270 	uint64_t num_chunks;
271 
272 	num_chunks = backing_dev_size / chunk_size;
273 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
274 		return 0;
275 	}
276 
277 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
278 	return num_chunks * chunk_size;
279 }
280 
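/* The pm file layout is: superblock, then the logical map, then the chunk maps
 * (see _initialize_vol_pm_pointers below); the logical map and chunk map regions
 * are each rounded up to the cacheline alignment.
 */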
281 static uint64_t
282 _get_pm_file_size(struct spdk_reduce_vol_params *params)
283 {
284 	uint64_t total_pm_size;
285 
286 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
287 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
288 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
289 			 params->backing_io_unit_size);
290 	return total_pm_size;
291 }
292 
293 const struct spdk_uuid *
294 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
295 {
296 	return &vol->params.uuid;
297 }
298 
299 static void
300 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
301 {
302 	uint64_t logical_map_size;
303 
304 	/* Superblock is at the beginning of the pm file. */
305 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
306 
307 	/* Logical map immediately follows the super block. */
308 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
309 
310 	/* Chunk maps follow the logical map. */
311 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
312 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
313 }
314 
315 /* We need 2 iovs during load - one for the superblock, another for the path */
316 #define LOAD_IOV_COUNT	2
317 
318 struct reduce_init_load_ctx {
319 	struct spdk_reduce_vol			*vol;
320 	struct spdk_reduce_vol_cb_args		backing_cb_args;
321 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
322 	void					*cb_arg;
323 	struct iovec				iov[LOAD_IOV_COUNT];
324 	void					*path;
325 	struct spdk_reduce_backing_io           *backing_io;
326 };
327 
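/* Use spdk_vtophys() to check whether [addr, addr + *size) is physically contiguous.
 * spdk_vtophys() shrinks *size to the contiguous mapping length, so a shrunken size
 * (or a translation error) means the buffer crosses a huge page boundary.
 */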
328 static inline bool
329 _addr_crosses_huge_page(const void *addr, size_t *size)
330 {
331 	size_t _size;
332 	uint64_t rc;
333 
334 	assert(size);
335 
336 	_size = *size;
337 	rc = spdk_vtophys(addr, size);
338 
339 	return rc == SPDK_VTOPHYS_ERROR || _size != *size;
340 }
341 
342 static inline int
343 _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
344 {
345 	uint8_t *addr;
346 	size_t size_tmp = buffer_size;
347 
348 	addr = *_addr;
349 
350 	/* Verify that addr + buffer_size doesn't cross huge page boundary */
351 	if (_addr_crosses_huge_page(addr, &size_tmp)) {
352 		/* The memory start is aligned on 2MiB, so the buffer must be located at the end of the page.
353 		 * Skip the remaining bytes and continue from the beginning of the next page. */
354 		addr += size_tmp;
355 	}
356 
357 	if (addr + buffer_size > addr_range) {
358 		SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
359 		return -ERANGE;
360 	}
361 
362 	*vol_buffer = addr;
363 	*_addr = addr + buffer_size;
364 
365 	return 0;
366 }
367 
368 static int
369 _allocate_vol_requests(struct spdk_reduce_vol *vol)
370 {
371 	struct spdk_reduce_vol_request *req;
372 	struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
373 	uint32_t reqs_in_2mb_page, huge_pages_needed;
374 	uint8_t *buffer, *buffer_end;
375 	int i = 0;
376 	int rc = 0;
377 
378 	/* The comp and decomp buffers must be allocated so that they do not cross physical
379 	 * page boundaries.  Assume the system uses the default 2MiB hugepages and that
380 	 * chunk_size is not necessarily a power of 2.
381 	 * Allocate 2x since we need buffers for both read/write and compress/decompress
382 	 * intermediate buffers. */
383 	reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
384 	if (!reqs_in_2mb_page) {
385 		return -EINVAL;
386 	}
387 	huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
388 
389 	vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
390 	if (vol->buf_mem == NULL) {
391 		return -ENOMEM;
392 	}
393 
394 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
395 	if (vol->request_mem == NULL) {
396 		spdk_free(vol->buf_mem);
397 		vol->buf_mem = NULL;
398 		return -ENOMEM;
399 	}
400 
401 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
402 	 *  buffers.
403 	 */
404 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
405 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
406 	if (vol->buf_iov_mem == NULL) {
407 		free(vol->request_mem);
408 		spdk_free(vol->buf_mem);
409 		vol->request_mem = NULL;
410 		vol->buf_mem = NULL;
411 		return -ENOMEM;
412 	}
413 
414 	vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) +
415 					 backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk);
416 	if (vol->buf_backing_io_mem == NULL) {
417 		free(vol->request_mem);
418 		free(vol->buf_iov_mem);
419 		spdk_free(vol->buf_mem);
420 		vol->request_mem = NULL;
421 		vol->buf_iov_mem = NULL;
422 		vol->buf_mem = NULL;
423 		return -ENOMEM;
424 	}
425 
426 	buffer = vol->buf_mem;
427 	buffer_end = buffer + VALUE_2MB * huge_pages_needed;
428 
429 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
430 		req = &vol->request_mem[i];
431 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
432 		req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i *
433 				  (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) *
434 				  vol->backing_io_units_per_chunk);
435 
436 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
437 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
438 
439 		rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
440 		if (rc) {
441 			SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
442 				    vol->buf_mem, buffer_end);
443 			break;
444 		}
445 		rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
446 		if (rc) {
447 			SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
448 				    vol->buf_mem, buffer_end);
449 			break;
450 		}
451 	}
452 
453 	if (rc) {
454 		free(vol->buf_backing_io_mem);
455 		free(vol->buf_iov_mem);
456 		free(vol->request_mem);
457 		spdk_free(vol->buf_mem);
458 		vol->buf_mem = NULL;
459 		vol->buf_backing_io_mem = NULL;
460 		vol->buf_iov_mem = NULL;
461 		vol->request_mem = NULL;
462 	}
463 
464 	return rc;
465 }
466 
467 const struct spdk_reduce_vol_info *
468 spdk_reduce_vol_get_info(const struct spdk_reduce_vol *vol)
469 {
470 	return &vol->info;
471 }
472 
473 static void
474 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
475 {
476 	if (ctx != NULL) {
477 		spdk_free(ctx->path);
478 		free(ctx->backing_io);
479 		free(ctx);
480 	}
481 
482 	if (vol != NULL) {
483 		if (vol->pm_file.pm_buf != NULL) {
484 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
485 		}
486 
487 		spdk_free(vol->backing_super);
488 		spdk_bit_array_free(&vol->allocated_chunk_maps);
489 		spdk_bit_array_free(&vol->allocated_backing_io_units);
490 		free(vol->request_mem);
491 		free(vol->buf_backing_io_mem);
492 		free(vol->buf_iov_mem);
493 		spdk_free(vol->buf_mem);
494 		free(vol);
495 	}
496 }
497 
498 static int
499 _alloc_zero_buff(void)
500 {
501 	int rc = 0;
502 
503 	/* The zero buffer is shared between all volumes and is only used for reads,
504 	 * so allocate one global instance here if it was not already allocated when
505 	 * another volume was initialized or loaded.
506 	 */
507 	if (g_vol_count++ == 0) {
508 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
509 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
510 					  SPDK_MALLOC_DMA);
511 		if (g_zero_buf == NULL) {
512 			g_vol_count--;
513 			rc = -ENOMEM;
514 		}
515 	}
516 	return rc;
517 }
518 
519 static void
520 _init_write_super_cpl(void *cb_arg, int reduce_errno)
521 {
522 	struct reduce_init_load_ctx *init_ctx = cb_arg;
523 	int rc = 0;
524 
525 	if (reduce_errno != 0) {
526 		rc = reduce_errno;
527 		goto err;
528 	}
529 
530 	rc = _allocate_vol_requests(init_ctx->vol);
531 	if (rc != 0) {
532 		goto err;
533 	}
534 
535 	rc = _alloc_zero_buff();
536 	if (rc != 0) {
537 		goto err;
538 	}
539 
540 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, rc);
541 	/* Only clean up the ctx - the vol has been passed to the application
542 	 *  for use now that initialization was successful.
543 	 */
544 	_init_load_cleanup(NULL, init_ctx);
545 
546 	return;
547 err:
548 	if (unlink(init_ctx->path)) {
549 		SPDK_ERRLOG("%s could not be unlinked: %s\n",
550 			    (char *)init_ctx->path, spdk_strerror(errno));
551 	}
552 
553 	init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
554 	_init_load_cleanup(init_ctx->vol, init_ctx);
555 }
556 
557 static void
558 _init_write_path_cpl(void *cb_arg, int reduce_errno)
559 {
560 	struct reduce_init_load_ctx *init_ctx = cb_arg;
561 	struct spdk_reduce_vol *vol = init_ctx->vol;
562 	struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io;
563 
564 	if (reduce_errno != 0) {
565 		_init_write_super_cpl(cb_arg, reduce_errno);
566 		return;
567 	}
568 
569 	init_ctx->iov[0].iov_base = vol->backing_super;
570 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
571 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
572 	init_ctx->backing_cb_args.cb_arg = init_ctx;
573 
574 	backing_io->dev = vol->backing_dev;
575 	backing_io->iov = init_ctx->iov;
576 	backing_io->iovcnt = 1;
577 	backing_io->lba = 0;
578 	backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen;
579 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
580 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
581 
582 	vol->backing_dev->submit_backing_io(backing_io);
583 }
584 
585 static int
586 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
587 {
588 	uint64_t total_chunks, total_backing_io_units;
589 	uint32_t i, num_metadata_io_units;
590 
591 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
592 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
593 	vol->find_chunk_offset = 0;
594 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
595 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
596 	vol->find_block_offset = 0;
597 
598 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
599 		return -ENOMEM;
600 	}
601 
602 	/* Set backing io unit bits associated with metadata. */
603 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
604 				vol->params.backing_io_unit_size;
605 	for (i = 0; i < num_metadata_io_units; i++) {
606 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
607 		vol->info.allocated_io_units++;
608 	}
609 
610 	return 0;
611 }
612 
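/* Executing requests are kept in an RB tree keyed by logical_map_index so that requests
 * targeting the same chunk can be detected.  Overlapping requests sit on queued_requests
 * and are restarted from _reduce_vol_complete_req() once the in-flight request finishes.
 */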
613 static int
614 overlap_cmp(struct spdk_reduce_vol_request *req1, struct spdk_reduce_vol_request *req2)
615 {
616 	return (req1->logical_map_index < req2->logical_map_index ? -1 : req1->logical_map_index >
617 		req2->logical_map_index);
618 }
619 RB_GENERATE_STATIC(executing_req_tree, spdk_reduce_vol_request, rbnode, overlap_cmp);
620 
621 
622 void
623 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
624 		     struct spdk_reduce_backing_dev *backing_dev,
625 		     const char *pm_file_dir,
626 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
627 {
628 	struct spdk_reduce_vol *vol;
629 	struct reduce_init_load_ctx *init_ctx;
630 	struct spdk_reduce_backing_io *backing_io;
631 	uint64_t backing_dev_size;
632 	size_t mapped_len;
633 	int dir_len, max_dir_len, rc;
634 
635 	/* We need to append a path separator and the UUID to the supplied
636 	 * path.
637 	 */
638 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
639 	dir_len = strnlen(pm_file_dir, max_dir_len);
640 	/* Strip trailing slash if the user provided one - we will add it back
641 	 * later when appending the filename.
642 	 */
643 	if (pm_file_dir[dir_len - 1] == '/') {
644 		dir_len--;
645 	}
646 	if (dir_len == max_dir_len) {
647 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
648 		cb_fn(cb_arg, NULL, -EINVAL);
649 		return;
650 	}
651 
652 	rc = _validate_vol_params(params);
653 	if (rc != 0) {
654 		SPDK_ERRLOG("invalid vol params\n");
655 		cb_fn(cb_arg, NULL, rc);
656 		return;
657 	}
658 
659 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
660 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
661 	if (params->vol_size == 0) {
662 		SPDK_ERRLOG("backing device is too small\n");
663 		cb_fn(cb_arg, NULL, -EINVAL);
664 		return;
665 	}
666 
667 	if (backing_dev->submit_backing_io == NULL) {
668 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
669 		cb_fn(cb_arg, NULL, -EINVAL);
670 		return;
671 	}
672 
673 	vol = calloc(1, sizeof(*vol));
674 	if (vol == NULL) {
675 		cb_fn(cb_arg, NULL, -ENOMEM);
676 		return;
677 	}
678 
679 	TAILQ_INIT(&vol->free_requests);
680 	RB_INIT(&vol->executing_requests);
681 	TAILQ_INIT(&vol->queued_requests);
682 	queue_init(&vol->free_chunks_queue);
683 	queue_init(&vol->free_backing_blocks_queue);
684 
685 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
686 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
687 	if (vol->backing_super == NULL) {
688 		cb_fn(cb_arg, NULL, -ENOMEM);
689 		_init_load_cleanup(vol, NULL);
690 		return;
691 	}
692 
693 	init_ctx = calloc(1, sizeof(*init_ctx));
694 	if (init_ctx == NULL) {
695 		cb_fn(cb_arg, NULL, -ENOMEM);
696 		_init_load_cleanup(vol, NULL);
697 		return;
698 	}
699 
700 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
701 	if (backing_io == NULL) {
702 		cb_fn(cb_arg, NULL, -ENOMEM);
703 		_init_load_cleanup(vol, init_ctx);
704 		return;
705 	}
706 	init_ctx->backing_io = backing_io;
707 
708 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
709 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
710 	if (init_ctx->path == NULL) {
711 		cb_fn(cb_arg, NULL, -ENOMEM);
712 		_init_load_cleanup(vol, init_ctx);
713 		return;
714 	}
715 
716 	if (spdk_uuid_is_null(&params->uuid)) {
717 		spdk_uuid_generate(&params->uuid);
718 	}
719 
720 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
721 	vol->pm_file.path[dir_len] = '/';
722 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
723 			    &params->uuid);
724 	vol->pm_file.size = _get_pm_file_size(params);
725 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
726 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
727 					    &mapped_len, &vol->pm_file.pm_is_pmem);
728 	if (vol->pm_file.pm_buf == NULL) {
729 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
730 			    vol->pm_file.path, strerror(errno));
731 		cb_fn(cb_arg, NULL, -errno);
732 		_init_load_cleanup(vol, init_ctx);
733 		return;
734 	}
735 
736 	if (vol->pm_file.size != mapped_len) {
737 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
738 			    vol->pm_file.size, mapped_len);
739 		cb_fn(cb_arg, NULL, -ENOMEM);
740 		_init_load_cleanup(vol, init_ctx);
741 		return;
742 	}
743 
744 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
745 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
746 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
747 	memcpy(&vol->params, params, sizeof(*params));
748 
749 	vol->backing_dev = backing_dev;
750 
751 	rc = _allocate_bit_arrays(vol);
752 	if (rc != 0) {
753 		cb_fn(cb_arg, NULL, rc);
754 		_init_load_cleanup(vol, init_ctx);
755 		return;
756 	}
757 
758 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
759 	       sizeof(vol->backing_super->signature));
760 	memcpy(&vol->backing_super->params, params, sizeof(*params));
761 
762 	_initialize_vol_pm_pointers(vol);
763 
764 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
765 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
766 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
767 	 */
768 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
769 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
770 
771 	init_ctx->vol = vol;
772 	init_ctx->cb_fn = cb_fn;
773 	init_ctx->cb_arg = cb_arg;
774 
775 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
776 	init_ctx->iov[0].iov_base = init_ctx->path;
777 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
778 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
779 	init_ctx->backing_cb_args.cb_arg = init_ctx;
780 	/* Write path to offset 4K on backing device - just after where the super
781 	 *  block will be written.  We wait until this is committed before writing the
782 	 *  super block to guarantee we don't get the super block written without the
783 	 *  path if the system crashes in the middle of a write operation.
784 	 */
785 	backing_io->dev = vol->backing_dev;
786 	backing_io->iov = init_ctx->iov;
787 	backing_io->iovcnt = 1;
788 	backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen;
789 	backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen;
790 	backing_io->backing_cb_args = &init_ctx->backing_cb_args;
791 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
792 
793 	vol->backing_dev->submit_backing_io(backing_io);
794 }
795 
796 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
797 
798 static void
799 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
800 {
801 	struct reduce_init_load_ctx *load_ctx = cb_arg;
802 	struct spdk_reduce_vol *vol = load_ctx->vol;
803 	uint64_t backing_dev_size;
804 	uint64_t i, num_chunks, logical_map_index;
805 	struct spdk_reduce_chunk_map *chunk;
806 	size_t mapped_len;
807 	uint32_t j;
808 	int rc;
809 
810 	if (reduce_errno != 0) {
811 		rc = reduce_errno;
812 		goto error;
813 	}
814 
815 	rc = _alloc_zero_buff();
816 	if (rc) {
817 		goto error;
818 	}
819 
820 	if (memcmp(vol->backing_super->signature,
821 		   SPDK_REDUCE_SIGNATURE,
822 		   sizeof(vol->backing_super->signature)) != 0) {
823 		/* This backing device isn't a libreduce backing device. */
824 		rc = -EILSEQ;
825 		goto error;
826 	}
827 
828 	/* If the cb_fn is destroy_load_cb, it means the caller wants to destroy this volume.
829 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
830 	 *  so destroy_load_cb can delete the metadata off of the block device and delete the
831 	 *  persistent memory file if it exists.
832 	 */
833 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
834 	if (load_ctx->cb_fn == (*destroy_load_cb)) {
835 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
836 		_init_load_cleanup(NULL, load_ctx);
837 		return;
838 	}
839 
840 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
841 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
842 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
843 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
844 
845 	rc = _allocate_bit_arrays(vol);
846 	if (rc != 0) {
847 		goto error;
848 	}
849 
850 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
851 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
852 		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
853 			    backing_dev_size);
854 		rc = -EILSEQ;
855 		goto error;
856 	}
857 
858 	vol->pm_file.size = _get_pm_file_size(&vol->params);
859 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
860 					    &vol->pm_file.pm_is_pmem);
861 	if (vol->pm_file.pm_buf == NULL) {
862 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
863 		rc = -errno;
864 		goto error;
865 	}
866 
867 	if (vol->pm_file.size != mapped_len) {
868 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
869 			    vol->pm_file.size, mapped_len);
870 		rc = -ENOMEM;
871 		goto error;
872 	}
873 
874 	rc = _allocate_vol_requests(vol);
875 	if (rc != 0) {
876 		goto error;
877 	}
878 
879 	_initialize_vol_pm_pointers(vol);
880 
881 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
882 	for (i = 0; i < num_chunks; i++) {
883 		logical_map_index = vol->pm_logical_map[i];
884 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
885 			continue;
886 		}
887 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
888 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
889 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
890 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
891 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
892 				vol->info.allocated_io_units++;
893 			}
894 		}
895 	}
896 
897 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
898 	/* Only clean up the ctx - the vol has been passed to the application
899 	 *  for use now that volume load was successful.
900 	 */
901 	_init_load_cleanup(NULL, load_ctx);
902 	return;
903 
904 error:
905 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
906 	_init_load_cleanup(vol, load_ctx);
907 }
908 
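/* Load reads the superblock and the pm file path from the start of the backing device in
 * a single backing IO; _load_read_super_and_path_cpl() then validates the signature, maps
 * the pm file and rebuilds the in-memory allocation state from it.
 */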
909 void
910 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
911 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
912 {
913 	struct spdk_reduce_vol *vol;
914 	struct reduce_init_load_ctx *load_ctx;
915 	struct spdk_reduce_backing_io *backing_io;
916 
917 	if (backing_dev->submit_backing_io == NULL) {
918 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
919 		cb_fn(cb_arg, NULL, -EINVAL);
920 		return;
921 	}
922 
923 	vol = calloc(1, sizeof(*vol));
924 	if (vol == NULL) {
925 		cb_fn(cb_arg, NULL, -ENOMEM);
926 		return;
927 	}
928 
929 	TAILQ_INIT(&vol->free_requests);
930 	RB_INIT(&vol->executing_requests);
931 	TAILQ_INIT(&vol->queued_requests);
932 	queue_init(&vol->free_chunks_queue);
933 	queue_init(&vol->free_backing_blocks_queue);
934 
935 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
936 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
937 	if (vol->backing_super == NULL) {
938 		_init_load_cleanup(vol, NULL);
939 		cb_fn(cb_arg, NULL, -ENOMEM);
940 		return;
941 	}
942 
943 	vol->backing_dev = backing_dev;
944 
945 	load_ctx = calloc(1, sizeof(*load_ctx));
946 	if (load_ctx == NULL) {
947 		_init_load_cleanup(vol, NULL);
948 		cb_fn(cb_arg, NULL, -ENOMEM);
949 		return;
950 	}
951 
952 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
953 	if (backing_io == NULL) {
954 		_init_load_cleanup(vol, load_ctx);
955 		cb_fn(cb_arg, NULL, -ENOMEM);
956 		return;
957 	}
958 
959 	load_ctx->backing_io = backing_io;
960 
961 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
962 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
963 	if (load_ctx->path == NULL) {
964 		_init_load_cleanup(vol, load_ctx);
965 		cb_fn(cb_arg, NULL, -ENOMEM);
966 		return;
967 	}
968 
969 	load_ctx->vol = vol;
970 	load_ctx->cb_fn = cb_fn;
971 	load_ctx->cb_arg = cb_arg;
972 
973 	load_ctx->iov[0].iov_base = vol->backing_super;
974 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
975 	load_ctx->iov[1].iov_base = load_ctx->path;
976 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
977 	backing_io->dev = vol->backing_dev;
978 	backing_io->iov = load_ctx->iov;
979 	backing_io->iovcnt = LOAD_IOV_COUNT;
980 	backing_io->lba = 0;
981 	backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
982 				vol->backing_dev->blocklen;
983 	backing_io->backing_cb_args = &load_ctx->backing_cb_args;
984 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
985 
986 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
987 	load_ctx->backing_cb_args.cb_arg = load_ctx;
988 	vol->backing_dev->submit_backing_io(backing_io);
989 }
990 
991 void
992 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
993 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
994 {
995 	if (vol == NULL) {
996 		/* This indicates a programming error. */
997 		assert(false);
998 		cb_fn(cb_arg, -EINVAL);
999 		return;
1000 	}
1001 
1002 	if (--g_vol_count == 0) {
1003 		spdk_free(g_zero_buf);
1004 	}
1005 	assert(g_vol_count >= 0);
1006 	_init_load_cleanup(vol, NULL);
1007 	cb_fn(cb_arg, 0);
1008 }
1009 
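/* Destroy is implemented as: load the volume, overwrite its superblock on the backing
 * device with zeroes, unload, then unlink the pm file (see destroy_load_cb,
 * _destroy_zero_super_cpl and destroy_unload_cpl below).
 */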
1010 struct reduce_destroy_ctx {
1011 	spdk_reduce_vol_op_complete		cb_fn;
1012 	void					*cb_arg;
1013 	struct spdk_reduce_vol			*vol;
1014 	struct spdk_reduce_vol_superblock	*super;
1015 	struct iovec				iov;
1016 	struct spdk_reduce_vol_cb_args		backing_cb_args;
1017 	int					reduce_errno;
1018 	char					pm_path[REDUCE_PATH_MAX];
1019 	struct spdk_reduce_backing_io           *backing_io;
1020 };
1021 
1022 static void
1023 destroy_unload_cpl(void *cb_arg, int reduce_errno)
1024 {
1025 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1026 
1027 	if (destroy_ctx->reduce_errno == 0) {
1028 		if (unlink(destroy_ctx->pm_path)) {
1029 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
1030 				    destroy_ctx->pm_path, strerror(errno));
1031 		}
1032 	}
1033 
1034 	/* Even if the unload somehow failed, we still pass the destroy_ctx
1035 	 * reduce_errno since that indicates whether or not the volume was
1036 	 * actually destroyed.
1037 	 */
1038 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
1039 	spdk_free(destroy_ctx->super);
1040 	free(destroy_ctx->backing_io);
1041 	free(destroy_ctx);
1042 }
1043 
1044 static void
1045 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
1046 {
1047 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1048 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
1049 
1050 	destroy_ctx->reduce_errno = reduce_errno;
1051 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
1052 }
1053 
1054 static void
1055 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1056 {
1057 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
1058 	struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io;
1059 
1060 	if (reduce_errno != 0) {
1061 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
1062 		spdk_free(destroy_ctx->super);
1063 		free(destroy_ctx);
1064 		return;
1065 	}
1066 
1067 	destroy_ctx->vol = vol;
1068 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
1069 	destroy_ctx->iov.iov_base = destroy_ctx->super;
1070 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
1071 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
1072 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
1073 
1074 	backing_io->dev = vol->backing_dev;
1075 	backing_io->iov = &destroy_ctx->iov;
1076 	backing_io->iovcnt = 1;
1077 	backing_io->lba = 0;
1078 	backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen;
1079 	backing_io->backing_cb_args = &destroy_ctx->backing_cb_args;
1080 	backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1081 
1082 	vol->backing_dev->submit_backing_io(backing_io);
1083 }
1084 
1085 void
1086 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
1087 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1088 {
1089 	struct reduce_destroy_ctx *destroy_ctx;
1090 	struct spdk_reduce_backing_io *backing_io;
1091 
1092 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
1093 	if (destroy_ctx == NULL) {
1094 		cb_fn(cb_arg, -ENOMEM);
1095 		return;
1096 	}
1097 
1098 	backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
1099 	if (backing_io == NULL) {
1100 		free(destroy_ctx);
1101 		cb_fn(cb_arg, -ENOMEM);
1102 		return;
1103 	}
1104 
1105 	destroy_ctx->backing_io = backing_io;
1106 
1107 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
1108 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1109 	if (destroy_ctx->super == NULL) {
1110 		free(destroy_ctx);
1111 		free(backing_io);
1112 		cb_fn(cb_arg, -ENOMEM);
1113 		return;
1114 	}
1115 	destroy_ctx->cb_fn = cb_fn;
1116 	destroy_ctx->cb_arg = cb_arg;
1117 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
1118 }
1119 
1120 static bool
1121 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
1122 {
1123 	uint64_t start_chunk, end_chunk;
1124 
1125 	start_chunk = offset / vol->logical_blocks_per_chunk;
1126 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
1127 
1128 	return (start_chunk != end_chunk);
1129 }
1130 
1131 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
1132 static void _start_unmap_request_full_chunk(void *ctx);
1133 
1134 static void
1135 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
1136 {
1137 	struct spdk_reduce_vol_request *next_req;
1138 	struct spdk_reduce_vol *vol = req->vol;
1139 
1140 	req->cb_fn(req->cb_arg, reduce_errno);
1141 	RB_REMOVE(executing_req_tree, &vol->executing_requests, req);
1142 
1143 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
1144 		if (next_req->logical_map_index == req->logical_map_index) {
1145 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
1146 			if (next_req->type == REDUCE_IO_READV) {
1147 				_start_readv_request(next_req);
1148 			} else if (next_req->type == REDUCE_IO_WRITEV) {
1149 				_start_writev_request(next_req);
1150 			} else {
1151 				assert(next_req->type == REDUCE_IO_UNMAP);
1152 				_start_unmap_request_full_chunk(next_req);
1153 			}
1154 			break;
1155 		}
1156 	}
1157 
1158 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
1159 }
1160 
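/* Release a chunk map and all of its backing io units back to the free pools.  Freed
 * indices are pushed to the lookup-acceleration queues when possible; otherwise the bit
 * array search offsets are rewound so the indices can be found again.
 */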
1161 static void
1162 _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
1163 {
1164 	struct spdk_reduce_chunk_map *chunk;
1165 	uint64_t index;
1166 	bool success;
1167 	uint32_t i;
1168 
1169 	chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
1170 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
1171 		index = chunk->io_unit_index[i];
1172 		if (index == REDUCE_EMPTY_MAP_ENTRY) {
1173 			break;
1174 		}
1175 		assert(spdk_bit_array_get(vol->allocated_backing_io_units,
1176 					  index) == true);
1177 		spdk_bit_array_clear(vol->allocated_backing_io_units, index);
1178 		vol->info.allocated_io_units--;
1179 		success = queue_enqueue(&vol->free_backing_blocks_queue, index);
1180 		if (!success && index < vol->find_block_offset) {
1181 			vol->find_block_offset = index;
1182 		}
1183 		chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1184 	}
1185 	success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index);
1186 	if (!success && chunk_map_index < vol->find_chunk_offset) {
1187 		vol->find_chunk_offset = chunk_map_index;
1188 	}
1189 	spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
1190 }
1191 
1192 static void
1193 _write_write_done(void *_req, int reduce_errno)
1194 {
1195 	struct spdk_reduce_vol_request *req = _req;
1196 	struct spdk_reduce_vol *vol = req->vol;
1197 	uint64_t old_chunk_map_index;
1198 
1199 	if (reduce_errno != 0) {
1200 		req->reduce_errno = reduce_errno;
1201 	}
1202 
1203 	assert(req->num_backing_ops > 0);
1204 	if (--req->num_backing_ops > 0) {
1205 		return;
1206 	}
1207 
1208 	if (req->reduce_errno != 0) {
1209 		_reduce_vol_reset_chunk(vol, req->chunk_map_index);
1210 		_reduce_vol_complete_req(req, req->reduce_errno);
1211 		return;
1212 	}
1213 
1214 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1215 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1216 		_reduce_vol_reset_chunk(vol, old_chunk_map_index);
1217 	}
1218 
1219 	/*
1220 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1221 	 * becomes invalid after we update the logical map, since the logical map will no
1222 	 * longer reference the old chunk map.
1223 	 */
1224 
1225 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1226 	_reduce_persist(vol, req->chunk,
1227 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1228 
1229 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1230 
1231 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1232 
1233 	_reduce_vol_complete_req(req, 0);
1234 }
1235 
1236 static struct spdk_reduce_backing_io *
1237 _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index)
1238 {
1239 	struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev;
1240 	struct spdk_reduce_backing_io *backing_io;
1241 
1242 	backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io +
1243 			(sizeof(*backing_io) + backing_dev->user_ctx_size) * index);
1244 
1245 	return backing_io;
1246 
1247 }
1248 
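/* Describes a run of consecutive backing io units that can be submitted to the backing
 * device as a single merged IO.
 */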
1249 struct reduce_merged_io_desc {
1250 	uint64_t io_unit_index;
1251 	uint32_t num_io_units;
1252 };
1253 
1254 static void
1255 _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1256 				 reduce_request_fn next_fn, bool is_write)
1257 {
1258 	struct iovec *iov;
1259 	struct spdk_reduce_backing_io *backing_io;
1260 	uint8_t *buf;
1261 	uint32_t i;
1262 
1263 	if (req->chunk_is_compressed) {
1264 		iov = req->comp_buf_iov;
1265 		buf = req->comp_buf;
1266 	} else {
1267 		iov = req->decomp_buf_iov;
1268 		buf = req->decomp_buf;
1269 	}
1270 
1271 	req->num_backing_ops = req->num_io_units;
1272 	req->backing_cb_args.cb_fn = next_fn;
1273 	req->backing_cb_args.cb_arg = req;
1274 	for (i = 0; i < req->num_io_units; i++) {
1275 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1276 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1277 		iov[i].iov_len = vol->params.backing_io_unit_size;
1278 		backing_io->dev  = vol->backing_dev;
1279 		backing_io->iov = &iov[i];
1280 		backing_io->iovcnt = 1;
1281 		backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit;
1282 		backing_io->lba_count = vol->backing_lba_per_io_unit;
1283 		backing_io->backing_cb_args = &req->backing_cb_args;
1284 		if (is_write) {
1285 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1286 		} else {
1287 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1288 		}
1289 		vol->backing_dev->submit_backing_io(backing_io);
1290 	}
1291 }
1292 
1293 static void
1294 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1295 		   reduce_request_fn next_fn, bool is_write)
1296 {
1297 	struct iovec *iov;
1298 	struct spdk_reduce_backing_io *backing_io;
1299 	struct reduce_merged_io_desc merged_io_desc[4];
1300 	uint8_t *buf;
1301 	bool merge = false;
1302 	uint32_t num_io = 0;
1303 	uint32_t io_unit_counts = 0;
1304 	uint32_t merged_io_idx = 0;
1305 	uint32_t i;
1306 
1307 	/* The merged_io_desc array below only holds four elements, so merging is supported
1308 	 * only when a chunk spans at most four backing io units.  If the chunk is larger
1309 	 * than that, don't merge the IO.
1310 	 */
1311 	if (vol->backing_io_units_per_chunk > 4) {
1312 		_issue_backing_ops_without_merge(req, vol, next_fn, is_write);
1313 		return;
1314 	}
1315 
1316 	if (req->chunk_is_compressed) {
1317 		iov = req->comp_buf_iov;
1318 		buf = req->comp_buf;
1319 	} else {
1320 		iov = req->decomp_buf_iov;
1321 		buf = req->decomp_buf;
1322 	}
1323 
1324 	for (i = 0; i < req->num_io_units; i++) {
1325 		if (!merge) {
1326 			merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
1327 			merged_io_desc[merged_io_idx].num_io_units = 1;
1328 			num_io++;
1329 		}
1330 
1331 		if (i + 1 == req->num_io_units) {
1332 			break;
1333 		}
1334 
1335 		if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
1336 			merged_io_desc[merged_io_idx].num_io_units += 1;
1337 			merge = true;
1338 			continue;
1339 		}
1340 		merge = false;
1341 		merged_io_idx++;
1342 	}
1343 
1344 	req->num_backing_ops = num_io;
1345 	req->backing_cb_args.cb_fn = next_fn;
1346 	req->backing_cb_args.cb_arg = req;
1347 	for (i = 0; i < num_io; i++) {
1348 		backing_io = _reduce_vol_req_get_backing_io(req, i);
1349 		iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
1350 		iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
1351 		backing_io->dev  = vol->backing_dev;
1352 		backing_io->iov = &iov[i];
1353 		backing_io->iovcnt = 1;
1354 		backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit;
1355 		backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units;
1356 		backing_io->backing_cb_args = &req->backing_cb_args;
1357 		if (is_write) {
1358 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
1359 		} else {
1360 			backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
1361 		}
1362 		vol->backing_dev->submit_backing_io(backing_io);
1363 
1364 		/* Accumulate the io units submitted so far - used as the buffer offset for the next IO. */
1365 		io_unit_counts += merged_io_desc[i].num_io_units;
1366 	}
1367 }
1368 
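/* Allocate a new chunk map and backing io units for this write (reduce never overwrites a
 * chunk in place), stage the uncompressed data in the scratch buffer if the chunk could not
 * be compressed, then issue the backing writes.
 */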
1369 static void
1370 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1371 			uint32_t compressed_size)
1372 {
1373 	struct spdk_reduce_vol *vol = req->vol;
1374 	uint32_t i;
1375 	uint64_t chunk_offset, remainder, free_index, total_len = 0;
1376 	uint8_t *buf;
1377 	bool success;
1378 	int j;
1379 
1380 	success = queue_dequeue(&vol->free_chunks_queue, &free_index);
1381 	if (success) {
1382 		req->chunk_map_index = free_index;
1383 	} else {
1384 		req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps,
1385 				       vol->find_chunk_offset);
1386 		vol->find_chunk_offset = req->chunk_map_index + 1;
1387 	}
1388 
1389 	/* TODO: fail if no chunk map found - but really this should not happen if we
1390 	 * size the number of requests similarly to number of extra chunk maps
1391 	 */
1392 	assert(req->chunk_map_index != UINT32_MAX);
1393 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1394 
1395 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1396 	req->num_io_units = spdk_divide_round_up(compressed_size,
1397 			    vol->params.backing_io_unit_size);
1398 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1399 	req->chunk->compressed_size =
1400 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1401 
1402 	/* If the chunk is uncompressed, we need to copy the data from the host buffers. */
1403 	if (req->chunk_is_compressed == false) {
1404 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1405 		buf = req->decomp_buf;
1406 		total_len = chunk_offset * vol->params.logical_block_size;
1407 
1408 		/* zero any offset into chunk */
1409 		if (req->rmw == false && chunk_offset) {
1410 			memset(buf, 0, total_len);
1411 		}
1412 		buf += total_len;
1413 
1414 		/* copy the data */
1415 		for (j = 0; j < req->iovcnt; j++) {
1416 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1417 			buf += req->iov[j].iov_len;
1418 			total_len += req->iov[j].iov_len;
1419 		}
1420 
1421 		/* zero any remainder */
1422 		remainder = vol->params.chunk_size - total_len;
1423 		total_len += remainder;
1424 		if (req->rmw == false && remainder) {
1425 			memset(buf, 0, remainder);
1426 		}
1427 		assert(total_len == vol->params.chunk_size);
1428 	}
1429 
1430 	for (i = 0; i < req->num_io_units; i++) {
1431 		success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index);
1432 		if (success) {
1433 			req->chunk->io_unit_index[i] = free_index;
1434 		} else {
1435 			req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units,
1436 						       vol->find_block_offset);
1437 			vol->find_block_offset = req->chunk->io_unit_index[i] + 1;
1438 		}
1439 		/* TODO: fail if no backing block found - but really this should also not
1440 		 * happen (see comment above).
1441 		 */
1442 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1443 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1444 		vol->info.allocated_io_units++;
1445 	}
1446 
1447 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1448 }
1449 
1450 static void
1451 _write_compress_done(void *_req, int reduce_errno)
1452 {
1453 	struct spdk_reduce_vol_request *req = _req;
1454 
1455 	/* Negative reduce_errno indicates failure for compression operations.
1456 	 * Just write the uncompressed data instead.  Force this to happen
1457 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1458 	 * When it sees the data couldn't be compressed, it will just write
1459 	 * the uncompressed buffer to disk.
1460 	 */
1461 	if (reduce_errno < 0) {
1462 		req->backing_cb_args.output_size = req->vol->params.chunk_size;
1463 	}
1464 
1465 	_reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
1466 }
1467 
1468 static void
1469 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1470 {
1471 	struct spdk_reduce_vol *vol = req->vol;
1472 
1473 	req->backing_cb_args.cb_fn = next_fn;
1474 	req->backing_cb_args.cb_arg = req;
1475 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1476 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1477 	vol->backing_dev->compress(vol->backing_dev,
1478 				   req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
1479 				   &req->backing_cb_args);
1480 }
1481 
1482 static void
1483 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1484 {
1485 	struct spdk_reduce_vol *vol = req->vol;
1486 
1487 	req->backing_cb_args.cb_fn = next_fn;
1488 	req->backing_cb_args.cb_arg = req;
1489 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1490 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1491 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1492 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1493 	vol->backing_dev->decompress(vol->backing_dev,
1494 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1495 				     &req->backing_cb_args);
1496 }
1497 
1498 static void
1499 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1500 {
1501 	struct spdk_reduce_vol *vol = req->vol;
1502 	uint64_t chunk_offset, remainder = 0;
1503 	uint64_t ttl_len = 0;
1504 	size_t iov_len;
1505 	int i;
1506 
1507 	req->decomp_iovcnt = 0;
1508 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1509 
1510 	/* If backing device doesn't support SGL output then we should copy the result of decompression to user's buffer
1511 	 * if at least one of the conditions below is true:
1512 	 * 1. User's buffer is fragmented
1513 	 * 2. Length of the user's buffer is less than the chunk
1514 	 * 3. User's buffer is contiguous and equals chunk_size, but crosses a huge page boundary */
1515 	iov_len = req->iov[0].iov_len;
1516 	req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
1517 				     req->iov[0].iov_len < vol->params.chunk_size ||
1518 				     _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
1519 	if (req->copy_after_decompress) {
1520 		req->decomp_iov[0].iov_base = req->decomp_buf;
1521 		req->decomp_iov[0].iov_len = vol->params.chunk_size;
1522 		req->decomp_iovcnt = 1;
1523 		goto decompress;
1524 	}
1525 
1526 	if (chunk_offset) {
1527 		/* first iov points to our scratch buffer for any offset into the chunk */
1528 		req->decomp_iov[0].iov_base = req->decomp_buf;
1529 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1530 		ttl_len += req->decomp_iov[0].iov_len;
1531 		req->decomp_iovcnt = 1;
1532 	}
1533 
1534 	/* now the user data iov, direct to the user buffer */
1535 	for (i = 0; i < req->iovcnt; i++) {
1536 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1537 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1538 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1539 	}
1540 	req->decomp_iovcnt += req->iovcnt;
1541 
1542 	/* send the rest of the chunk to our scratch buffer */
1543 	remainder = vol->params.chunk_size - ttl_len;
1544 	if (remainder) {
1545 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1546 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1547 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1548 		req->decomp_iovcnt++;
1549 	}
1550 	assert(ttl_len == vol->params.chunk_size);
1551 
1552 decompress:
1553 	assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
1554 	req->backing_cb_args.cb_fn = next_fn;
1555 	req->backing_cb_args.cb_arg = req;
1556 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1557 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1558 	vol->backing_dev->decompress(vol->backing_dev,
1559 				     req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
1560 				     &req->backing_cb_args);
1561 }
1562 
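/* Stage the whole chunk in decomp_buf when the backing device cannot accept SGL input:
 * copy the user data into place and fill any leading/trailing padding either with zeroes
 * (new write) or leave the previously decompressed data in place (read-modify-write).
 */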
1563 static inline void
1564 _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
1565 {
1566 	struct spdk_reduce_vol *vol = req->vol;
1567 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1568 	uint64_t chunk_offset, ttl_len = 0;
1569 	uint64_t remainder = 0;
1570 	char *copy_offset = NULL;
1571 	uint32_t lbsize = vol->params.logical_block_size;
1572 	int i;
1573 
1574 	req->decomp_iov[0].iov_base = req->decomp_buf;
1575 	req->decomp_iov[0].iov_len = vol->params.chunk_size;
1576 	req->decomp_iovcnt = 1;
1577 	copy_offset = req->decomp_iov[0].iov_base;
1578 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1579 
1580 	if (chunk_offset) {
1581 		ttl_len += chunk_offset * lbsize;
1582 		/* copy_offset already points to padding buffer if zero_paddings=false */
1583 		if (zero_paddings) {
1584 			memcpy(copy_offset, padding_buffer, ttl_len);
1585 		}
1586 		copy_offset += ttl_len;
1587 	}
1588 
1589 	/* now the user data iov, direct from the user buffer */
1590 	for (i = 0; i < req->iovcnt; i++) {
1591 		memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
1592 		copy_offset += req->iov[i].iov_len;
1593 		ttl_len += req->iov[i].iov_len;
1594 	}
1595 
1596 	remainder = vol->params.chunk_size - ttl_len;
1597 	if (remainder) {
1598 		/* copy_offset already points to padding buffer if zero_paddings=false */
1599 		if (zero_paddings) {
1600 			memcpy(copy_offset, padding_buffer + ttl_len, remainder);
1601 		}
1602 		ttl_len += remainder;
1603 	}
1604 
1605 	assert(ttl_len == req->vol->params.chunk_size);
1606 }
1607 
1608 /* This function can be called when compressing new data or as part of a read-modify-write.
1609  * In the first case any padding should be filled with zeroes; in the second case the padding
1610  * should point to the already read and decompressed buffer. */
1611 static inline void
1612 _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
1613 {
1614 	struct spdk_reduce_vol *vol = req->vol;
1615 	char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
1616 	uint64_t chunk_offset, ttl_len = 0;
1617 	uint64_t remainder = 0;
1618 	uint32_t lbsize = vol->params.logical_block_size;
1619 	size_t iov_len;
1620 	int i;
1621 
1622 	/* If the backing device doesn't support SGL input then we should copy the user's buffer into decomp_buf
1623 	 * if at least one of the conditions below is true:
1624 	 * 1. The user's buffer is fragmented
1625 	 * 2. The length of the user's buffer is less than the chunk size
1626 	 * 3. The user's buffer is contiguous, equals chunk_size, but crosses a huge page boundary */
1627 	iov_len = req->iov[0].iov_len;
1628 	if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
1629 					  req->iov[0].iov_len < vol->params.chunk_size ||
1630 					  _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
1631 		_prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
1632 		return;
1633 	}
1634 
1635 	req->decomp_iovcnt = 0;
1636 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1637 
1638 	if (chunk_offset != 0) {
1639 		ttl_len += chunk_offset * lbsize;
1640 		req->decomp_iov[0].iov_base = padding_buffer;
1641 		req->decomp_iov[0].iov_len = ttl_len;
1642 		req->decomp_iovcnt = 1;
1643 	}
1644 
1645 	/* now the user data iov, direct from the user buffer */
1646 	for (i = 0; i < req->iovcnt; i++) {
1647 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1648 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1649 		ttl_len += req->iov[i].iov_len;
1650 	}
1651 	req->decomp_iovcnt += req->iovcnt;
1652 
1653 	remainder = vol->params.chunk_size - ttl_len;
1654 	if (remainder) {
1655 		req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
1656 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1657 		req->decomp_iovcnt++;
1658 		ttl_len += remainder;
1659 	}
1660 	assert(ttl_len == req->vol->params.chunk_size);
1661 }
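
/*
 * Padding source, illustrated: for a fresh write (zero_paddings = true) the
 * padding regions come from g_zero_buf, so logical blocks not covered by the
 * request compress as zeroes; for read-modify-write (zero_paddings = false)
 * they come from decomp_buf, which already holds the old chunk decompressed by
 * the preceding read, so the surrounding data is preserved.
 */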
1662 
1663 static void
1664 _write_decompress_done(void *_req, int reduce_errno)
1665 {
1666 	struct spdk_reduce_vol_request *req = _req;
1667 
1668 	/* Negative reduce_errno indicates that the decompression operation failed. */
1669 	if (reduce_errno < 0) {
1670 		_reduce_vol_complete_req(req, reduce_errno);
1671 		return;
1672 	}
1673 
1674 	/* A non-negative reduce_errno indicates success; backing_cb_args.output_size
1675 	 * holds the size of the decompressed data, which must equal the chunk size.
1676 	 */
1677 	if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
1678 		_reduce_vol_complete_req(req, -EIO);
1679 		return;
1680 	}
1681 
1682 	_prepare_compress_chunk(req, false);
1683 	_reduce_vol_compress_chunk(req, _write_compress_done);
1684 }
1685 
1686 static void
1687 _write_read_done(void *_req, int reduce_errno)
1688 {
1689 	struct spdk_reduce_vol_request *req = _req;
1690 
1691 	if (reduce_errno != 0) {
1692 		req->reduce_errno = reduce_errno;
1693 	}
1694 
1695 	assert(req->num_backing_ops > 0);
1696 	if (--req->num_backing_ops > 0) {
1697 		return;
1698 	}
1699 
1700 	if (req->reduce_errno != 0) {
1701 		_reduce_vol_complete_req(req, req->reduce_errno);
1702 		return;
1703 	}
1704 
1705 	if (req->chunk_is_compressed) {
1706 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1707 	} else {
1708 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1709 
1710 		_write_decompress_done(req, 0);
1711 	}
1712 }
1713 
1714 static void
1715 _read_decompress_done(void *_req, int reduce_errno)
1716 {
1717 	struct spdk_reduce_vol_request *req = _req;
1718 	struct spdk_reduce_vol *vol = req->vol;
1719 
1720 	/* Negative reduce_errno indicates that the decompression operation failed. */
1721 	if (reduce_errno < 0) {
1722 		_reduce_vol_complete_req(req, reduce_errno);
1723 		return;
1724 	}
1725 
1726 	/* A non-negative reduce_errno indicates success; backing_cb_args.output_size
1727 	 * holds the size of the decompressed data, which must equal the chunk size.
1728 	 */
1729 	if (req->backing_cb_args.output_size != vol->params.chunk_size) {
1730 		_reduce_vol_complete_req(req, -EIO);
1731 		return;
1732 	}
1733 
1734 	if (req->copy_after_decompress) {
1735 		uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1736 		char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
1737 		int i;
1738 
1739 		for (i = 0; i < req->iovcnt; i++) {
1740 			memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
1741 			decomp_buffer += req->iov[i].iov_len;
1742 			assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
1743 		}
1744 	}
1745 
1746 	_reduce_vol_complete_req(req, 0);
1747 }
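
/*
 * Note on copy_after_decompress: when the decompressed chunk had to land in the
 * decomp_buf scratch buffer rather than directly in the caller's iovecs, only
 * the requested logical blocks are copied out above, starting at
 * chunk_offset * logical_block_size within the chunk.
 */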
1748 
1749 static void
1750 _read_read_done(void *_req, int reduce_errno)
1751 {
1752 	struct spdk_reduce_vol_request *req = _req;
1753 	uint64_t chunk_offset;
1754 	uint8_t *buf;
1755 	int i;
1756 
1757 	if (reduce_errno != 0) {
1758 		req->reduce_errno = reduce_errno;
1759 	}
1760 
1761 	assert(req->num_backing_ops > 0);
1762 	if (--req->num_backing_ops > 0) {
1763 		return;
1764 	}
1765 
1766 	if (req->reduce_errno != 0) {
1767 		_reduce_vol_complete_req(req, req->reduce_errno);
1768 		return;
1769 	}
1770 
1771 	if (req->chunk_is_compressed) {
1772 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1773 	} else {
1774 
1775 		/* If the chunk had been compressed, the decompression operation would have
1776 		 *  placed the data directly into the host buffers; since it was stored
1777 		 *  uncompressed, copy it from the scratch buffer here. */
1778 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1779 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1780 		for (i = 0; i < req->iovcnt; i++) {
1781 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1782 			buf += req->iov[i].iov_len;
1783 		}
1784 
1785 		req->backing_cb_args.output_size = req->chunk->compressed_size;
1786 
1787 		_read_decompress_done(req, 0);
1788 	}
1789 }
1790 
1791 static void
1792 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1793 {
1794 	struct spdk_reduce_vol *vol = req->vol;
1795 
1796 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1797 	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
1798 
1799 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1800 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1801 			    vol->params.backing_io_unit_size);
1802 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1803 
1804 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1805 }
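
/*
 * Compression detection, illustrated with hypothetical values: a chunk is
 * treated as compressed when it occupies fewer backing io units than a full
 * chunk would. With a 16 KiB chunk and 4 KiB backing io units
 * (backing_io_units_per_chunk = 4), a compressed_size of 5 KiB rounds up to
 * 2 io units, so chunk_is_compressed is true; a compressed_size of 16 KiB
 * needs all 4 io units and the chunk is read back as stored, uncompressed.
 */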
1806 
1807 static bool
1808 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1809 		    uint64_t length)
1810 {
1811 	uint64_t size = 0;
1812 	int i;
1813 
1814 	if (iovcnt > REDUCE_MAX_IOVECS) {
1815 		return false;
1816 	}
1817 
1818 	for (i = 0; i < iovcnt; i++) {
1819 		size += iov[i].iov_len;
1820 	}
1821 
1822 	return size == (length * vol->params.logical_block_size);
1823 }
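
/*
 * Validation example (hypothetical values): with a 512-byte logical block size,
 * a request of length = 8 blocks must supply iovecs totalling exactly 4096
 * bytes and at most REDUCE_MAX_IOVECS entries; anything else makes the
 * readv/writev entry points fail with -EINVAL.
 */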
1824 
1825 static bool
1826 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1827 {
1828 	struct spdk_reduce_vol_request req;
1829 
1830 	req.logical_map_index = logical_map_index;
1831 
1832 	return (NULL != RB_FIND(executing_req_tree, &vol->executing_requests, &req));
1833 }
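
/*
 * Requests are serialized per logical chunk: executing requests sit in an RB
 * tree keyed by logical_map_index, so a new request that maps to the same
 * chunk as one already in flight is detected here and parked on
 * vol->queued_requests until it can be restarted.
 */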
1834 
1835 static void
1836 _start_readv_request(struct spdk_reduce_vol_request *req)
1837 {
1838 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1839 	_reduce_vol_read_chunk(req, _read_read_done);
1840 }
1841 
1842 void
1843 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1844 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1845 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1846 {
1847 	struct spdk_reduce_vol_request *req;
1848 	uint64_t logical_map_index;
1849 	bool overlapped;
1850 	int i;
1851 
1852 	if (length == 0) {
1853 		cb_fn(cb_arg, 0);
1854 		return;
1855 	}
1856 
1857 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1858 		cb_fn(cb_arg, -EINVAL);
1859 		return;
1860 	}
1861 
1862 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1863 		cb_fn(cb_arg, -EINVAL);
1864 		return;
1865 	}
1866 
1867 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1868 	overlapped = _check_overlap(vol, logical_map_index);
1869 
1870 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1871 		/*
1872 		 * This chunk hasn't been allocated.  So treat the data as all
1873 		 * zeroes for this chunk - do the memset and immediately complete
1874 		 * the operation.
1875 		 */
1876 		for (i = 0; i < iovcnt; i++) {
1877 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1878 		}
1879 		cb_fn(cb_arg, 0);
1880 		return;
1881 	}
1882 
1883 	req = TAILQ_FIRST(&vol->free_requests);
1884 	if (req == NULL) {
1885 		cb_fn(cb_arg, -ENOMEM);
1886 		return;
1887 	}
1888 
1889 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1890 	req->type = REDUCE_IO_READV;
1891 	req->vol = vol;
1892 	req->iov = iov;
1893 	req->iovcnt = iovcnt;
1894 	req->offset = offset;
1895 	req->logical_map_index = logical_map_index;
1896 	req->length = length;
1897 	req->copy_after_decompress = false;
1898 	req->cb_fn = cb_fn;
1899 	req->cb_arg = cb_arg;
1900 
1901 	if (!overlapped) {
1902 		_start_readv_request(req);
1903 	} else {
1904 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1905 	}
1906 }
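
/*
 * Hypothetical usage sketch (buf, read_done_cb and cb_ctx are illustrative
 * names): reading 8 logical blocks starting at block 0 of a chunk:
 *
 *   struct iovec iov = {
 *       .iov_base = buf,
 *       .iov_len  = 8 * vol->params.logical_block_size,
 *   };
 *   spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, read_done_cb, cb_ctx);
 *
 * offset and length are expressed in logical blocks and must not cross a chunk
 * boundary.
 */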
1907 
1908 static void
1909 _start_writev_request(struct spdk_reduce_vol_request *req)
1910 {
1911 	struct spdk_reduce_vol *vol = req->vol;
1912 
1913 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1914 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1915 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1916 			/* Read old chunk, then overwrite with data from this write
1917 			 *  operation.
1918 			 */
1919 			req->rmw = true;
1920 			_reduce_vol_read_chunk(req, _write_read_done);
1921 			return;
1922 		}
1923 	}
1924 
1925 	req->rmw = false;
1926 
1927 	_prepare_compress_chunk(req, true);
1928 	_reduce_vol_compress_chunk(req, _write_compress_done);
1929 }
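
/*
 * Write path summary: if the chunk is already allocated and the write covers
 * less than a full chunk, the old chunk is read and decompressed first
 * (read-modify-write, req->rmw = true) so the untouched blocks are recompressed
 * together with the new data; a full-chunk write, or a write to an unallocated
 * chunk, skips the read and compresses immediately with zero-filled padding.
 */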
1930 
1931 void
1932 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1933 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1934 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1935 {
1936 	struct spdk_reduce_vol_request *req;
1937 	uint64_t logical_map_index;
1938 	bool overlapped;
1939 
1940 	if (length == 0) {
1941 		cb_fn(cb_arg, 0);
1942 		return;
1943 	}
1944 
1945 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1946 		cb_fn(cb_arg, -EINVAL);
1947 		return;
1948 	}
1949 
1950 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1951 		cb_fn(cb_arg, -EINVAL);
1952 		return;
1953 	}
1954 
1955 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1956 	overlapped = _check_overlap(vol, logical_map_index);
1957 
1958 	req = TAILQ_FIRST(&vol->free_requests);
1959 	if (req == NULL) {
1960 		cb_fn(cb_arg, -ENOMEM);
1961 		return;
1962 	}
1963 
1964 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1965 	req->type = REDUCE_IO_WRITEV;
1966 	req->vol = vol;
1967 	req->iov = iov;
1968 	req->iovcnt = iovcnt;
1969 	req->offset = offset;
1970 	req->logical_map_index = logical_map_index;
1971 	req->length = length;
1972 	req->copy_after_decompress = false;
1973 	req->cb_fn = cb_fn;
1974 	req->cb_arg = cb_arg;
1975 
1976 	if (!overlapped) {
1977 		_start_writev_request(req);
1978 	} else {
1979 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1980 	}
1981 }
1982 
1983 static void
1984 _start_unmap_request_full_chunk(void *ctx)
1985 {
1986 	struct spdk_reduce_vol_request *req = ctx;
1987 	struct spdk_reduce_vol *vol = req->vol;
1988 	uint64_t chunk_map_index;
1989 
1990 	RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
1991 
1992 	chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1993 	if (chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
1994 		_reduce_vol_reset_chunk(vol, chunk_map_index);
1995 		req->chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
1996 		_reduce_persist(vol, req->chunk,
1997 				_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1998 		vol->pm_logical_map[req->logical_map_index] = REDUCE_EMPTY_MAP_ENTRY;
1999 		_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
2000 	}
2001 	_reduce_vol_complete_req(req, 0);
2002 }
2003 
2004 static void
2005 _reduce_vol_unmap_full_chunk(struct spdk_reduce_vol *vol,
2006 			     uint64_t offset, uint64_t length,
2007 			     spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
2008 {
2009 	struct spdk_reduce_vol_request *req;
2010 	uint64_t logical_map_index;
2011 	bool overlapped;
2012 
2013 	if (_request_spans_chunk_boundary(vol, offset, length)) {
2014 		cb_fn(cb_arg, -EINVAL);
2015 		return;
2016 	}
2017 
2018 	logical_map_index = offset / vol->logical_blocks_per_chunk;
2019 	overlapped = _check_overlap(vol, logical_map_index);
2020 
2021 	req = TAILQ_FIRST(&vol->free_requests);
2022 	if (req == NULL) {
2023 		cb_fn(cb_arg, -ENOMEM);
2024 		return;
2025 	}
2026 
2027 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
2028 	req->type = REDUCE_IO_UNMAP;
2029 	req->vol = vol;
2030 	req->iov = NULL;
2031 	req->iovcnt = 0;
2032 	req->offset = offset;
2033 	req->logical_map_index = logical_map_index;
2034 	req->length = length;
2035 	req->copy_after_decompress = false;
2036 	req->cb_fn = cb_fn;
2037 	req->cb_arg = cb_arg;
2038 
2039 	if (!overlapped) {
2040 		_start_unmap_request_full_chunk(req);
2041 	} else {
2042 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
2043 	}
2044 }
2045 
2046 struct unmap_partial_chunk_ctx {
2047 	struct spdk_reduce_vol *vol;
2048 	struct iovec iov;
2049 	spdk_reduce_vol_op_complete cb_fn;
2050 	void *cb_arg;
2051 };
2052 
2053 static void
2054 _reduce_unmap_partial_chunk_complete(void *_ctx, int reduce_errno)
2055 {
2056 	struct unmap_partial_chunk_ctx *ctx = _ctx;
2057 
2058 	ctx->cb_fn(ctx->cb_arg, reduce_errno);
2059 	free(ctx);
2060 }
2061 
2062 static void
2063 _reduce_vol_unmap_partial_chunk(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length,
2064 				spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
2065 {
2066 	struct unmap_partial_chunk_ctx *ctx;
2067 
2068 	ctx = calloc(1, sizeof(struct unmap_partial_chunk_ctx));
2069 	if (ctx == NULL) {
2070 		cb_fn(cb_arg, -ENOMEM);
2071 		return;
2072 	}
2073 
2074 	ctx->vol = vol;
2075 	ctx->iov.iov_base = g_zero_buf;
2076 	ctx->iov.iov_len = length * vol->params.logical_block_size;
2077 	ctx->cb_fn = cb_fn;
2078 	ctx->cb_arg = cb_arg;
2079 
2080 	spdk_reduce_vol_writev(vol, &ctx->iov, 1, offset, length, _reduce_unmap_partial_chunk_complete,
2081 			       ctx);
2082 }
2083 
2084 void
2085 spdk_reduce_vol_unmap(struct spdk_reduce_vol *vol,
2086 		      uint64_t offset, uint64_t length,
2087 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
2088 {
2089 	if (length < vol->logical_blocks_per_chunk) {
2090 		_reduce_vol_unmap_partial_chunk(vol, offset, length, cb_fn, cb_arg);
2091 	} else if (length == vol->logical_blocks_per_chunk) {
2092 		_reduce_vol_unmap_full_chunk(vol, offset, length, cb_fn, cb_arg);
2093 	} else {
2094 		cb_fn(cb_arg, -EINVAL);
2095 	}
2096 }
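
/*
 * Unmap granularity, illustrated with hypothetical values: with 16 KiB chunks
 * and 4 KiB logical blocks (logical_blocks_per_chunk = 4), an unmap of
 * length 4 resets the chunk map and clears the chunk's logical map entry,
 * while an unmap of length 1-3 is emulated as a zero-fill write sourced from
 * g_zero_buf; anything longer than one chunk is rejected with -EINVAL.
 */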
2097 
2098 const struct spdk_reduce_vol_params *
2099 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
2100 {
2101 	return &vol->params;
2102 }
2103 
2104 const char *
2105 spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
2106 {
2107 	return vol->pm_file.path;
2108 }
2109 
2110 void
2111 spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
2112 {
2113 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
2114 	uint32_t struct_size;
2115 	uint64_t chunk_map_size;
2116 
2117 	SPDK_NOTICELOG("vol info:\n");
2118 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
2119 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
2120 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
2121 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
2122 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
2123 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
2124 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
2125 		       vol->params.vol_size / vol->params.chunk_size);
2126 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
2127 			vol->params.backing_io_unit_size);
2128 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
2129 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
2130 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
2131 
2132 	SPDK_NOTICELOG("pmem info:\n");
2133 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
2134 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
2135 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
2136 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
2137 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
2138 			   vol->params.chunk_size);
2139 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
2140 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
2141 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
2142 			 vol->params.backing_io_unit_size);
2143 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
2144 }
2145 
2146 SPDK_LOG_REGISTER_COMPONENT(reduce)
2147