xref: /spdk/lib/reduce/reduce.c (revision 0ed85362c8132a2d1927757fbcade66b6660d26a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/env.h"
38 #include "spdk/string.h"
39 #include "spdk/bit_array.h"
40 #include "spdk/util.h"
41 #include "spdk_internal/log.h"
42 
43 #include "libpmem.h"
44 
45 /* Always round up the size of the PM region to the nearest cacheline. */
46 #define REDUCE_PM_SIZE_ALIGNMENT	64
47 
48 /* Offset into the backing device where the persistent memory file's path is stored. */
49 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
50 
51 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
52 
53 #define REDUCE_NUM_VOL_REQUESTS	256
54 
55 /* Structure written to offset 0 of both the pm file and the backing device. */
56 struct spdk_reduce_vol_superblock {
57 	uint8_t				signature[8];
58 	struct spdk_reduce_vol_params	params;
59 	uint8_t				reserved[4048];
60 };
61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
62 
63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
64 /* null terminator counts one */
65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
66 		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
67 
68 #define REDUCE_PATH_MAX 4096
69 
70 #define REDUCE_ZERO_BUF_SIZE 0x100000
71 
72 /**
73  * Describes a persistent memory file used to hold metadata associated with a
74  *  compressed volume.
75  */
76 struct spdk_reduce_pm_file {
77 	char			path[REDUCE_PATH_MAX];
78 	void			*pm_buf;
79 	int			pm_is_pmem;
80 	uint64_t		size;
81 };
82 
83 #define REDUCE_IO_READV		1
84 #define REDUCE_IO_WRITEV	2
85 
86 struct spdk_reduce_chunk_map {
87 	uint32_t		compressed_size;
88 	uint32_t		reserved;
89 	uint64_t		io_unit_index[0];
90 };
91 
92 struct spdk_reduce_vol_request {
93 	/**
94 	 *  Scratch buffer used for uncompressed chunk.  This is used for:
95 	 *   1) source buffer for compression operations
96 	 *   2) destination buffer for decompression operations
97 	 *   3) data buffer when writing uncompressed chunk to disk
98 	 *   4) data buffer when reading uncompressed chunk from disk
99 	 */
100 	uint8_t					*decomp_buf;
101 	struct iovec				*decomp_buf_iov;
102 
103 	/**
104 	 * These are used to construct the iovecs that are sent to
105 	 *  the decomp engine, they point to a mix of the scratch buffer
106 	 *  and user buffer
107 	 */
108 	struct iovec				decomp_iov[REDUCE_MAX_IOVECS + 2];
109 	int					decomp_iovcnt;
110 
111 	/**
112 	 *  Scratch buffer used for compressed chunk.  This is used for:
113 	 *   1) destination buffer for compression operations
114 	 *   2) source buffer for decompression operations
115 	 *   3) data buffer when writing compressed chunk to disk
116 	 *   4) data buffer when reading compressed chunk from disk
117 	 */
118 	uint8_t					*comp_buf;
119 	struct iovec				*comp_buf_iov;
120 	struct iovec				*iov;
121 	bool					rmw;
122 	struct spdk_reduce_vol			*vol;
123 	int					type;
124 	int					reduce_errno;
125 	int					iovcnt;
126 	int					num_backing_ops;
127 	uint32_t				num_io_units;
128 	bool					chunk_is_compressed;
129 	uint64_t				offset;
130 	uint64_t				logical_map_index;
131 	uint64_t				length;
132 	uint64_t				chunk_map_index;
133 	struct spdk_reduce_chunk_map		*chunk;
134 	spdk_reduce_vol_op_complete		cb_fn;
135 	void					*cb_arg;
136 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
137 	struct spdk_reduce_vol_cb_args		backing_cb_args;
138 };
139 
140 struct spdk_reduce_vol {
141 	struct spdk_reduce_vol_params		params;
142 	uint32_t				backing_io_units_per_chunk;
143 	uint32_t				backing_lba_per_io_unit;
144 	uint32_t				logical_blocks_per_chunk;
145 	struct spdk_reduce_pm_file		pm_file;
146 	struct spdk_reduce_backing_dev		*backing_dev;
147 	struct spdk_reduce_vol_superblock	*backing_super;
148 	struct spdk_reduce_vol_superblock	*pm_super;
149 	uint64_t				*pm_logical_map;
150 	uint64_t				*pm_chunk_maps;
151 
152 	struct spdk_bit_array			*allocated_chunk_maps;
153 	struct spdk_bit_array			*allocated_backing_io_units;
154 
155 	struct spdk_reduce_vol_request		*request_mem;
156 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
157 	TAILQ_HEAD(, spdk_reduce_vol_request)	executing_requests;
158 	TAILQ_HEAD(, spdk_reduce_vol_request)	queued_requests;
159 
160 	/* Single contiguous buffer used for all request buffers for this volume. */
161 	uint8_t					*buf_mem;
162 	struct iovec				*buf_iov_mem;
163 };
164 
165 static void _start_readv_request(struct spdk_reduce_vol_request *req);
166 static void _start_writev_request(struct spdk_reduce_vol_request *req);
167 static uint8_t *g_zero_buf;
168 static int g_vol_count = 0;
169 
170 /*
171  * Allocate extra metadata chunks and corresponding backing io units to account for
172  *  outstanding IO in worst case scenario where logical map is completely allocated
173  *  and no data can be compressed.  We need extra chunks in this case to handle
174  *  in-flight writes since reduce never writes data in place.
175  */
176 #define REDUCE_NUM_EXTRA_CHUNKS 128
177 
178 static void
179 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
180 {
181 	if (vol->pm_file.pm_is_pmem) {
182 		pmem_persist(addr, len);
183 	} else {
184 		pmem_msync(addr, len);
185 	}
186 }
187 
188 static uint64_t
189 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
190 {
191 	uint64_t chunks_in_logical_map, logical_map_size;
192 
193 	chunks_in_logical_map = vol_size / chunk_size;
194 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
195 
196 	/* Round up to next cacheline. */
197 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
198 	       REDUCE_PM_SIZE_ALIGNMENT;
199 }
200 
201 static uint64_t
202 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
203 {
204 	uint64_t num_chunks;
205 
206 	num_chunks = vol_size / chunk_size;
207 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
208 
209 	return num_chunks;
210 }
211 
212 static inline uint32_t
213 _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
214 {
215 	return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
216 }
217 
218 static uint64_t
219 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
220 {
221 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
222 
223 	num_chunks = _get_total_chunks(vol_size, chunk_size);
224 	io_units_per_chunk = chunk_size / backing_io_unit_size;
225 
226 	total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
227 
228 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
229 	       REDUCE_PM_SIZE_ALIGNMENT;
230 }
231 
232 static struct spdk_reduce_chunk_map *
233 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
234 {
235 	uintptr_t chunk_map_addr;
236 
237 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
238 
239 	chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
240 	chunk_map_addr += chunk_map_index *
241 			  _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
242 
243 	return (struct spdk_reduce_chunk_map *)chunk_map_addr;
244 }
245 
246 static int
247 _validate_vol_params(struct spdk_reduce_vol_params *params)
248 {
249 	if (params->vol_size > 0) {
250 		/**
251 		 * User does not pass in the vol size - it gets calculated by libreduce from
252 		 *  values in this structure plus the size of the backing device.
253 		 */
254 		return -EINVAL;
255 	}
256 
257 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
258 	    params->logical_block_size == 0) {
259 		return -EINVAL;
260 	}
261 
262 	/* Chunk size must be an even multiple of the backing io unit size. */
263 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
264 		return -EINVAL;
265 	}
266 
267 	/* Chunk size must be an even multiple of the logical block size. */
268 	if ((params->chunk_size % params->logical_block_size) != 0) {
269 		return -1;
270 	}
271 
272 	return 0;
273 }
274 
275 static uint64_t
276 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
277 {
278 	uint64_t num_chunks;
279 
280 	num_chunks = backing_dev_size / chunk_size;
281 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
282 		return 0;
283 	}
284 
285 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
286 	return num_chunks * chunk_size;
287 }
288 
289 static uint64_t
290 _get_pm_file_size(struct spdk_reduce_vol_params *params)
291 {
292 	uint64_t total_pm_size;
293 
294 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
295 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
296 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
297 			 params->backing_io_unit_size);
298 	return total_pm_size;
299 }
300 
301 const struct spdk_uuid *
302 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
303 {
304 	return &vol->params.uuid;
305 }
306 
307 static void
308 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
309 {
310 	uint64_t logical_map_size;
311 
312 	/* Superblock is at the beginning of the pm file. */
313 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
314 
315 	/* Logical map immediately follows the super block. */
316 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
317 
318 	/* Chunks maps follow the logical map. */
319 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
320 	vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
321 }
322 
323 /* We need 2 iovs during load - one for the superblock, another for the path */
324 #define LOAD_IOV_COUNT	2
325 
326 struct reduce_init_load_ctx {
327 	struct spdk_reduce_vol			*vol;
328 	struct spdk_reduce_vol_cb_args		backing_cb_args;
329 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
330 	void					*cb_arg;
331 	struct iovec				iov[LOAD_IOV_COUNT];
332 	void					*path;
333 };
334 
335 static int
336 _allocate_vol_requests(struct spdk_reduce_vol *vol)
337 {
338 	struct spdk_reduce_vol_request *req;
339 	int i;
340 
341 	/* Allocate 2x since we need buffers for both read/write and compress/decompress
342 	 *  intermediate buffers.
343 	 */
344 	vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size,
345 				   64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
346 	if (vol->buf_mem == NULL) {
347 		return -ENOMEM;
348 	}
349 
350 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
351 	if (vol->request_mem == NULL) {
352 		spdk_free(vol->buf_mem);
353 		vol->buf_mem = NULL;
354 		return -ENOMEM;
355 	}
356 
357 	/* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
358 	 *  buffers.
359 	 */
360 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
361 				  2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
362 	if (vol->buf_iov_mem == NULL) {
363 		free(vol->request_mem);
364 		spdk_free(vol->buf_mem);
365 		vol->request_mem = NULL;
366 		vol->buf_mem = NULL;
367 		return -ENOMEM;
368 	}
369 
370 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
371 		req = &vol->request_mem[i];
372 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
373 		req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
374 		req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size;
375 		req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
376 		req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size;
377 	}
378 
379 	return 0;
380 }
381 
382 static void
383 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
384 {
385 	if (ctx != NULL) {
386 		spdk_free(ctx->path);
387 		free(ctx);
388 	}
389 
390 	if (vol != NULL) {
391 		if (vol->pm_file.pm_buf != NULL) {
392 			pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
393 		}
394 
395 		spdk_free(vol->backing_super);
396 		spdk_bit_array_free(&vol->allocated_chunk_maps);
397 		spdk_bit_array_free(&vol->allocated_backing_io_units);
398 		free(vol->request_mem);
399 		free(vol->buf_iov_mem);
400 		spdk_free(vol->buf_mem);
401 		free(vol);
402 	}
403 }
404 
405 static int
406 _alloc_zero_buff(void)
407 {
408 	int rc = 0;
409 
410 	/* The zero buffer is shared between all volumnes and just used
411 	 * for reads so allocate one global instance here if not already
412 	 * allocated when another vol init'd or loaded.
413 	 */
414 	if (g_vol_count++ == 0) {
415 		g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
416 					  64, NULL, SPDK_ENV_LCORE_ID_ANY,
417 					  SPDK_MALLOC_DMA);
418 		if (g_zero_buf == NULL) {
419 			rc = -ENOMEM;
420 		}
421 	}
422 	return rc;
423 }
424 
425 static void
426 _init_write_super_cpl(void *cb_arg, int reduce_errno)
427 {
428 	struct reduce_init_load_ctx *init_ctx = cb_arg;
429 	int rc;
430 
431 	rc = _allocate_vol_requests(init_ctx->vol);
432 	if (rc != 0) {
433 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
434 		_init_load_cleanup(init_ctx->vol, init_ctx);
435 		return;
436 	}
437 
438 	rc = _alloc_zero_buff();
439 	if (rc != 0) {
440 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
441 		_init_load_cleanup(init_ctx->vol, init_ctx);
442 		return;
443 	}
444 
445 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
446 	/* Only clean up the ctx - the vol has been passed to the application
447 	 *  for use now that initialization was successful.
448 	 */
449 	_init_load_cleanup(NULL, init_ctx);
450 }
451 
452 static void
453 _init_write_path_cpl(void *cb_arg, int reduce_errno)
454 {
455 	struct reduce_init_load_ctx *init_ctx = cb_arg;
456 	struct spdk_reduce_vol *vol = init_ctx->vol;
457 
458 	init_ctx->iov[0].iov_base = vol->backing_super;
459 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
460 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
461 	init_ctx->backing_cb_args.cb_arg = init_ctx;
462 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
463 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
464 				 &init_ctx->backing_cb_args);
465 }
466 
467 static int
468 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
469 {
470 	uint64_t total_chunks, total_backing_io_units;
471 	uint32_t i, num_metadata_io_units;
472 
473 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
474 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
475 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
476 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
477 
478 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
479 		return -ENOMEM;
480 	}
481 
482 	/* Set backing io unit bits associated with metadata. */
483 	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
484 				vol->backing_dev->blocklen;
485 	for (i = 0; i < num_metadata_io_units; i++) {
486 		spdk_bit_array_set(vol->allocated_backing_io_units, i);
487 	}
488 
489 	return 0;
490 }
491 
492 void
493 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
494 		     struct spdk_reduce_backing_dev *backing_dev,
495 		     const char *pm_file_dir,
496 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
497 {
498 	struct spdk_reduce_vol *vol;
499 	struct reduce_init_load_ctx *init_ctx;
500 	uint64_t backing_dev_size;
501 	size_t mapped_len;
502 	int dir_len, max_dir_len, rc;
503 
504 	/* We need to append a path separator and the UUID to the supplied
505 	 * path.
506 	 */
507 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
508 	dir_len = strnlen(pm_file_dir, max_dir_len);
509 	/* Strip trailing slash if the user provided one - we will add it back
510 	 * later when appending the filename.
511 	 */
512 	if (pm_file_dir[dir_len - 1] == '/') {
513 		dir_len--;
514 	}
515 	if (dir_len == max_dir_len) {
516 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
517 		cb_fn(cb_arg, NULL, -EINVAL);
518 		return;
519 	}
520 
521 	rc = _validate_vol_params(params);
522 	if (rc != 0) {
523 		SPDK_ERRLOG("invalid vol params\n");
524 		cb_fn(cb_arg, NULL, rc);
525 		return;
526 	}
527 
528 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
529 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
530 	if (params->vol_size == 0) {
531 		SPDK_ERRLOG("backing device is too small\n");
532 		cb_fn(cb_arg, NULL, -EINVAL);
533 		return;
534 	}
535 
536 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
537 	    backing_dev->unmap == NULL) {
538 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
539 		cb_fn(cb_arg, NULL, -EINVAL);
540 		return;
541 	}
542 
543 	vol = calloc(1, sizeof(*vol));
544 	if (vol == NULL) {
545 		cb_fn(cb_arg, NULL, -ENOMEM);
546 		return;
547 	}
548 
549 	TAILQ_INIT(&vol->free_requests);
550 	TAILQ_INIT(&vol->executing_requests);
551 	TAILQ_INIT(&vol->queued_requests);
552 
553 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
554 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
555 	if (vol->backing_super == NULL) {
556 		cb_fn(cb_arg, NULL, -ENOMEM);
557 		_init_load_cleanup(vol, NULL);
558 		return;
559 	}
560 
561 	init_ctx = calloc(1, sizeof(*init_ctx));
562 	if (init_ctx == NULL) {
563 		cb_fn(cb_arg, NULL, -ENOMEM);
564 		_init_load_cleanup(vol, NULL);
565 		return;
566 	}
567 
568 	init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
569 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
570 	if (init_ctx->path == NULL) {
571 		cb_fn(cb_arg, NULL, -ENOMEM);
572 		_init_load_cleanup(vol, init_ctx);
573 		return;
574 	}
575 
576 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
577 		spdk_uuid_generate(&params->uuid);
578 	}
579 
580 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
581 	vol->pm_file.path[dir_len] = '/';
582 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
583 			    &params->uuid);
584 	vol->pm_file.size = _get_pm_file_size(params);
585 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
586 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
587 					    &mapped_len, &vol->pm_file.pm_is_pmem);
588 	if (vol->pm_file.pm_buf == NULL) {
589 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
590 			    vol->pm_file.path, strerror(errno));
591 		cb_fn(cb_arg, NULL, -errno);
592 		_init_load_cleanup(vol, init_ctx);
593 		return;
594 	}
595 
596 	if (vol->pm_file.size != mapped_len) {
597 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
598 			    vol->pm_file.size, mapped_len);
599 		cb_fn(cb_arg, NULL, -ENOMEM);
600 		_init_load_cleanup(vol, init_ctx);
601 		return;
602 	}
603 
604 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
605 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
606 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
607 	memcpy(&vol->params, params, sizeof(*params));
608 
609 	vol->backing_dev = backing_dev;
610 
611 	rc = _allocate_bit_arrays(vol);
612 	if (rc != 0) {
613 		cb_fn(cb_arg, NULL, rc);
614 		_init_load_cleanup(vol, init_ctx);
615 		return;
616 	}
617 
618 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
619 	       sizeof(vol->backing_super->signature));
620 	memcpy(&vol->backing_super->params, params, sizeof(*params));
621 
622 	_initialize_vol_pm_pointers(vol);
623 
624 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
625 	/* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY.
626 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
627 	 */
628 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
629 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
630 
631 	init_ctx->vol = vol;
632 	init_ctx->cb_fn = cb_fn;
633 	init_ctx->cb_arg = cb_arg;
634 
635 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
636 	init_ctx->iov[0].iov_base = init_ctx->path;
637 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
638 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
639 	init_ctx->backing_cb_args.cb_arg = init_ctx;
640 	/* Write path to offset 4K on backing device - just after where the super
641 	 *  block will be written.  We wait until this is committed before writing the
642 	 *  super block to guarantee we don't get the super block written without the
643 	 *  the path if the system crashed in the middle of a write operation.
644 	 */
645 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
646 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
647 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
648 				 &init_ctx->backing_cb_args);
649 }
650 
651 static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
652 
653 static void
654 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
655 {
656 	struct reduce_init_load_ctx *load_ctx = cb_arg;
657 	struct spdk_reduce_vol *vol = load_ctx->vol;
658 	uint64_t backing_dev_size;
659 	uint64_t i, num_chunks, logical_map_index;
660 	struct spdk_reduce_chunk_map *chunk;
661 	size_t mapped_len;
662 	uint32_t j;
663 	int rc;
664 
665 	rc = _alloc_zero_buff();
666 	if (rc) {
667 		goto error;
668 	}
669 
670 	if (memcmp(vol->backing_super->signature,
671 		   SPDK_REDUCE_SIGNATURE,
672 		   sizeof(vol->backing_super->signature)) != 0) {
673 		/* This backing device isn't a libreduce backing device. */
674 		rc = -EILSEQ;
675 		goto error;
676 	}
677 
678 	/* If the cb_fn is destroy_load_cb, it means we are wanting to destroy this compress bdev.
679 	 *  So don't bother getting the volume ready to use - invoke the callback immediately
680 	 *  so destroy_load_cb can delete the metadata off of the block device and delete the
681 	 *  persistent memory file if it exists.
682 	 */
683 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
684 	if (load_ctx->cb_fn == (*destroy_load_cb)) {
685 		load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
686 		_init_load_cleanup(NULL, load_ctx);
687 		return;
688 	}
689 
690 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
691 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
692 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
693 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
694 
695 	rc = _allocate_bit_arrays(vol);
696 	if (rc != 0) {
697 		goto error;
698 	}
699 
700 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
701 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
702 		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
703 			    backing_dev_size);
704 		rc = -EILSEQ;
705 		goto error;
706 	}
707 
708 	vol->pm_file.size = _get_pm_file_size(&vol->params);
709 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
710 					    &vol->pm_file.pm_is_pmem);
711 	if (vol->pm_file.pm_buf == NULL) {
712 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
713 		rc = -errno;
714 		goto error;
715 	}
716 
717 	if (vol->pm_file.size != mapped_len) {
718 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
719 			    vol->pm_file.size, mapped_len);
720 		rc = -ENOMEM;
721 		goto error;
722 	}
723 
724 	rc = _allocate_vol_requests(vol);
725 	if (rc != 0) {
726 		goto error;
727 	}
728 
729 	_initialize_vol_pm_pointers(vol);
730 
731 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
732 	for (i = 0; i < num_chunks; i++) {
733 		logical_map_index = vol->pm_logical_map[i];
734 		if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
735 			continue;
736 		}
737 		spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
738 		chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
739 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
740 			if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
741 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
742 			}
743 		}
744 	}
745 
746 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
747 	/* Only clean up the ctx - the vol has been passed to the application
748 	 *  for use now that volume load was successful.
749 	 */
750 	_init_load_cleanup(NULL, load_ctx);
751 	return;
752 
753 error:
754 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
755 	_init_load_cleanup(vol, load_ctx);
756 }
757 
758 void
759 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
760 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
761 {
762 	struct spdk_reduce_vol *vol;
763 	struct reduce_init_load_ctx *load_ctx;
764 
765 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
766 	    backing_dev->unmap == NULL) {
767 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
768 		cb_fn(cb_arg, NULL, -EINVAL);
769 		return;
770 	}
771 
772 	vol = calloc(1, sizeof(*vol));
773 	if (vol == NULL) {
774 		cb_fn(cb_arg, NULL, -ENOMEM);
775 		return;
776 	}
777 
778 	TAILQ_INIT(&vol->free_requests);
779 	TAILQ_INIT(&vol->executing_requests);
780 	TAILQ_INIT(&vol->queued_requests);
781 
782 	vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
783 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
784 	if (vol->backing_super == NULL) {
785 		_init_load_cleanup(vol, NULL);
786 		cb_fn(cb_arg, NULL, -ENOMEM);
787 		return;
788 	}
789 
790 	vol->backing_dev = backing_dev;
791 
792 	load_ctx = calloc(1, sizeof(*load_ctx));
793 	if (load_ctx == NULL) {
794 		_init_load_cleanup(vol, NULL);
795 		cb_fn(cb_arg, NULL, -ENOMEM);
796 		return;
797 	}
798 
799 	load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
800 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
801 	if (load_ctx->path == NULL) {
802 		_init_load_cleanup(vol, load_ctx);
803 		cb_fn(cb_arg, NULL, -ENOMEM);
804 		return;
805 	}
806 
807 	load_ctx->vol = vol;
808 	load_ctx->cb_fn = cb_fn;
809 	load_ctx->cb_arg = cb_arg;
810 
811 	load_ctx->iov[0].iov_base = vol->backing_super;
812 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
813 	load_ctx->iov[1].iov_base = load_ctx->path;
814 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
815 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
816 	load_ctx->backing_cb_args.cb_arg = load_ctx;
817 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
818 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
819 				vol->backing_dev->blocklen,
820 				&load_ctx->backing_cb_args);
821 }
822 
823 void
824 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
825 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
826 {
827 	if (vol == NULL) {
828 		/* This indicates a programming error. */
829 		assert(false);
830 		cb_fn(cb_arg, -EINVAL);
831 		return;
832 	}
833 
834 	if (--g_vol_count == 0) {
835 		spdk_free(g_zero_buf);
836 	}
837 	assert(g_vol_count >= 0);
838 	_init_load_cleanup(vol, NULL);
839 	cb_fn(cb_arg, 0);
840 }
841 
842 struct reduce_destroy_ctx {
843 	spdk_reduce_vol_op_complete		cb_fn;
844 	void					*cb_arg;
845 	struct spdk_reduce_vol			*vol;
846 	struct spdk_reduce_vol_superblock	*super;
847 	struct iovec				iov;
848 	struct spdk_reduce_vol_cb_args		backing_cb_args;
849 	int					reduce_errno;
850 	char					pm_path[REDUCE_PATH_MAX];
851 };
852 
853 static void
854 destroy_unload_cpl(void *cb_arg, int reduce_errno)
855 {
856 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
857 
858 	if (destroy_ctx->reduce_errno == 0) {
859 		if (unlink(destroy_ctx->pm_path)) {
860 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
861 				    destroy_ctx->pm_path, strerror(errno));
862 		}
863 	}
864 
865 	/* Even if the unload somehow failed, we still pass the destroy_ctx
866 	 * reduce_errno since that indicates whether or not the volume was
867 	 * actually destroyed.
868 	 */
869 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
870 	spdk_free(destroy_ctx->super);
871 	free(destroy_ctx);
872 }
873 
874 static void
875 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
876 {
877 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
878 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
879 
880 	destroy_ctx->reduce_errno = reduce_errno;
881 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
882 }
883 
884 static void
885 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
886 {
887 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
888 
889 	if (reduce_errno != 0) {
890 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
891 		spdk_free(destroy_ctx->super);
892 		free(destroy_ctx);
893 		return;
894 	}
895 
896 	destroy_ctx->vol = vol;
897 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
898 	destroy_ctx->iov.iov_base = destroy_ctx->super;
899 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
900 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
901 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
902 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
903 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
904 				 &destroy_ctx->backing_cb_args);
905 }
906 
907 void
908 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
909 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
910 {
911 	struct reduce_destroy_ctx *destroy_ctx;
912 
913 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
914 	if (destroy_ctx == NULL) {
915 		cb_fn(cb_arg, -ENOMEM);
916 		return;
917 	}
918 
919 	destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
920 					  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
921 	if (destroy_ctx->super == NULL) {
922 		free(destroy_ctx);
923 		cb_fn(cb_arg, -ENOMEM);
924 		return;
925 	}
926 	destroy_ctx->cb_fn = cb_fn;
927 	destroy_ctx->cb_arg = cb_arg;
928 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
929 }
930 
931 static bool
932 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
933 {
934 	uint64_t start_chunk, end_chunk;
935 
936 	start_chunk = offset / vol->logical_blocks_per_chunk;
937 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
938 
939 	return (start_chunk != end_chunk);
940 }
941 
942 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
943 
944 static void
945 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
946 {
947 	struct spdk_reduce_vol_request *next_req;
948 	struct spdk_reduce_vol *vol = req->vol;
949 
950 	req->cb_fn(req->cb_arg, reduce_errno);
951 	TAILQ_REMOVE(&vol->executing_requests, req, tailq);
952 
953 	TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
954 		if (next_req->logical_map_index == req->logical_map_index) {
955 			TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
956 			if (next_req->type == REDUCE_IO_READV) {
957 				_start_readv_request(next_req);
958 			} else {
959 				assert(next_req->type == REDUCE_IO_WRITEV);
960 				_start_writev_request(next_req);
961 			}
962 			break;
963 		}
964 	}
965 
966 	TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
967 }
968 
969 static void
970 _write_write_done(void *_req, int reduce_errno)
971 {
972 	struct spdk_reduce_vol_request *req = _req;
973 	struct spdk_reduce_vol *vol = req->vol;
974 	uint64_t old_chunk_map_index;
975 	struct spdk_reduce_chunk_map *old_chunk;
976 	uint32_t i;
977 
978 	if (reduce_errno != 0) {
979 		req->reduce_errno = reduce_errno;
980 	}
981 
982 	assert(req->num_backing_ops > 0);
983 	if (--req->num_backing_ops > 0) {
984 		return;
985 	}
986 
987 	if (req->reduce_errno != 0) {
988 		_reduce_vol_complete_req(req, req->reduce_errno);
989 		return;
990 	}
991 
992 	old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
993 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
994 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
995 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
996 			if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) {
997 				break;
998 			}
999 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true);
1000 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]);
1001 			old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
1002 		}
1003 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
1004 	}
1005 
1006 	/*
1007 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
1008 	 * becomes invalid after we update the logical map, since the old chunk map will no
1009 	 * longer have a reference to it in the logical map.
1010 	 */
1011 
1012 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
1013 	_reduce_persist(vol, req->chunk,
1014 			_reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
1015 
1016 	vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
1017 
1018 	_reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
1019 
1020 	_reduce_vol_complete_req(req, 0);
1021 }
1022 
1023 static void
1024 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
1025 		   reduce_request_fn next_fn, bool is_write)
1026 {
1027 	struct iovec *iov;
1028 	uint8_t *buf;
1029 	uint32_t i;
1030 
1031 	if (req->chunk_is_compressed) {
1032 		iov = req->comp_buf_iov;
1033 		buf = req->comp_buf;
1034 	} else {
1035 		iov = req->decomp_buf_iov;
1036 		buf = req->decomp_buf;
1037 	}
1038 
1039 	req->num_backing_ops = req->num_io_units;
1040 	req->backing_cb_args.cb_fn = next_fn;
1041 	req->backing_cb_args.cb_arg = req;
1042 	for (i = 0; i < req->num_io_units; i++) {
1043 		iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
1044 		iov[i].iov_len = vol->params.backing_io_unit_size;
1045 		if (is_write) {
1046 			vol->backing_dev->writev(vol->backing_dev, &iov[i], 1,
1047 						 req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1048 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
1049 		} else {
1050 			vol->backing_dev->readv(vol->backing_dev, &iov[i], 1,
1051 						req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit,
1052 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
1053 		}
1054 	}
1055 }
1056 
1057 static void
1058 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
1059 			uint32_t compressed_size)
1060 {
1061 	struct spdk_reduce_vol *vol = req->vol;
1062 	uint32_t i;
1063 	uint64_t chunk_offset, remainder, total_len = 0;
1064 	uint8_t *buf;
1065 	int j;
1066 
1067 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
1068 
1069 	/* TODO: fail if no chunk map found - but really this should not happen if we
1070 	 * size the number of requests similarly to number of extra chunk maps
1071 	 */
1072 	assert(req->chunk_map_index != UINT32_MAX);
1073 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
1074 
1075 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1076 	req->num_io_units = spdk_divide_round_up(compressed_size,
1077 			    vol->params.backing_io_unit_size);
1078 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1079 	req->chunk->compressed_size =
1080 		req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
1081 
1082 	/* if the chunk is uncompressed we need to copy the data from the host buffers. */
1083 	if (req->chunk_is_compressed == false) {
1084 		chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1085 		buf = req->decomp_buf;
1086 		total_len = chunk_offset * vol->params.logical_block_size;
1087 
1088 		/* zero any offset into chunk */
1089 		if (req->rmw == false && chunk_offset) {
1090 			memset(buf, 0, total_len);
1091 		}
1092 		buf += total_len;
1093 
1094 		/* copy the data */
1095 		for (j = 0; j < req->iovcnt; j++) {
1096 			memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
1097 			buf += req->iov[j].iov_len;
1098 			total_len += req->iov[j].iov_len;
1099 		}
1100 
1101 		/* zero any remainder */
1102 		remainder = vol->params.chunk_size - total_len;
1103 		total_len += remainder;
1104 		if (req->rmw == false && remainder) {
1105 			memset(buf, 0, remainder);
1106 		}
1107 		assert(total_len == vol->params.chunk_size);
1108 	}
1109 
1110 	for (i = 0; i < req->num_io_units; i++) {
1111 		req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
1112 		/* TODO: fail if no backing block found - but really this should also not
1113 		 * happen (see comment above).
1114 		 */
1115 		assert(req->chunk->io_unit_index[i] != UINT32_MAX);
1116 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
1117 	}
1118 
1119 	_issue_backing_ops(req, vol, next_fn, true /* write */);
1120 }
1121 
1122 static void
1123 _write_compress_done(void *_req, int reduce_errno)
1124 {
1125 	struct spdk_reduce_vol_request *req = _req;
1126 
1127 	/* Negative reduce_errno indicates failure for compression operations.
1128 	 * Just write the uncompressed data instead.  Force this to happen
1129 	 * by just passing the full chunk size to _reduce_vol_write_chunk.
1130 	 * When it sees the data couldn't be compressed, it will just write
1131 	 * the uncompressed buffer to disk.
1132 	 */
1133 	if (reduce_errno < 0) {
1134 		reduce_errno = req->vol->params.chunk_size;
1135 	}
1136 
1137 	/* Positive reduce_errno indicates number of bytes in compressed buffer. */
1138 	_reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno);
1139 }
1140 
1141 static void
1142 _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1143 {
1144 	struct spdk_reduce_vol *vol = req->vol;
1145 
1146 	req->backing_cb_args.cb_fn = next_fn;
1147 	req->backing_cb_args.cb_arg = req;
1148 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1149 	req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
1150 	vol->backing_dev->compress(vol->backing_dev,
1151 				   &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1,
1152 				   &req->backing_cb_args);
1153 }
1154 
1155 static void
1156 _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1157 {
1158 	struct spdk_reduce_vol *vol = req->vol;
1159 
1160 	req->backing_cb_args.cb_fn = next_fn;
1161 	req->backing_cb_args.cb_arg = req;
1162 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1163 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1164 	req->decomp_buf_iov[0].iov_base = req->decomp_buf;
1165 	req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
1166 	vol->backing_dev->decompress(vol->backing_dev,
1167 				     req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
1168 				     &req->backing_cb_args);
1169 }
1170 
1171 static void
1172 _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1173 {
1174 	struct spdk_reduce_vol *vol = req->vol;
1175 	uint64_t chunk_offset, remainder = 0;
1176 	uint64_t ttl_len = 0;
1177 	int i;
1178 
1179 	req->decomp_iovcnt = 0;
1180 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1181 
1182 	if (chunk_offset) {
1183 		/* first iov point to our scratch buffer for any offset into the chunk */
1184 		req->decomp_iov[0].iov_base = req->decomp_buf;
1185 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1186 		ttl_len += req->decomp_iov[0].iov_len;
1187 		req->decomp_iovcnt = 1;
1188 	}
1189 
1190 	/* now the user data iov, direct to the user buffer */
1191 	for (i = 0; i < req->iovcnt; i++) {
1192 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1193 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1194 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1195 	}
1196 	req->decomp_iovcnt += req->iovcnt;
1197 
1198 	/* send the rest of the chunk to our scratch buffer */
1199 	remainder = vol->params.chunk_size - ttl_len;
1200 	if (remainder) {
1201 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1202 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1203 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1204 		req->decomp_iovcnt++;
1205 	}
1206 	assert(ttl_len == vol->params.chunk_size);
1207 
1208 	req->backing_cb_args.cb_fn = next_fn;
1209 	req->backing_cb_args.cb_arg = req;
1210 	req->comp_buf_iov[0].iov_base = req->comp_buf;
1211 	req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
1212 	vol->backing_dev->decompress(vol->backing_dev,
1213 				     req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt,
1214 				     &req->backing_cb_args);
1215 }
1216 
1217 static void
1218 _write_decompress_done(void *_req, int reduce_errno)
1219 {
1220 	struct spdk_reduce_vol_request *req = _req;
1221 	struct spdk_reduce_vol *vol = req->vol;
1222 	uint64_t chunk_offset, remainder, ttl_len = 0;
1223 	int i;
1224 
1225 	/* Negative reduce_errno indicates failure for compression operations. */
1226 	if (reduce_errno < 0) {
1227 		_reduce_vol_complete_req(req, reduce_errno);
1228 		return;
1229 	}
1230 
1231 	/* Positive reduce_errno indicates number of bytes in decompressed
1232 	 *  buffer.  This should equal the chunk size - otherwise that's another
1233 	 *  type of failure.
1234 	 */
1235 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1236 		_reduce_vol_complete_req(req, -EIO);
1237 		return;
1238 	}
1239 
1240 	req->decomp_iovcnt = 0;
1241 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1242 
1243 	if (chunk_offset) {
1244 		req->decomp_iov[0].iov_base = req->decomp_buf;
1245 		req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
1246 		ttl_len += req->decomp_iov[0].iov_len;
1247 		req->decomp_iovcnt = 1;
1248 	}
1249 
1250 	for (i = 0; i < req->iovcnt; i++) {
1251 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1252 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1253 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1254 	}
1255 	req->decomp_iovcnt += req->iovcnt;
1256 
1257 	remainder = vol->params.chunk_size - ttl_len;
1258 	if (remainder) {
1259 		req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
1260 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1261 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1262 		req->decomp_iovcnt++;
1263 	}
1264 	assert(ttl_len == vol->params.chunk_size);
1265 
1266 	_reduce_vol_compress_chunk(req, _write_compress_done);
1267 }
1268 
1269 static void
1270 _write_read_done(void *_req, int reduce_errno)
1271 {
1272 	struct spdk_reduce_vol_request *req = _req;
1273 
1274 	if (reduce_errno != 0) {
1275 		req->reduce_errno = reduce_errno;
1276 	}
1277 
1278 	assert(req->num_backing_ops > 0);
1279 	if (--req->num_backing_ops > 0) {
1280 		return;
1281 	}
1282 
1283 	if (req->reduce_errno != 0) {
1284 		_reduce_vol_complete_req(req, req->reduce_errno);
1285 		return;
1286 	}
1287 
1288 	if (req->chunk_is_compressed) {
1289 		_reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
1290 	} else {
1291 		_write_decompress_done(req, req->chunk->compressed_size);
1292 	}
1293 }
1294 
1295 static void
1296 _read_decompress_done(void *_req, int reduce_errno)
1297 {
1298 	struct spdk_reduce_vol_request *req = _req;
1299 	struct spdk_reduce_vol *vol = req->vol;
1300 
1301 	/* Negative reduce_errno indicates failure for compression operations. */
1302 	if (reduce_errno < 0) {
1303 		_reduce_vol_complete_req(req, reduce_errno);
1304 		return;
1305 	}
1306 
1307 	/* Positive reduce_errno indicates number of bytes in decompressed
1308 	 *  buffer.  This should equal the chunk size - otherwise that's another
1309 	 *  type of failure.
1310 	 */
1311 	if ((uint32_t)reduce_errno != vol->params.chunk_size) {
1312 		_reduce_vol_complete_req(req, -EIO);
1313 		return;
1314 	}
1315 
1316 	_reduce_vol_complete_req(req, 0);
1317 }
1318 
1319 static void
1320 _read_read_done(void *_req, int reduce_errno)
1321 {
1322 	struct spdk_reduce_vol_request *req = _req;
1323 	uint64_t chunk_offset;
1324 	uint8_t *buf;
1325 	int i;
1326 
1327 	if (reduce_errno != 0) {
1328 		req->reduce_errno = reduce_errno;
1329 	}
1330 
1331 	assert(req->num_backing_ops > 0);
1332 	if (--req->num_backing_ops > 0) {
1333 		return;
1334 	}
1335 
1336 	if (req->reduce_errno != 0) {
1337 		_reduce_vol_complete_req(req, req->reduce_errno);
1338 		return;
1339 	}
1340 
1341 	if (req->chunk_is_compressed) {
1342 		_reduce_vol_decompress_chunk(req, _read_decompress_done);
1343 	} else {
1344 
1345 		/* If the chunk was compressed, the data would have been sent to the
1346 		 *  host buffers by the decompression operation, if not we need to memcpy here.
1347 		 */
1348 		chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
1349 		buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size;
1350 		for (i = 0; i < req->iovcnt; i++) {
1351 			memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
1352 			buf += req->iov[i].iov_len;
1353 		}
1354 
1355 		_read_decompress_done(req, req->chunk->compressed_size);
1356 	}
1357 }
1358 
1359 static void
1360 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
1361 {
1362 	struct spdk_reduce_vol *vol = req->vol;
1363 
1364 	req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
1365 	assert(req->chunk_map_index != UINT32_MAX);
1366 
1367 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
1368 	req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
1369 			    vol->params.backing_io_unit_size);
1370 	req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
1371 
1372 	_issue_backing_ops(req, vol, next_fn, false /* read */);
1373 }
1374 
1375 static bool
1376 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
1377 		    uint64_t length)
1378 {
1379 	uint64_t size = 0;
1380 	int i;
1381 
1382 	if (iovcnt > REDUCE_MAX_IOVECS) {
1383 		return false;
1384 	}
1385 
1386 	for (i = 0; i < iovcnt; i++) {
1387 		size += iov[i].iov_len;
1388 	}
1389 
1390 	return size == (length * vol->params.logical_block_size);
1391 }
1392 
1393 static bool
1394 _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
1395 {
1396 	struct spdk_reduce_vol_request *req;
1397 
1398 	TAILQ_FOREACH(req, &vol->executing_requests, tailq) {
1399 		if (logical_map_index == req->logical_map_index) {
1400 			return true;
1401 		}
1402 	}
1403 
1404 	return false;
1405 }
1406 
1407 static void
1408 _start_readv_request(struct spdk_reduce_vol_request *req)
1409 {
1410 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1411 	_reduce_vol_read_chunk(req, _read_read_done);
1412 }
1413 
1414 void
1415 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1416 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1417 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1418 {
1419 	struct spdk_reduce_vol_request *req;
1420 	uint64_t logical_map_index;
1421 	bool overlapped;
1422 	int i;
1423 
1424 	if (length == 0) {
1425 		cb_fn(cb_arg, 0);
1426 		return;
1427 	}
1428 
1429 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1430 		cb_fn(cb_arg, -EINVAL);
1431 		return;
1432 	}
1433 
1434 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1435 		cb_fn(cb_arg, -EINVAL);
1436 		return;
1437 	}
1438 
1439 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1440 	overlapped = _check_overlap(vol, logical_map_index);
1441 
1442 	if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
1443 		/*
1444 		 * This chunk hasn't been allocated.  So treat the data as all
1445 		 * zeroes for this chunk - do the memset and immediately complete
1446 		 * the operation.
1447 		 */
1448 		for (i = 0; i < iovcnt; i++) {
1449 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1450 		}
1451 		cb_fn(cb_arg, 0);
1452 		return;
1453 	}
1454 
1455 	req = TAILQ_FIRST(&vol->free_requests);
1456 	if (req == NULL) {
1457 		cb_fn(cb_arg, -ENOMEM);
1458 		return;
1459 	}
1460 
1461 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1462 	req->type = REDUCE_IO_READV;
1463 	req->vol = vol;
1464 	req->iov = iov;
1465 	req->iovcnt = iovcnt;
1466 	req->offset = offset;
1467 	req->logical_map_index = logical_map_index;
1468 	req->length = length;
1469 	req->cb_fn = cb_fn;
1470 	req->cb_arg = cb_arg;
1471 
1472 	if (!overlapped) {
1473 		_start_readv_request(req);
1474 	} else {
1475 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1476 	}
1477 }
1478 
1479 static void
1480 _start_writev_request(struct spdk_reduce_vol_request *req)
1481 {
1482 	struct spdk_reduce_vol *vol = req->vol;
1483 	uint64_t chunk_offset, ttl_len = 0;
1484 	uint64_t remainder = 0;
1485 	uint32_t lbsize;
1486 	int i;
1487 
1488 	TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq);
1489 	if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
1490 		if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
1491 			/* Read old chunk, then overwrite with data from this write
1492 			 *  operation.
1493 			 */
1494 			req->rmw = true;
1495 			_reduce_vol_read_chunk(req, _write_read_done);
1496 			return;
1497 		}
1498 	}
1499 
1500 	lbsize = vol->params.logical_block_size;
1501 	req->decomp_iovcnt = 0;
1502 	req->rmw = false;
1503 
1504 	/* Note: point to our zero buf for offset into the chunk. */
1505 	chunk_offset = req->offset % vol->logical_blocks_per_chunk;
1506 	if (chunk_offset != 0) {
1507 		ttl_len += chunk_offset * lbsize;
1508 		req->decomp_iov[0].iov_base = g_zero_buf;
1509 		req->decomp_iov[0].iov_len = ttl_len;
1510 		req->decomp_iovcnt = 1;
1511 	}
1512 
1513 	/* now the user data iov, direct from the user buffer */
1514 	for (i = 0; i < req->iovcnt; i++) {
1515 		req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
1516 		req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
1517 		ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
1518 	}
1519 	req->decomp_iovcnt += req->iovcnt;
1520 
1521 	remainder = vol->params.chunk_size - ttl_len;
1522 	if (remainder) {
1523 		req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf;
1524 		req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
1525 		ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
1526 		req->decomp_iovcnt++;
1527 	}
1528 	assert(ttl_len == req->vol->params.chunk_size);
1529 
1530 	_reduce_vol_compress_chunk(req, _write_compress_done);
1531 }
1532 
1533 void
1534 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1535 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1536 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1537 {
1538 	struct spdk_reduce_vol_request *req;
1539 	uint64_t logical_map_index;
1540 	bool overlapped;
1541 
1542 	if (length == 0) {
1543 		cb_fn(cb_arg, 0);
1544 		return;
1545 	}
1546 
1547 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1548 		cb_fn(cb_arg, -EINVAL);
1549 		return;
1550 	}
1551 
1552 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1553 		cb_fn(cb_arg, -EINVAL);
1554 		return;
1555 	}
1556 
1557 	logical_map_index = offset / vol->logical_blocks_per_chunk;
1558 	overlapped = _check_overlap(vol, logical_map_index);
1559 
1560 	req = TAILQ_FIRST(&vol->free_requests);
1561 	if (req == NULL) {
1562 		cb_fn(cb_arg, -ENOMEM);
1563 		return;
1564 	}
1565 
1566 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1567 	req->type = REDUCE_IO_WRITEV;
1568 	req->vol = vol;
1569 	req->iov = iov;
1570 	req->iovcnt = iovcnt;
1571 	req->offset = offset;
1572 	req->logical_map_index = logical_map_index;
1573 	req->length = length;
1574 	req->cb_fn = cb_fn;
1575 	req->cb_arg = cb_arg;
1576 
1577 	if (!overlapped) {
1578 		_start_writev_request(req);
1579 	} else {
1580 		TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
1581 	}
1582 }
1583 
1584 const struct spdk_reduce_vol_params *
1585 spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
1586 {
1587 	return &vol->params;
1588 }
1589 
1590 void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
1591 {
1592 	uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
1593 	uint32_t struct_size;
1594 	uint64_t chunk_map_size;
1595 
1596 	SPDK_NOTICELOG("vol info:\n");
1597 	SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
1598 	SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
1599 	SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
1600 	SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
1601 	num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
1602 	SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
1603 	SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
1604 		       vol->params.vol_size / vol->params.chunk_size);
1605 	ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1606 			vol->params.backing_io_unit_size);
1607 	SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
1608 	struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
1609 	SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
1610 
1611 	SPDK_NOTICELOG("pmem info:\n");
1612 	SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
1613 	SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
1614 	SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
1615 	SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
1616 	logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
1617 			   vol->params.chunk_size);
1618 	SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
1619 	SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
1620 	chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
1621 			 vol->params.backing_io_unit_size);
1622 	SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
1623 }
1624 
1625 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
1626