xref: /spdk/lib/reduce/reduce.c (revision 9889ab2dc80e40dae92dcef361d53dcba722043d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/env.h"
38 #include "spdk/string.h"
39 #include "spdk/bit_array.h"
40 #include "spdk/util.h"
41 #include "spdk_internal/log.h"
42 
43 #include "libpmem.h"
44 
45 /* Always round up the size of the PM region to the nearest cacheline. */
46 #define REDUCE_PM_SIZE_ALIGNMENT	64
47 
48 /* Offset into the backing device where the persistent memory file's path is stored. */
49 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
50 
51 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
52 
53 #define REDUCE_NUM_VOL_REQUESTS	256
54 
55 /* Structure written to offset 0 of both the pm file and the backing device. */
56 struct spdk_reduce_vol_superblock {
57 	uint8_t				signature[8];
58 	struct spdk_reduce_vol_params	params;
59 	uint8_t				reserved[4048];
60 };
61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
62 
63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
64 /* null terminator counts one */
65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
66 		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
67 
68 #define REDUCE_PATH_MAX 4096
69 
70 /**
71  * Describes a persistent memory file used to hold metadata associated with a
72  *  compressed volume.
73  */
74 struct spdk_reduce_pm_file {
75 	char			path[REDUCE_PATH_MAX];
76 	void			*pm_buf;
77 	int			pm_is_pmem;
78 	uint64_t		size;
79 };
80 
81 #define REDUCE_IO_READV		1
82 #define REDUCE_IO_WRITEV	2
83 
84 struct spdk_reduce_chunk_map {
85 	uint32_t		compressed_size;
86 	uint32_t		reserved;
87 	uint64_t		io_unit_index[0];
88 };
89 
90 struct spdk_reduce_vol_request {
91 	/**
92 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
93 	 *   1) source buffer for compression operations
94 	 *   2) destination buffer for decompression operations
95 	 *   3) data buffer when writing uncompressed chunk to disk
96 	 *   4) data buffer when reading uncompressed chunk from disk
97 	 */
98 	uint8_t					*decomp_buf;
99 	struct iovec				*decomp_buf_iov;
100 
101 	/**
102 	 * These are used to construct the iovecs that are sent to
103 	 *  the decomp engine, they point to a mix of the scratch buffer
104 	 *  and user buffer
105 	 */
106 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS];
107 	int					decomp_iovcnt;
108 
109 	/**
110 	 *  Scratch buffer used for compressed chunk.  This is used for:
111 	 *   1) destination buffer for compression operations
112 	 *   2) source buffer for decompression operations
113 	 *   3) data buffer when writing compressed chunk to disk
114 	 *   4) data buffer when reading compressed chunk from disk
115 	 */
116 	uint8_t					*comp_buf;
117 	struct iovec				*comp_buf_iov;
118 	struct iovec				*iov;
119 	bool					rmw;
120 	struct spdk_reduce_vol			*vol;
121 	int					type;
122 	int					reduce_errno;
123 	int					iovcnt;
124 	int					num_backing_ops;
125 	uint32_t				num_io_units;
126 	bool					chunk_is_compressed;
127 	uint64_t				offset;
128 	uint64_t				logical_map_index;
129 	uint64_t				length;
130 	uint64_t				chunk_map_index;
131 	struct spdk_reduce_chunk_map		*chunk;
132 	spdk_reduce_vol_op_complete		cb_fn;
133 	void					*cb_arg;
134 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
135 	struct spdk_reduce_vol_cb_args		backing_cb_args;
136 };
137 
138 struct spdk_reduce_vol {
139 	struct spdk_reduce_vol_params		params;
140 	uint32_t				backing_io_units_per_chunk;
141 	uint32_t				backing_lba_per_io_unit;
142 	uint32_t				logical_blocks_per_chunk;
143 	struct spdk_reduce_pm_file		pm_file;
144 	struct spdk_reduce_backing_dev		*backing_dev;
145 	struct spdk_reduce_vol_superblock	*backing_super;
146 	struct spdk_reduce_vol_superblock	*pm_super;
147 	uint64_t				*pm_logical_map;
148 	uint64_t				*pm_chunk_maps;
149 
150 	struct spdk_bit_array			*allocated_chunk_maps;
151 	struct spdk_bit_array			*allocated_backing_io_units;
152 
153 	struct spdk_reduce_vol_request		*request_mem;
154 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
155 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
156 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
157 
158 	/* Single contiguous buffer used for all request buffers for this volume. */
159 	uint8_t					*buf_mem;
160 	struct iovec				*buf_iov_mem;
161 };
162 
163 static void _start_readv_request(struct spdk_reduce_vol_request *req);
164 static void _start_writev_request(struct spdk_reduce_vol_request *req);
165 static uint8_t *g_zero_buf;
166 static int g_vol_count = 0;
167 
168 /*
169  * Allocate extra metadata chunks and corresponding backing io units to account for
170  *  outstanding IO in worst case scenario where logical map is completely allocated
171  *  and no data can be compressed.  We need extra chunks in this case to handle
172  *  in-flight writes since reduce never writes data in place.
173  */
174 #define REDUCE_NUM_EXTRA_CHUNKS 128
175 
176 static void
177 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
178 {
179 	if (vol->pm_file.pm_is_pmem) {
180 		pmem_persist(addr, len);
181 	} else {
182 		pmem_msync(addr, len);
183 	}
184 }
185 
186 static uint64_t
187 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
188 {
189 	uint64_t chunks_in_logical_map, logical_map_size;
190 
191 	chunks_in_logical_map = vol_size / chunk_size;
192 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
193 
194 	/* Round up to next cacheline. */
195 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
196 	       REDUCE_PM_SIZE_ALIGNMENT;
197 }
198 
199 static uint64_t
200 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
201 {
202 	uint64_t num_chunks;
203 
204 	num_chunks = vol_size / chunk_size;
205 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
206 
207 	return num_chunks;
208 }
209 
210 static inline uint32_t
211 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
212 {
213 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
214 }
215 
216 static uint64_t
217 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
218 {
219 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
220 
221 	num_chunks = _get_total_chunks(vol_size, chunk_size);
222 	io_units_per_chunk = chunk_size / backing_io_unit_size;
223 
224 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
225 
226 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
227 	       REDUCE_PM_SIZE_ALIGNMENT;
228 }
229 
230 static struct spdk_reduce_chunk_map *
231 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
232 {
233 	uintptr_t chunk_map_addr;
234 
235 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
236 
237 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
238 	chunk_map_addr += chunk_map_index *
239 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
240 
241 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
242 }
243 
244 static int
245 _validate_vol_params(struct spdk_reduce_vol_params *params)
246 {
247 	if (params->vol_size > 0) {
248 		/**
249 		 * User does not pass in the vol size - it gets calculated by libreduce from
250 		 *  values in this structure plus the size of the backing device.
251 		 */
252 		return -EINVAL;
253 	}
254 
255 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
256 	    params->logical_block_size == 0) {
257 		return -EINVAL;
258 	}
259 
260 	/* Chunk size must be an even multiple of the backing io unit size. */
261 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
262 		return -EINVAL;
263 	}
264 
265 	/* Chunk size must be an even multiple of the logical block size. */
266 	if ((params->chunk_size % params->logical_block_size) != 0) {
267 		return -1;
268 	}
269 
270 	return 0;
271 }
272 
273 static uint64_t
274 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
275 {
276 	uint64_t num_chunks;
277 
278 	num_chunks = backing_dev_size / chunk_size;
279 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
280 		return 0;
281 	}
282 
283 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
284 	return num_chunks * chunk_size;
285 }
286 
287 static uint64_t
288 _get_pm_file_size(struct spdk_reduce_vol_params *params)
289 {
290 	uint64_t total_pm_size;
291 
292 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
293 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
294 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
295 			 params->backing_io_unit_size);
296 	return total_pm_size;
297 }
298 
299 const struct spdk_uuid *
300 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
301 {
302 	return &vol->params.uuid;
303 }
304 
305 static void
306 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
307 {
308 	uint64_t logical_map_size;
309 
310 	/* Superblock is at the beginning of the pm file. */
311 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
312 
313 	/* Logical map immediately follows the super block. */
314 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
315 
316 	/* Chunks maps follow the logical map. */
317 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
318 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
319 }
320 
321 /* We need 2 iovs during load - one for the superblock, another for the path */
322 #define LOAD_IOV_COUNT	2
323 
324 struct reduce_init_load_ctx {
325 	struct spdk_reduce_vol			*vol;
326 	struct spdk_reduce_vol_cb_args		backing_cb_args;
327 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
328 	void					*cb_arg;
329 	struct iovec				iov[LOAD_IOV_COUNT];
330 	void					*path;
331 };
332 
333 static int
334 _allocate_vol_requests(struct spdk_reduce_vol *vol)
335 {
336 	struct spdk_reduce_vol_request *req;
337 	int i;
338 
339 	/* Allocate 2x since we need buffers for both read/write and compress/decompress
340 	 *  intermediate buffers.
341 	 */
342 	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
343 				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
344 	if (vol->buf_mem == NULL) {
345 		return -ENOMEM;
346 	}
347 
348 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
349 	if (vol->request_mem == NULL) {
350 		spdk_free(vol->buf_mem);
351 		vol->buf_mem = NULL;
352 		return -ENOMEM;
353 	}
354 
355 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
356 	 *  buffers.
357 	 */
358 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
359 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
360 	if (vol->buf_iov_mem == NULL) {
361 		free(vol->request_mem);
362 		spdk_free(vol->buf_mem);
363 		vol->request_mem = NULL;
364 		vol->buf_mem = NULL;
365 		return -ENOMEM;
366 	}
367 
368 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
369 		req = &vol->request_mem[i];
370 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
371 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
372 		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
373 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
374 		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
375 	}
376 
377 	return 0;
378 }
379 
380 static void
381 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
382 {
383 	if (ctx != NULL) {
384 		spdk_free(ctx->path);
385 		free(ctx);
386 	}
387 
388 	if (vol != NULL) {
389 		if (vol->pm_file.pm_buf != NULL) {
390 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
391 		}
392 
393 		spdk_free(vol->backing_super);
394 		spdk_bit_array_free(&vol->allocated_chunk_maps);
395 		spdk_bit_array_free(&vol->allocated_backing_io_units);
396 		free(vol->request_mem);
397 		free(vol->buf_iov_mem);
398 		spdk_free(vol->buf_mem);
399 		free(vol);
400 	}
401 }
402 
403 static int
404 _alloc_zero_buff(struct spdk_reduce_vol *vol)
405 {
406 	int rc = 0;
407 
408 	/* The zero buffer is shared between all volumnes and just used
409 	 * for reads so allocate one global instance here if not already
410 	 * allocated when another vol init'd or loaded.
411 	 */
412 	if (g_vol_count++ == 0) {
413 		g_zero_buf = spdk_zmalloc(vol->params.chunk_size,
414 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
415 					  SPDK_MALLOC_DMA);
416 		if (g_zero_buf == NULL) {
417 			rc = -ENOMEM;
418 		}
419 	}
420 	return rc;
421 }
422 
423 static void
424 _init_write_super_cpl(void *cb_arg, int reduce_errno)
425 {
426 	struct reduce_init_load_ctx *init_ctx = cb_arg;
427 	int rc;
428 
429 	rc = _allocate_vol_requests(init_ctx->vol);
430 	if (rc != 0) {
431 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
432 		_init_load_cleanup(init_ctx->vol, init_ctx);
433 		return;
434 	}
435 
436 	rc = _alloc_zero_buff(init_ctx->vol);
437 	if (rc != 0) {
438 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
439 		_init_load_cleanup(init_ctx->vol, init_ctx);
440 		return;
441 	}
442 
443 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
444 	/* Only clean up the ctx - the vol has been passed to the application
445 	 *  for use now that initialization was successful.
446 	 */
447 	_init_load_cleanup(NULL, init_ctx);
448 }
449 
450 static void
451 _init_write_path_cpl(void *cb_arg, int reduce_errno)
452 {
453 	struct reduce_init_load_ctx *init_ctx = cb_arg;
454 	struct spdk_reduce_vol *vol = init_ctx->vol;
455 
456 	init_ctx->iov[0].iov_base = vol->backing_super;
457 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
458 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
459 	init_ctx->backing_cb_args.cb_arg = init_ctx;
460 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
461 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
462 				 &init_ctx->backing_cb_args);
463 }
464 
465 static int
466 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
467 {
468 	uint64_t total_chunks, total_backing_io_units;
469 	uint32_t i, num_metadata_io_units;
470 
471 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
472 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
473 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
474 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
475 
476 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
477 		return -ENOMEM;
478 	}
479 
480 	/* Set backing io unit bits associated with metadata. */
481 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
482 				vol->backing_dev->blocklen;
483 	for (i = 0; i < num_metadata_io_units; i++) {
484 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
485 	}
486 
487 	return 0;
488 }
489 
490 void
491 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
492 		     struct spdk_reduce_backing_dev *backing_dev,
493 		     const char *pm_file_dir,
494 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
495 {
496 	struct spdk_reduce_vol *vol;
497 	struct reduce_init_load_ctx *init_ctx;
498 	uint64_t backing_dev_size;
499 	size_t mapped_len;
500 	int dir_len, max_dir_len, rc;
501 
502 	/* We need to append a path separator and the UUID to the supplied
503 	 * path.
504 	 */
505 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
506 	dir_len = strnlen(pm_file_dir, max_dir_len);
507 	/* Strip trailing slash if the user provided one - we will add it back
508 	 * later when appending the filename.
509 	 */
510 	if (pm_file_dir[dir_len - 1] == '/') {
511 		dir_len--;
512 	}
513 	if (dir_len == max_dir_len) {
514 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
515 		cb_fn(cb_arg, NULL, -EINVAL);
516 		return;
517 	}
518 
519 	rc = _validate_vol_params(params);
520 	if (rc != 0) {
521 		SPDK_ERRLOG("invalid vol params\n");
522 		cb_fn(cb_arg, NULL, rc);
523 		return;
524 	}
525 
526 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
527 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
528 	if (params->vol_size == 0) {
529 		SPDK_ERRLOG("backing device is too small\n");
530 		cb_fn(cb_arg, NULL, -EINVAL);
531 		return;
532 	}
533 
534 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
535 	    backing_dev->unmap == NULL) {
536 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
537 		cb_fn(cb_arg, NULL, -EINVAL);
538 		return;
539 	}
540 
541 	vol = calloc(1, sizeof(*vol));
542 	if (vol == NULL) {
543 		cb_fn(cb_arg, NULL, -ENOMEM);
544 		return;
545 	}
546 
547 	TAILQ_INIT(&vol->free_requests);
548 	TAILQ_INIT(&vol->executing_requests);
549 	TAILQ_INIT(&vol->queued_requests);
550 
551 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
552 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
553 	if (vol->backing_super == NULL) {
554 		cb_fn(cb_arg, NULL, -ENOMEM);
555 		_init_load_cleanup(vol, NULL);
556 		return;
557 	}
558 
559 	init_ctx = calloc(1, sizeof(*init_ctx));
560 	if (init_ctx == NULL) {
561 		cb_fn(cb_arg, NULL, -ENOMEM);
562 		_init_load_cleanup(vol, NULL);
563 		return;
564 	}
565 
566 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
567 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
568 	if (init_ctx->path == NULL) {
569 		cb_fn(cb_arg, NULL, -ENOMEM);
570 		_init_load_cleanup(vol, init_ctx);
571 		return;
572 	}
573 
574 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
575 		spdk_uuid_generate(&params->uuid);
576 	}
577 
578 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
579 	vol->pm_file.path[dir_len] = '/';
580 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
581 			    &params->uuid);
582 	vol->pm_file.size = _get_pm_file_size(params);
583 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
584 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
585 					    &mapped_len, &vol->pm_file.pm_is_pmem);
586 	if (vol->pm_file.pm_buf == NULL) {
587 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
588 			    vol->pm_file.path, strerror(errno));
589 		cb_fn(cb_arg, NULL, -errno);
590 		_init_load_cleanup(vol, init_ctx);
591 		return;
592 	}
593 
594 	if (vol->pm_file.size != mapped_len) {
595 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
596 			    vol->pm_file.size, mapped_len);
597 		cb_fn(cb_arg, NULL, -ENOMEM);
598 		_init_load_cleanup(vol, init_ctx);
599 		return;
600 	}
601 
602 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
603 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
604 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
605 	memcpy(&vol->params, params, sizeof(*params));
606 
607 	vol->backing_dev = backing_dev;
608 
609 	rc = _allocate_bit_arrays(vol);
610 	if (rc != 0) {
611 		cb_fn(cb_arg, NULL, rc);
612 		_init_load_cleanup(vol, init_ctx);
613 		return;
614 	}
615 
616 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
617 	       sizeof(vol->backing_super->signature));
618 	memcpy(&vol->backing_super->params, params, sizeof(*params));
619 
620 	_initialize_vol_pm_pointers(vol);
621 
622 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
623 	/* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY.
624 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
625 	 */
626 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
627 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
628 
629 	init_ctx->vol = vol;
630 	init_ctx->cb_fn = cb_fn;
631 	init_ctx->cb_arg = cb_arg;
632 
633 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
634 	init_ctx->iov[0].iov_base = init_ctx->path;
635 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
636 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
637 	init_ctx->backing_cb_args.cb_arg = init_ctx;
638 	/* Write path to offset 4K on backing device - just after where the super
639 	 *  block will be written.  We wait until this is committed before writing the
640 	 *  super block to guarantee we don't get the super block written without the
641 	 *  the path if the system crashed in the middle of a write operation.
642 	 */
643 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
644 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
645 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
646 				 &init_ctx->backing_cb_args);
647 }
648 
649 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
650 
651 static void
652 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
653 {
654 	struct reduce_init_load_ctx *load_ctx = cb_arg;
655 	struct spdk_reduce_vol *vol = load_ctx->vol;
656 	uint64_t backing_dev_size;
657 	uint64_t i, num_chunks, logical_map_index;
658 	struct spdk_reduce_chunk_map *chunk;
659 	size_t mapped_len;
660 	uint32_t j;
661 	int rc;
662 
663 	if (memcmp(vol->backing_super->signature,
664 		   SPDK_REDUCE_SIGNATURE,
665 		   sizeof(vol->backing_super->signature)) != 0) {
666 		/* This backing device isn't a libreduce backing device. */
667 		rc = -EILSEQ;
668 		goto error;
669 	}
670 
671 	/* If the cb_fn is destroy_load_cb, it means we are wanting to destroy this compress bdev.
672 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
673 	 *  so destroy_load_cb can delete the metadata off of the block device and delete the
674 	 *  persistent memory file if it exists.
675 	 */
676 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
677 	if (load_ctx->cb_fn == (*destroy_load_cb)) {
678 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
679 		_init_load_cleanup(NULL, load_ctx);
680 		return;
681 	}
682 
683 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
684 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
685 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
686 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
687 
688 	rc = _allocate_bit_arrays(vol);
689 	if (rc != 0) {
690 		goto error;
691 	}
692 
693 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
694 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
695 		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
696 			    backing_dev_size);
697 		rc = -EILSEQ;
698 		goto error;
699 	}
700 
701 	vol->pm_file.size = _get_pm_file_size(&vol->params);
702 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
703 					    &vol->pm_file.pm_is_pmem);
704 	if (vol->pm_file.pm_buf == NULL) {
705 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
706 		rc = -errno;
707 		goto error;
708 	}
709 
710 	if (vol->pm_file.size != mapped_len) {
711 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
712 			    vol->pm_file.size, mapped_len);
713 		rc = -ENOMEM;
714 		goto error;
715 	}
716 
717 	rc = _allocate_vol_requests(vol);
718 	if (rc != 0) {
719 		goto error;
720 	}
721 
722 	_initialize_vol_pm_pointers(vol);
723 
724 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
725 	for (i = 0; i < num_chunks; i++) {
726 		logical_map_index = vol->pm_logical_map[i];
727 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
728 			continue;
729 		}
730 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
731 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
732 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
733 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
734 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
735 			}
736 		}
737 	}
738 
739 	rc = _alloc_zero_buff(vol);
740 	if (rc) {
741 		goto error;
742 	}
743 
744 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
745 	/* Only clean up the ctx - the vol has been passed to the application
746 	 *  for use now that volume load was successful.
747 	 */
748 	_init_load_cleanup(NULL, load_ctx);
749 	return;
750 
751 error:
752 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
753 	_init_load_cleanup(vol, load_ctx);
754 }
755 
756 void
757 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
758 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
759 {
760 	struct spdk_reduce_vol *vol;
761 	struct reduce_init_load_ctx *load_ctx;
762 
763 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
764 	    backing_dev->unmap == NULL) {
765 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
766 		cb_fn(cb_arg, NULL, -EINVAL);
767 		return;
768 	}
769 
770 	vol = calloc(1, sizeof(*vol));
771 	if (vol == NULL) {
772 		cb_fn(cb_arg, NULL, -ENOMEM);
773 		return;
774 	}
775 
776 	TAILQ_INIT(&vol->free_requests);
777 	TAILQ_INIT(&vol->executing_requests);
778 	TAILQ_INIT(&vol->queued_requests);
779 
780 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
781 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
782 	if (vol->backing_super == NULL) {
783 		_init_load_cleanup(vol, NULL);
784 		cb_fn(cb_arg, NULL, -ENOMEM);
785 		return;
786 	}
787 
788 	vol->backing_dev = backing_dev;
789 
790 	load_ctx = calloc(1, sizeof(*load_ctx));
791 	if (load_ctx == NULL) {
792 		_init_load_cleanup(vol, NULL);
793 		cb_fn(cb_arg, NULL, -ENOMEM);
794 		return;
795 	}
796 
797 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
798 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
799 	if (load_ctx->path == NULL) {
800 		_init_load_cleanup(vol, load_ctx);
801 		cb_fn(cb_arg, NULL, -ENOMEM);
802 		return;
803 	}
804 
805 	load_ctx->vol = vol;
806 	load_ctx->cb_fn = cb_fn;
807 	load_ctx->cb_arg = cb_arg;
808 
809 	load_ctx->iov[0].iov_base = vol->backing_super;
810 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
811 	load_ctx->iov[1].iov_base = load_ctx->path;
812 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
813 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
814 	load_ctx->backing_cb_args.cb_arg = load_ctx;
815 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
816 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
817 				vol->backing_dev->blocklen,
818 				&load_ctx->backing_cb_args);
819 }
820 
821 void
822 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
823 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
824 {
825 	if (vol == NULL) {
826 		/* This indicates a programming error. */
827 		assert(false);
828 		cb_fn(cb_arg, -EINVAL);
829 		return;
830 	}
831 
832 	if (--g_vol_count == 0) {
833 		spdk_free(g_zero_buf);
834 	}
835 	_init_load_cleanup(vol, NULL);
836 	cb_fn(cb_arg, 0);
837 }
838 
839 struct reduce_destroy_ctx {
840 	spdk_reduce_vol_op_complete		cb_fn;
841 	void					*cb_arg;
842 	struct spdk_reduce_vol			*vol;
843 	struct spdk_reduce_vol_superblock	*super;
844 	struct iovec				iov;
845 	struct spdk_reduce_vol_cb_args		backing_cb_args;
846 	int					reduce_errno;
847 	char					pm_path[REDUCE_PATH_MAX];
848 };
849 
850 static void
851 destroy_unload_cpl(void *cb_arg, int reduce_errno)
852 {
853 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
854 
855 	if (destroy_ctx->reduce_errno == 0) {
856 		if (unlink(destroy_ctx->pm_path)) {
857 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
858 				    destroy_ctx->pm_path, strerror(errno));
859 		}
860 	}
861 
862 	/* Even if the unload somehow failed, we still pass the destroy_ctx
863 	 * reduce_errno since that indicates whether or not the volume was
864 	 * actually destroyed.
865 	 */
866 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
867 	spdk_free(destroy_ctx->super);
868 	free(destroy_ctx);
869 }
870 
871 static void
872 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
873 {
874 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
875 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
876 
877 	destroy_ctx->reduce_errno = reduce_errno;
878 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
879 }
880 
881 static void
882 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
883 {
884 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
885 
886 	if (reduce_errno != 0) {
887 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
888 		spdk_free(destroy_ctx->super);
889 		free(destroy_ctx);
890 		return;
891 	}
892 
893 	destroy_ctx->vol = vol;
894 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
895 	destroy_ctx->iov.iov_base = destroy_ctx->super;
896 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
897 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
898 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
899 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
900 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
901 				 &destroy_ctx->backing_cb_args);
902 }
903 
904 void
905 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
906 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
907 {
908 	struct reduce_destroy_ctx *destroy_ctx;
909 
910 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
911 	if (destroy_ctx == NULL) {
912 		cb_fn(cb_arg, -ENOMEM);
913 		return;
914 	}
915 
916 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
917 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
918 	if (destroy_ctx->super == NULL) {
919 		free(destroy_ctx);
920 		cb_fn(cb_arg, -ENOMEM);
921 		return;
922 	}
923 	destroy_ctx->cb_fn = cb_fn;
924 	destroy_ctx->cb_arg = cb_arg;
925 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
926 }
927 
928 static bool
929 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
930 {
931 	uint64_t start_chunk, end_chunk;
932 
933 	start_chunk = offset / vol->logical_blocks_per_chunk;
934 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
935 
936 	return (start_chunk != end_chunk);
937 }
938 
939 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
940 
941 static void
942 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
943 {
944 	struct spdk_reduce_vol_request *next_req;
945 	struct spdk_reduce_vol *vol = req->vol;
946 
947 	req->cb_fn(req->cb_arg, reduce_errno);
948 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
949 
950 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
951 		if (next_req->logical_map_index == req->logical_map_index) {
952 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
953 			if (next_req->type == REDUCE_IO_READV) {
954 				_start_readv_request(next_req);
955 			} else {
956 				assert(next_req->type == REDUCE_IO_WRITEV);
957 				_start_writev_request(next_req);
958 			}
959 			break;
960 		}
961 	}
962 
963 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
964 }
965 
966 static void
967 _write_write_done(void *_req, int reduce_errno)
968 {
969 	struct spdk_reduce_vol_request *req = _req;
970 	struct spdk_reduce_vol *vol = req->vol;
971 	uint64_t old_chunk_map_index;
972 	struct spdk_reduce_chunk_map *old_chunk;
973 	uint32_t i;
974 
975 	if (reduce_errno != 0) {
976 		req->reduce_errno = reduce_errno;
977 	}
978 
979 	assert(req->num_backing_ops > 0);
980 	if (--req->num_backing_ops > 0) {
981 		return;
982 	}
983 
984 	if (req->reduce_errno != 0) {
985 		_reduce_vol_complete_req(req, req->reduce_errno);
986 		return;
987 	}
988 
989 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
990 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
991 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
992 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
993 			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
994 				break;
995 			}
996 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
997 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
998 			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
999 		}
1000 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
1001 	}
1002 
1003 	/*
1004 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1005 	 * becomes invalid after we update the logical map, since the old chunk map will no
1006 	 * longer have a reference to it in the logical map.
1007 	 */
1008 
1009 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1010 	_reduce_persist(vol, req->chunk,
1011 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1012 
1013 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1014 
1015 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1016 
1017 	_reduce_vol_complete_req(req, 0);
1018 }
1019 
1020 static void
1021 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1022 		   reduce_request_fn next_fn, bool is_write)
1023 {
1024 	struct iovec *iov;
1025 	uint8_t *buf;
1026 	uint32_t i;
1027 
1028 	if (req->chunk_is_compressed) {
1029 		iov = req->comp_buf_iov;
1030 		buf = req->comp_buf;
1031 	} else {
1032 		iov = req->decomp_buf_iov;
1033 		buf = req->decomp_buf;
1034 	}
1035 
1036 	req->num_backing_ops = req->num_io_units;
1037 	req->backing_cb_args.cb_fn = next_fn;
1038 	req->backing_cb_args.cb_arg = req;
1039 	for (i = 0; i < req->num_io_units; i++) {
1040 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1041 		iov[i].iov_len = vol->params.backing_io_unit_size;
1042 		if (is_write) {
1043 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
1044 						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1045 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
1046 		} else {
1047 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
1048 						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1049 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
1050 		}
1051 	}
1052 }
1053 
1054 static void
1055 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1056 			uint32_t compressed_size)
1057 {
1058 	struct spdk_reduce_vol *vol = req->vol;
1059 	uint32_t i;
1060 	uint64_t chunk_offset, remainder, total_len = 0;
1061 	uint8_t *buf;
1062 	int j;
1063 
1064 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
1065 
1066 	/* TODO: fail if no chunk map found - but really this should not happen if we
1067 	 * size the number of requests similarly to number of extra chunk maps
1068 	 */
1069 	assert(req->chunk_map_index != UINT32_MAX);
1070 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1071 
1072 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1073 	req->num_io_units = spdk_divide_round_up(compressed_size,
1074 			    vol->params.backing_io_unit_size);
1075 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1076 	req->chunk->compressed_size =
1077 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1078 
1079 	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
1080 	if (req->chunk_is_compressed == false) {
1081 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1082 		buf = req->decomp_buf;
1083 		total_len = chunk_offset * vol->params.logical_block_size;
1084 
1085 		/* zero any offset into chunk */
1086 		if (req->rmw == false && chunk_offset) {
1087 			memset(buf, 0, total_len);
1088 		}
1089 		buf += total_len;
1090 
1091 		/* copy the data */
1092 		for (j = 0; j < req->iovcnt; j++) {
1093 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1094 			buf += req->iov[j].iov_len;
1095 			total_len += req->iov[j].iov_len;
1096 		}
1097 
1098 		/* zero any remainder */
1099 		remainder = vol->params.chunk_size - total_len;
1100 		total_len += remainder;
1101 		if (req->rmw == false && remainder) {
1102 			memset(buf, 0, remainder);
1103 		}
1104 		assert(total_len == vol->params.chunk_size);
1105 	}
1106 
1107 	for (i = 0; i < req->num_io_units; i++) {
1108 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1109 		/* TODO: fail if no backing block found - but really this should also not
1110 		 * happen (see comment above).
1111 		 */
1112 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1113 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1114 	}
1115 
1116 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1117 }
1118 
1119 static void
1120 _write_compress_done(void *_req, int reduce_errno)
1121 {
1122 	struct spdk_reduce_vol_request *req = _req;
1123 
1124 	/* Negative reduce_errno indicates failure for compression operations.
1125 	 * Just write the uncompressed data instead.  Force this to happen
1126 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1127 	 * When it sees the data couldn't be compressed, it will just write
1128 	 * the uncompressed buffer to disk.
1129 	 */
1130 	if (reduce_errno < 0) {
1131 		reduce_errno = req->vol->params.chunk_size;
1132 	}
1133 
1134 	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
1135 	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
1136 }
1137 
1138 static void
1139 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1140 {
1141 	struct spdk_reduce_vol *vol = req->vol;
1142 
1143 	req->backing_cb_args.cb_fn = next_fn;
1144 	req->backing_cb_args.cb_arg = req;
1145 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1146 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1147 	vol->backing_dev->compress(vol->backing_dev,
1148 				   &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1,
1149 				   &req->backing_cb_args);
1150 }
1151 
1152 static void
1153 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1154 {
1155 	struct spdk_reduce_vol *vol = req->vol;
1156 
1157 	req->backing_cb_args.cb_fn = next_fn;
1158 	req->backing_cb_args.cb_arg = req;
1159 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1160 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1161 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1162 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1163 	vol->backing_dev->decompress(vol->backing_dev,
1164 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1165 				     &req->backing_cb_args);
1166 }
1167 
1168 static void
1169 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1170 {
1171 	struct spdk_reduce_vol *vol = req->vol;
1172 	uint64_t chunk_offset, remainder = 0;
1173 	uint64_t ttl_len = 0;
1174 	int i;
1175 
1176 	req->decomp_iovcnt = 0;
1177 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1178 
1179 	if (chunk_offset) {
1180 		/* first iov point to our scratch buffer for any offset into the chunk */
1181 		req->decomp_iov[0].iov_base = req->decomp_buf;
1182 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1183 		ttl_len += req->decomp_iov[0].iov_len;
1184 		req->decomp_iovcnt = 1;
1185 	}
1186 
1187 	/* now the user data iov, direct to the user buffer */
1188 	for (i = 0; i < req->iovcnt; i++) {
1189 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1190 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1191 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1192 	}
1193 	req->decomp_iovcnt += req->iovcnt;
1194 
1195 	/* send the rest of the chunk to our scratch buffer */
1196 	remainder = vol->params.chunk_size - ttl_len;
1197 	if (remainder) {
1198 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1199 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1200 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1201 		req->decomp_iovcnt++;
1202 	}
1203 	assert(ttl_len == vol->params.chunk_size);
1204 
1205 	req->backing_cb_args.cb_fn = next_fn;
1206 	req->backing_cb_args.cb_arg = req;
1207 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1208 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1209 	vol->backing_dev->decompress(vol->backing_dev,
1210 				     req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt,
1211 				     &req->backing_cb_args);
1212 }
1213 
1214 static void
1215 _write_decompress_done(void *_req, int reduce_errno)
1216 {
1217 	struct spdk_reduce_vol_request *req = _req;
1218 	struct spdk_reduce_vol *vol = req->vol;
1219 	uint64_t chunk_offset, remainder, ttl_len = 0;
1220 	int i;
1221 
1222 	/* Negative reduce_errno indicates failure for compression operations. */
1223 	if (reduce_errno < 0) {
1224 		_reduce_vol_complete_req(req, reduce_errno);
1225 		return;
1226 	}
1227 
1228 	/* Positive reduce_errno indicates number of bytes in decompressed
1229 	 *  buffer.  This should equal the chunk size - otherwise that's another
1230 	 *  type of failure.
1231 	 */
1232 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1233 		_reduce_vol_complete_req(req, -EIO);
1234 		return;
1235 	}
1236 
1237 	req->decomp_iovcnt = 0;
1238 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1239 
1240 	if (chunk_offset) {
1241 		req->decomp_iov[0].iov_base = req->decomp_buf;
1242 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1243 		ttl_len += req->decomp_iov[0].iov_len;
1244 		req->decomp_iovcnt = 1;
1245 	}
1246 
1247 	for (i = 0; i < req->iovcnt; i++) {
1248 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1249 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1250 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1251 	}
1252 	req->decomp_iovcnt += req->iovcnt;
1253 
1254 	remainder = vol->params.chunk_size - ttl_len;
1255 	if (remainder) {
1256 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1257 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1258 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1259 		req->decomp_iovcnt++;
1260 	}
1261 	assert(ttl_len == vol->params.chunk_size);
1262 
1263 	_reduce_vol_compress_chunk(req, _write_compress_done);
1264 }
1265 
1266 static void
1267 _write_read_done(void *_req, int reduce_errno)
1268 {
1269 	struct spdk_reduce_vol_request *req = _req;
1270 
1271 	if (reduce_errno != 0) {
1272 		req->reduce_errno = reduce_errno;
1273 	}
1274 
1275 	assert(req->num_backing_ops > 0);
1276 	if (--req->num_backing_ops > 0) {
1277 		return;
1278 	}
1279 
1280 	if (req->reduce_errno != 0) {
1281 		_reduce_vol_complete_req(req, req->reduce_errno);
1282 		return;
1283 	}
1284 
1285 	if (req->chunk_is_compressed) {
1286 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1287 	} else {
1288 		_write_decompress_done(req, req->chunk->compressed_size);
1289 	}
1290 }
1291 
1292 static void
1293 _read_decompress_done(void *_req, int reduce_errno)
1294 {
1295 	struct spdk_reduce_vol_request *req = _req;
1296 	struct spdk_reduce_vol *vol = req->vol;
1297 
1298 	/* Negative reduce_errno indicates failure for compression operations. */
1299 	if (reduce_errno < 0) {
1300 		_reduce_vol_complete_req(req, reduce_errno);
1301 		return;
1302 	}
1303 
1304 	/* Positive reduce_errno indicates number of bytes in decompressed
1305 	 *  buffer.  This should equal the chunk size - otherwise that's another
1306 	 *  type of failure.
1307 	 */
1308 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1309 		_reduce_vol_complete_req(req, -EIO);
1310 		return;
1311 	}
1312 
1313 	_reduce_vol_complete_req(req, 0);
1314 }
1315 
1316 static void
1317 _read_read_done(void *_req, int reduce_errno)
1318 {
1319 	struct spdk_reduce_vol_request *req = _req;
1320 	uint64_t chunk_offset;
1321 	uint8_t *buf;
1322 	int i;
1323 
1324 	if (reduce_errno != 0) {
1325 		req->reduce_errno = reduce_errno;
1326 	}
1327 
1328 	assert(req->num_backing_ops > 0);
1329 	if (--req->num_backing_ops > 0) {
1330 		return;
1331 	}
1332 
1333 	if (req->reduce_errno != 0) {
1334 		_reduce_vol_complete_req(req, req->reduce_errno);
1335 		return;
1336 	}
1337 
1338 	if (req->chunk_is_compressed) {
1339 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1340 	} else {
1341 
1342 		/* If the chunk was compressed, the data would have been sent to the
1343 		 *  host buffers by the decompression operation, if not we need to memcpy here.
1344 		 */
1345 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1346 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1347 		for (i = 0; i < req->iovcnt; i++) {
1348 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1349 			buf += req->iov[i].iov_len;
1350 		}
1351 
1352 		_read_decompress_done(req, req->chunk->compressed_size);
1353 	}
1354 }
1355 
1356 static void
1357 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1358 {
1359 	struct spdk_reduce_vol *vol = req->vol;
1360 
1361 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1362 	assert(req->chunk_map_index != UINT32_MAX);
1363 
1364 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1365 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1366 			    vol->params.backing_io_unit_size);
1367 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1368 
1369 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1370 }
1371 
1372 static bool
1373 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1374 		    uint64_t length)
1375 {
1376 	uint64_t size = 0;
1377 	int i;
1378 
1379 	for (i = 0; i < iovcnt; i++) {
1380 		size += iov[i].iov_len;
1381 	}
1382 
1383 	return size == (length * vol->params.logical_block_size);
1384 }
1385 
1386 static bool
1387 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1388 {
1389 	struct spdk_reduce_vol_request *req;
1390 
1391 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1392 		if (logical_map_index == req->logical_map_index) {
1393 			return true;
1394 		}
1395 	}
1396 
1397 	return false;
1398 }
1399 
1400 static void
1401 _start_readv_request(struct spdk_reduce_vol_request *req)
1402 {
1403 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1404 	_reduce_vol_read_chunk(req, _read_read_done);
1405 }
1406 
1407 void
1408 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1409 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1410 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1411 {
1412 	struct spdk_reduce_vol_request *req;
1413 	uint64_t logical_map_index;
1414 	bool overlapped;
1415 	int i;
1416 
1417 	if (length == 0) {
1418 		cb_fn(cb_arg, 0);
1419 		return;
1420 	}
1421 
1422 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1423 		cb_fn(cb_arg, -EINVAL);
1424 		return;
1425 	}
1426 
1427 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1428 		cb_fn(cb_arg, -EINVAL);
1429 		return;
1430 	}
1431 
1432 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1433 	overlapped = _check_overlap(vol, logical_map_index);
1434 
1435 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1436 		/*
1437 		 * This chunk hasn't been allocated.  So treat the data as all
1438 		 * zeroes for this chunk - do the memset and immediately complete
1439 		 * the operation.
1440 		 */
1441 		for (i = 0; i < iovcnt; i++) {
1442 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1443 		}
1444 		cb_fn(cb_arg, 0);
1445 		return;
1446 	}
1447 
1448 	req = TAILQ_FIRST(&vol->free_requests);
1449 	if (req == NULL) {
1450 		cb_fn(cb_arg, -ENOMEM);
1451 		return;
1452 	}
1453 
1454 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1455 	req->type = REDUCE_IO_READV;
1456 	req->vol = vol;
1457 	req->iov = iov;
1458 	req->iovcnt = iovcnt;
1459 	req->offset = offset;
1460 	req->logical_map_index = logical_map_index;
1461 	req->length = length;
1462 	req->cb_fn = cb_fn;
1463 	req->cb_arg = cb_arg;
1464 
1465 	if (!overlapped) {
1466 		_start_readv_request(req);
1467 	} else {
1468 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1469 	}
1470 }
1471 
1472 static void
1473 _start_writev_request(struct spdk_reduce_vol_request *req)
1474 {
1475 	struct spdk_reduce_vol *vol = req->vol;
1476 	uint64_t chunk_offset, ttl_len = 0;
1477 	uint64_t remainder = 0;
1478 	uint32_t lbsize;
1479 	int i;
1480 
1481 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1482 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1483 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1484 			/* Read old chunk, then overwrite with data from this write
1485 			 *  operation.
1486 			 */
1487 			req->rmw = true;
1488 			_reduce_vol_read_chunk(req, _write_read_done);
1489 			return;
1490 		}
1491 	}
1492 
1493 	lbsize = vol->params.logical_block_size;
1494 	req->decomp_iovcnt = 0;
1495 	req->rmw = false;
1496 
1497 	/* Note: point to our zero buf for offset into the chunk. */
1498 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1499 	if (chunk_offset != 0) {
1500 		ttl_len += chunk_offset * lbsize;
1501 		req->decomp_iov[0].iov_base = g_zero_buf;
1502 		req->decomp_iov[0].iov_len = ttl_len;
1503 		req->decomp_iovcnt = 1;
1504 	}
1505 
1506 	/* now the user data iov, direct from the user buffer */
1507 	for (i = 0; i < req->iovcnt; i++) {
1508 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1509 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1510 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1511 	}
1512 	req->decomp_iovcnt += req->iovcnt;
1513 
1514 	remainder = vol->params.chunk_size - ttl_len;
1515 	if (remainder) {
1516 		req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf;
1517 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1518 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1519 		req->decomp_iovcnt++;
1520 	}
1521 	assert(ttl_len == req->vol->params.chunk_size);
1522 
1523 	_reduce_vol_compress_chunk(req, _write_compress_done);
1524 }
1525 
1526 void
1527 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1528 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1529 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1530 {
1531 	struct spdk_reduce_vol_request *req;
1532 	uint64_t logical_map_index;
1533 	bool overlapped;
1534 
1535 	if (length == 0) {
1536 		cb_fn(cb_arg, 0);
1537 		return;
1538 	}
1539 
1540 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1541 		cb_fn(cb_arg, -EINVAL);
1542 		return;
1543 	}
1544 
1545 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1546 		cb_fn(cb_arg, -EINVAL);
1547 		return;
1548 	}
1549 
1550 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1551 	overlapped = _check_overlap(vol, logical_map_index);
1552 
1553 	req = TAILQ_FIRST(&vol->free_requests);
1554 	if (req == NULL) {
1555 		cb_fn(cb_arg, -ENOMEM);
1556 		return;
1557 	}
1558 
1559 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1560 	req->type = REDUCE_IO_WRITEV;
1561 	req->vol = vol;
1562 	req->iov = iov;
1563 	req->iovcnt = iovcnt;
1564 	req->offset = offset;
1565 	req->logical_map_index = logical_map_index;
1566 	req->length = length;
1567 	req->cb_fn = cb_fn;
1568 	req->cb_arg = cb_arg;
1569 
1570 	if (!overlapped) {
1571 		_start_writev_request(req);
1572 	} else {
1573 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1574 	}
1575 }
1576 
1577 const struct spdk_reduce_vol_params *
1578 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1579 {
1580 	return &vol->params;
1581 }
1582 
1583 void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1584 {
1585 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1586 	uint32_t struct_size;
1587 	uint64_t chunk_map_size;
1588 
1589 	SPDK_NOTICELOG("vol info:\n");
1590 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1591 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1592 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1593 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1594 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1595 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1596 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1597 		       vol->params.vol_size / vol->params.chunk_size);
1598 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1599 			vol->params.backing_io_unit_size);
1600 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1601 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1602 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1603 
1604 	SPDK_NOTICELOG("pmem info:\n");
1605 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1606 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1607 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1608 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1609 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1610 			   vol->params.chunk_size);
1611 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1612 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1613 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1614 			 vol->params.backing_io_unit_size);
1615 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1616 }
1617 
1618 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
1619