xref: /spdk/lib/reduce/reduce.c (revision 5977aad8f7486552c94c5cc93ea9bb110e1cb5d0)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/env.h"
38 #include "spdk/string.h"
39 #include "spdk/bit_array.h"
40 #include "spdk/util.h"
41 #include "spdk_internal/log.h"
42 
43 #include "libpmem.h"
44 
45 /* Always round up the size of the PM region to the nearest cacheline. */
46 #define REDUCE_PM_SIZE_ALIGNMENT	64
47 
48 /* Offset into the backing device where the persistent memory file's path is stored. */
49 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
50 
51 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
52 
53 #define REDUCE_NUM_VOL_REQUESTS	256
54 
55 /* Structure written to offset 0 of both the pm file and the backing device. */
56 struct spdk_reduce_vol_superblock {
57 	uint8_t				signature[8];
58 	struct spdk_reduce_vol_params	params;
59 	uint8_t				reserved[4048];
60 };
61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
62 
63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
64 /* Subtract 1 because the signature field does not store the string's null terminator. */
65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
66 		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
67 
68 #define REDUCE_PATH_MAX 4096
69 
70 /**
71  * Describes a persistent memory file used to hold metadata associated with a
72  *  compressed volume.
73  */
74 struct spdk_reduce_pm_file {
75 	char			path[REDUCE_PATH_MAX];
76 	void			*pm_buf;
77 	int			pm_is_pmem;
78 	uint64_t		size;
79 };
80 
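/*
 * Tracks a single I/O request from the time it is taken off the volume's
 *  free_requests list until its completion callback is invoked.  A request
 *  always operates on a single chunk; I/O spanning a chunk boundary is
 *  rejected up front.
 */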
81 struct spdk_reduce_vol_request {
82 	/**
83 	 *  Scratch buffer used for read/modify/write operations on
84 	 *  I/Os less than a full chunk size, and as the intermediate
85 	 *  buffer for compress/decompress operations.
86 	 */
87 	uint8_t					*buf;
88 	struct iovec				*buf_iov;
89 	struct iovec				*iov;
90 	struct spdk_reduce_vol			*vol;
91 	int					reduce_errno;
92 	int					iovcnt;
93 	int					num_backing_ops;
94 	uint64_t				offset;
95 	uint64_t				length;
96 	uint64_t				chunk_map_index;
97 	uint64_t				*chunk;
98 	spdk_reduce_vol_op_complete		cb_fn;
99 	void					*cb_arg;
100 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
101 	struct spdk_reduce_vol_cb_args		backing_cb_args;
102 };
103 
104 struct spdk_reduce_vol {
105 	struct spdk_reduce_vol_params		params;
106 	uint32_t				backing_io_units_per_chunk;
107 	uint32_t				backing_lba_per_io_unit;
108 	uint32_t				logical_blocks_per_chunk;
109 	struct spdk_reduce_pm_file		pm_file;
110 	struct spdk_reduce_backing_dev		*backing_dev;
111 	struct spdk_reduce_vol_superblock	*backing_super;
112 	struct spdk_reduce_vol_superblock	*pm_super;
113 	uint64_t				*pm_logical_map;
114 	uint64_t				*pm_chunk_maps;
115 
116 	struct spdk_bit_array			*allocated_chunk_maps;
117 	struct spdk_bit_array			*allocated_backing_io_units;
118 
119 	struct spdk_reduce_vol_request		*request_mem;
120 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
121 
122 	/* Single contiguous allocation that backs the per-request buffers for this volume. */
123 	uint8_t					*reqbufspace;
124 	struct iovec				*buf_iov_mem;
125 };
126 
127 /*
128  * Allocate extra metadata chunks and corresponding backing io units to account for
129  *  outstanding IO in the worst-case scenario where the logical map is completely allocated
130  *  and no data can be compressed.  We need extra chunks in this case to handle
131  *  in-flight writes since reduce never writes data in place.
132  */
133 #define REDUCE_NUM_EXTRA_CHUNKS 128
134 
135 static void
136 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
137 {
138 	if (vol->pm_file.pm_is_pmem) {
139 		pmem_persist(addr, len);
140 	} else {
141 		pmem_msync(addr, len);
142 	}
143 }
144 
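/*
 * The logical map holds one 64-bit chunk map index per logical chunk in the
 *  volume.  REDUCE_EMPTY_MAP_ENTRY marks chunks that have never been written.
 */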
145 static uint64_t
146 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
147 {
148 	uint64_t chunks_in_logical_map, logical_map_size;
149 
150 	chunks_in_logical_map = vol_size / chunk_size;
151 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
152 
153 	/* Round up to next cacheline. */
154 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
155 	       REDUCE_PM_SIZE_ALIGNMENT;
156 }
157 
158 static uint64_t
159 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
160 {
161 	uint64_t num_chunks;
162 
163 	num_chunks = vol_size / chunk_size;
164 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
165 
166 	return num_chunks;
167 }
168 
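/*
 * Each chunk map is an array of 64-bit backing io unit indices, one entry per
 *  backing io unit in a chunk.  There is one chunk map for every chunk,
 *  including the REDUCE_NUM_EXTRA_CHUNKS reserved for in-flight writes.
 */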
169 static uint64_t
170 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
171 {
172 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
173 
174 	num_chunks = _get_total_chunks(vol_size, chunk_size);
175 	io_units_per_chunk = chunk_size / backing_io_unit_size;
176 	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);
177 
178 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
179 	       REDUCE_PM_SIZE_ALIGNMENT;
180 }
181 
182 static uint64_t *
183 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
184 {
185 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
186 
187 	return vol->pm_chunk_maps + (chunk_map_index * vol->backing_io_units_per_chunk);
188 }
189 
190 static int
191 _validate_vol_params(struct spdk_reduce_vol_params *params)
192 {
193 	if (params->vol_size > 0) {
194 		/**
195 		 * The caller must not set vol_size - libreduce calculates it from the other
196 		 *  values in this structure plus the size of the backing device.
197 		 */
198 		return -EINVAL;
199 	}
200 
201 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
202 	    params->logical_block_size == 0) {
203 		return -EINVAL;
204 	}
205 
206 	/* Chunk size must be an even multiple of the backing io unit size. */
207 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
208 		return -EINVAL;
209 	}
210 
211 	/* Chunk size must be an even multiple of the logical block size. */
212 	if ((params->chunk_size % params->logical_block_size) != 0) {
213 		return -EINVAL;
214 	}
215 
216 	return 0;
217 }
218 
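/*
 * The usable volume size is the backing device size rounded down to a whole
 *  number of chunks, minus the REDUCE_NUM_EXTRA_CHUNKS reserved for in-flight
 *  writes.  For example, a 1 GiB backing device with 16 KiB chunks yields
 *  65536 - 128 = 65408 chunks, i.e. a 1071644672 byte logical volume.
 */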
219 static uint64_t
220 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
221 {
222 	uint64_t num_chunks;
223 
224 	num_chunks = backing_dev_size / chunk_size;
225 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
226 		return 0;
227 	}
228 
229 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
230 	return num_chunks * chunk_size;
231 }
232 
233 static uint64_t
234 _get_pm_file_size(struct spdk_reduce_vol_params *params)
235 {
236 	uint64_t total_pm_size;
237 
238 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
239 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
240 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
241 			 params->backing_io_unit_size);
242 	return total_pm_size;
243 }
244 
245 const struct spdk_uuid *
246 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
247 {
248 	return &vol->params.uuid;
249 }
250 
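/*
 * Layout of the persistent memory file:
 *
 *   [ superblock (4 KiB) ][ logical map ][ chunk maps ]
 *
 * These pointers are derived from the mapped pm buffer in the same way for
 *  both newly initialized and loaded volumes.
 */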
251 static void
252 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
253 {
254 	/* Superblock is at the beginning of the pm file. */
255 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
256 
257 	/* Logical map immediately follows the super block. */
258 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
259 
260 	/* Chunk maps follow the logical map. */
261 	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
262 }
263 
264 /* We need 2 iovs during load - one for the superblock, another for the path */
265 #define LOAD_IOV_COUNT	2
266 
267 struct reduce_init_load_ctx {
268 	struct spdk_reduce_vol			*vol;
269 	struct spdk_reduce_vol_cb_args		backing_cb_args;
270 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
271 	void					*cb_arg;
272 	struct iovec				iov[LOAD_IOV_COUNT];
273 	void					*path;
274 };
275 
276 static int
277 _allocate_vol_requests(struct spdk_reduce_vol *vol)
278 {
279 	struct spdk_reduce_vol_request *req;
280 	int i;
281 
282 	vol->reqbufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
283 	if (vol->reqbufspace == NULL) {
284 		return -ENOMEM;
285 	}
286 
287 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
288 	if (vol->request_mem == NULL) {
289 		spdk_dma_free(vol->reqbufspace);
290 		vol->reqbufspace = NULL;
291 		return -ENOMEM;
292 	}
293 
294 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
295 				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
296 	if (vol->buf_iov_mem == NULL) {
297 		free(vol->request_mem);
298 		spdk_dma_free(vol->reqbufspace);
299 		vol->request_mem = NULL;
300 		vol->reqbufspace = NULL;
301 		return -ENOMEM;
302 	}
303 
304 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
305 		req = &vol->request_mem[i];
306 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
307 		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
308 		req->buf = vol->reqbufspace + i * vol->params.chunk_size;
309 	}
310 
311 	return 0;
312 }
313 
314 static void
315 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
316 {
317 	if (ctx != NULL) {
318 		spdk_dma_free(ctx->path);
319 		free(ctx);
320 	}
321 
322 	if (vol != NULL) {
323 		pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
324 		spdk_dma_free(vol->backing_super);
325 		spdk_bit_array_free(&vol->allocated_chunk_maps);
326 		spdk_bit_array_free(&vol->allocated_backing_io_units);
327 		free(vol->request_mem);
328 		free(vol->buf_iov_mem);
329 		spdk_dma_free(vol->reqbufspace);
330 		free(vol);
331 	}
332 }
333 
334 static void
335 _init_write_super_cpl(void *cb_arg, int reduce_errno)
336 {
337 	struct reduce_init_load_ctx *init_ctx = cb_arg;
338 	int rc;
339 
340 	rc = _allocate_vol_requests(init_ctx->vol);
341 	if (rc != 0) {
342 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
343 		_init_load_cleanup(init_ctx->vol, init_ctx);
344 		return;
345 	}
346 
347 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
348 	/* Only clean up the ctx - the vol has been passed to the application
349 	 *  for use now that initialization was successful.
350 	 */
351 	_init_load_cleanup(NULL, init_ctx);
352 }
353 
354 static void
355 _init_write_path_cpl(void *cb_arg, int reduce_errno)
356 {
357 	struct reduce_init_load_ctx *init_ctx = cb_arg;
358 	struct spdk_reduce_vol *vol = init_ctx->vol;
359 
360 	init_ctx->iov[0].iov_base = vol->backing_super;
361 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
362 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
363 	init_ctx->backing_cb_args.cb_arg = init_ctx;
364 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
365 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
366 				 &init_ctx->backing_cb_args);
367 }
368 
369 static int
370 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
371 {
372 	uint64_t total_chunks, total_backing_io_units;
373 
374 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
375 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
376 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
377 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
378 
379 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
380 		return -ENOMEM;
381 	}
382 
383 	/* Reserve the backing io units that hold the volume metadata (superblock and pm file path). */
384 	spdk_bit_array_set(vol->allocated_backing_io_units, 0);
385 	spdk_bit_array_set(vol->allocated_backing_io_units, 1);
386 
387 	return 0;
388 }
389 
390 void
391 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
392 		     struct spdk_reduce_backing_dev *backing_dev,
393 		     const char *pm_file_dir,
394 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
395 {
396 	struct spdk_reduce_vol *vol;
397 	struct reduce_init_load_ctx *init_ctx;
398 	uint64_t backing_dev_size;
399 	size_t mapped_len;
400 	int dir_len, max_dir_len, rc;
401 
402 	/* We need to append a path separator and the UUID to the supplied
403 	 * path.
404 	 */
405 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
406 	dir_len = strnlen(pm_file_dir, max_dir_len);
407 	/* Strip trailing slash if the user provided one - we will add it back
408 	 * later when appending the filename.
409 	 */
410 	if (pm_file_dir[dir_len - 1] == '/') {
411 		dir_len--;
412 	}
413 	if (dir_len == max_dir_len) {
414 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
415 		cb_fn(cb_arg, NULL, -EINVAL);
416 		return;
417 	}
418 
419 	rc = _validate_vol_params(params);
420 	if (rc != 0) {
421 		SPDK_ERRLOG("invalid vol params\n");
422 		cb_fn(cb_arg, NULL, rc);
423 		return;
424 	}
425 
426 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
427 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
428 	if (params->vol_size == 0) {
429 		SPDK_ERRLOG("backing device is too small\n");
430 		cb_fn(cb_arg, NULL, -EINVAL);
431 		return;
432 	}
433 
434 	if (backing_dev->close == NULL || backing_dev->readv == NULL ||
435 	    backing_dev->writev == NULL || backing_dev->unmap == NULL) {
436 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
437 		cb_fn(cb_arg, NULL, -EINVAL);
438 		return;
439 	}
440 
441 	vol = calloc(1, sizeof(*vol));
442 	if (vol == NULL) {
443 		cb_fn(cb_arg, NULL, -ENOMEM);
444 		return;
445 	}
446 
447 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
448 	if (vol->backing_super == NULL) {
449 		cb_fn(cb_arg, NULL, -ENOMEM);
450 		_init_load_cleanup(vol, NULL);
451 		return;
452 	}
453 
454 	init_ctx = calloc(1, sizeof(*init_ctx));
455 	if (init_ctx == NULL) {
456 		cb_fn(cb_arg, NULL, -ENOMEM);
457 		_init_load_cleanup(vol, NULL);
458 		return;
459 	}
460 
461 	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
462 	if (init_ctx->path == NULL) {
463 		cb_fn(cb_arg, NULL, -ENOMEM);
464 		_init_load_cleanup(vol, init_ctx);
465 		return;
466 	}
467 
468 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
469 		spdk_uuid_generate(&params->uuid);
470 	}
471 
472 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
473 	vol->pm_file.path[dir_len] = '/';
474 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
475 			    &params->uuid);
476 	vol->pm_file.size = _get_pm_file_size(params);
477 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
478 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
479 					    &mapped_len, &vol->pm_file.pm_is_pmem);
480 	if (vol->pm_file.pm_buf == NULL) {
481 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
482 			    vol->pm_file.path, strerror(errno));
483 		cb_fn(cb_arg, NULL, -errno);
484 		_init_load_cleanup(vol, init_ctx);
485 		return;
486 	}
487 
488 	if (vol->pm_file.size != mapped_len) {
489 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
490 			    vol->pm_file.size, mapped_len);
491 		cb_fn(cb_arg, NULL, -ENOMEM);
492 		_init_load_cleanup(vol, init_ctx);
493 		return;
494 	}
495 
496 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
497 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
498 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
499 	memcpy(&vol->params, params, sizeof(*params));
500 
501 	rc = _allocate_bit_arrays(vol);
502 	if (rc != 0) {
503 		cb_fn(cb_arg, NULL, rc);
504 		_init_load_cleanup(vol, init_ctx);
505 		return;
506 	}
507 
508 	vol->backing_dev = backing_dev;
509 
510 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
511 	       sizeof(vol->backing_super->signature));
512 	memcpy(&vol->backing_super->params, params, sizeof(*params));
513 
514 	_initialize_vol_pm_pointers(vol);
515 
516 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
517 	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
518 	 * Note that this writes 0xFF not just to the logical map but to the chunk maps as well.
519 	 */
520 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
521 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
522 
523 	init_ctx->vol = vol;
524 	init_ctx->cb_fn = cb_fn;
525 	init_ctx->cb_arg = cb_arg;
526 
527 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
528 	init_ctx->iov[0].iov_base = init_ctx->path;
529 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
530 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
531 	init_ctx->backing_cb_args.cb_arg = init_ctx;
532 	/* Write the path to offset 4K on the backing device - just after where the super
533 	 *  block will be written.  We wait until this write is committed before writing the
534 	 *  super block, to guarantee the super block is never written without the path
535 	 *  if the system crashes in the middle of a write operation.
536 	 */
537 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
538 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
539 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
540 				 &init_ctx->backing_cb_args);
541 }
542 
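/*
 * Completion callback for the initial readv issued by spdk_reduce_vol_load.
 *  The superblock and pm file path have now been read from the backing device;
 *  validate the signature, restore the volume parameters, and map the existing
 *  pm file before handing the volume back to the caller.
 */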
543 static void
544 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
545 {
546 	struct reduce_init_load_ctx *load_ctx = cb_arg;
547 	struct spdk_reduce_vol *vol = load_ctx->vol;
548 	uint64_t backing_dev_size;
549 	size_t mapped_len;
550 	int rc;
551 
552 	if (memcmp(vol->backing_super->signature,
553 		   SPDK_REDUCE_SIGNATURE,
554 		   sizeof(vol->backing_super->signature)) != 0) {
555 		/* This backing device isn't a libreduce backing device. */
556 		rc = -EILSEQ;
557 		goto error;
558 	}
559 
560 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
561 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
562 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
563 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
564 
565 	rc = _allocate_bit_arrays(vol);
566 	if (rc != 0) {
567 		goto error;
568 	}
569 
570 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
571 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
572 		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
573 			    backing_dev_size);
574 		rc = -EILSEQ;
575 		goto error;
576 	}
577 
578 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
579 	vol->pm_file.size = _get_pm_file_size(&vol->params);
580 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
581 					    &vol->pm_file.pm_is_pmem);
582 	if (vol->pm_file.pm_buf == NULL) {
583 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
584 		rc = -errno;
585 		goto error;
586 	}
587 
588 	if (vol->pm_file.size != mapped_len) {
589 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
590 			    vol->pm_file.size, mapped_len);
591 		rc = -ENOMEM;
592 		goto error;
593 	}
594 
595 	rc = _allocate_vol_requests(vol);
596 	if (rc != 0) {
597 		goto error;
598 	}
599 
600 	_initialize_vol_pm_pointers(vol);
601 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
602 	/* Only clean up the ctx - the vol has been passed to the application
603 	 *  for use now that volume load was successful.
604 	 */
605 	_init_load_cleanup(NULL, load_ctx);
606 	return;
607 
608 error:
609 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
610 	_init_load_cleanup(vol, load_ctx);
611 }
612 
613 void
614 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
615 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
616 {
617 	struct spdk_reduce_vol *vol;
618 	struct reduce_init_load_ctx *load_ctx;
619 
620 	if (backing_dev->close == NULL || backing_dev->readv == NULL ||
621 	    backing_dev->writev == NULL || backing_dev->unmap == NULL) {
622 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
623 		cb_fn(cb_arg, NULL, -EINVAL);
624 		return;
625 	}
626 
627 	vol = calloc(1, sizeof(*vol));
628 	if (vol == NULL) {
629 		cb_fn(cb_arg, NULL, -ENOMEM);
630 		return;
631 	}
632 
633 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
634 	if (vol->backing_super == NULL) {
635 		_init_load_cleanup(vol, NULL);
636 		cb_fn(cb_arg, NULL, -ENOMEM);
637 		return;
638 	}
639 
640 	vol->backing_dev = backing_dev;
641 
642 	load_ctx = calloc(1, sizeof(*load_ctx));
643 	if (load_ctx == NULL) {
644 		_init_load_cleanup(vol, NULL);
645 		cb_fn(cb_arg, NULL, -ENOMEM);
646 		return;
647 	}
648 
649 	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
650 	if (load_ctx->path == NULL) {
651 		_init_load_cleanup(vol, load_ctx);
652 		cb_fn(cb_arg, NULL, -ENOMEM);
653 		return;
654 	}
655 
656 	load_ctx->vol = vol;
657 	load_ctx->cb_fn = cb_fn;
658 	load_ctx->cb_arg = cb_arg;
659 
660 	load_ctx->iov[0].iov_base = vol->backing_super;
661 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
662 	load_ctx->iov[1].iov_base = load_ctx->path;
663 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
664 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
665 	load_ctx->backing_cb_args.cb_arg = load_ctx;
666 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
667 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
668 				vol->backing_dev->blocklen,
669 				&load_ctx->backing_cb_args);
670 }
671 
672 void
673 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
674 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
675 {
676 	if (vol == NULL) {
677 		/* This indicates a programming error. */
678 		assert(false);
679 		cb_fn(cb_arg, -EINVAL);
680 		return;
681 	}
682 
683 	vol->backing_dev->close(vol->backing_dev);
684 
685 	_init_load_cleanup(vol, NULL);
686 	cb_fn(cb_arg, 0);
687 }
688 
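/*
 * libreduce operates on one chunk at a time, so readv/writev requests that
 *  cross a chunk boundary are rejected with -EINVAL.
 */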
689 static bool
690 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
691 {
692 	uint64_t start_chunk, end_chunk;
693 
694 	start_chunk = offset / vol->logical_blocks_per_chunk;
695 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
696 
697 	return (start_chunk != end_chunk);
698 }
699 
700 typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
701 
702 static void
703 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
704 {
705 	req->cb_fn(req->cb_arg, reduce_errno);
706 	TAILQ_INSERT_HEAD(&req->vol->free_requests, req, tailq);
707 }
708 
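/*
 * Called as each backing write for a newly written chunk completes.  Once all
 *  writes are done: release the old chunk map and its backing io units (if the
 *  chunk had been written before), persist the new chunk map, then update and
 *  persist the logical map entry so it points at the new chunk map.
 */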
709 static void
710 _write_complete_req(void *_req, int reduce_errno)
711 {
712 	struct spdk_reduce_vol_request *req = _req;
713 	struct spdk_reduce_vol *vol = req->vol;
714 	uint64_t logical_map_index, old_chunk_map_index;
715 	uint64_t *old_chunk;
716 	uint32_t i;
717 
718 	if (reduce_errno != 0) {
719 		req->reduce_errno = reduce_errno;
720 	}
721 
722 	assert(req->num_backing_ops > 0);
723 	if (--req->num_backing_ops > 0) {
724 		return;
725 	}
726 
727 	if (req->reduce_errno != 0) {
728 		_reduce_vol_complete_req(req, req->reduce_errno);
729 		return;
730 	}
731 
732 	logical_map_index = req->offset / vol->logical_blocks_per_chunk;
733 
734 	old_chunk_map_index = vol->pm_logical_map[logical_map_index];
735 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
736 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
737 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
738 			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
739 				break;
740 			}
741 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
742 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
743 			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
744 		}
745 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
746 	}
747 
748 	/*
749 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
750 	 * becomes invalid once we update the logical map, since the logical map will no
751 	 * longer reference it.
752 	 */
753 
754 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
755 	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);
756 
757 	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;
758 
759 	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));
760 
761 	_reduce_vol_complete_req(req, 0);
762 }
763 
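/*
 * Split the request's chunk buffer into one iovec per backing io unit and
 *  issue a read or write for each at the LBA recorded in the chunk map.
 */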
764 static void
765 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
766 		   reduce_request_fn next_fn, bool is_write)
767 {
768 	uint32_t i;
769 
770 	req->num_backing_ops = vol->backing_io_units_per_chunk;
771 	req->backing_cb_args.cb_fn = next_fn;
772 	req->backing_cb_args.cb_arg = req;
773 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
774 		req->buf_iov[i].iov_base = req->buf + i * vol->params.backing_io_unit_size;
775 		req->buf_iov[i].iov_len = vol->params.backing_io_unit_size;
776 		if (is_write) {
777 			vol->backing_dev->writev(vol->backing_dev, &req->buf_iov[i], 1,
778 						 req->chunk[i] * vol->backing_lba_per_io_unit,
779 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
780 		} else {
781 			vol->backing_dev->readv(vol->backing_dev, &req->buf_iov[i], 1,
782 						req->chunk[i] * vol->backing_lba_per_io_unit,
783 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
784 		}
785 	}
786 }
787 
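/*
 * Allocate a free chunk map and a free backing io unit for each io unit in the
 *  chunk, then issue the backing writes.  The logical map is not updated until
 *  all writes complete (see _write_complete_req), so a crash mid-write leaves
 *  the previously written chunk intact.
 */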
788 static void
789 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
790 {
791 	struct spdk_reduce_vol *vol = req->vol;
792 	uint32_t i;
793 
794 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
795 
796 	/* TODO: fail if no chunk map found - but really this should not happen if we
797 	 * size the number of requests relative to the number of extra chunk maps.
798 	 */
799 	assert(req->chunk_map_index != UINT32_MAX);
800 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
801 
802 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
803 
804 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
805 		req->chunk[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
806 		/* TODO: fail if no free backing io unit found - but really this should also not
807 		 * happen (see comment above).
808 		 */
809 		assert(req->chunk[i] != UINT32_MAX);
810 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
811 	}
812 
813 	_issue_backing_ops(req, vol, next_fn, true /* write */);
814 }
815 
816 static void
817 _write_read_done(void *_req, int reduce_errno)
818 {
819 	struct spdk_reduce_vol_request *req = _req;
820 	uint64_t chunk_offset;
821 	uint8_t *buf;
822 	int i;
823 
824 	if (reduce_errno != 0) {
825 		req->reduce_errno = reduce_errno;
826 	}
827 
828 	assert(req->num_backing_ops > 0);
829 	if (--req->num_backing_ops > 0) {
830 		return;
831 	}
832 
833 	if (req->reduce_errno != 0) {
834 		_reduce_vol_complete_req(req, req->reduce_errno);
835 		return;
836 	}
837 
838 	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
839 	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
840 	for (i = 0; i < req->iovcnt; i++) {
841 		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
842 		buf += req->iov[i].iov_len;
843 	}
844 
845 	_reduce_vol_write_chunk(req, _write_complete_req);
846 }
847 
848 static void
849 _read_read_done(void *_req, int reduce_errno)
850 {
851 	struct spdk_reduce_vol_request *req = _req;
852 	uint64_t chunk_offset;
853 	uint8_t *buf;
854 	int i;
855 
856 	if (reduce_errno != 0) {
857 		req->reduce_errno = reduce_errno;
858 	}
859 
860 	assert(req->num_backing_ops > 0);
861 	if (--req->num_backing_ops > 0) {
862 		return;
863 	}
864 
865 	if (req->reduce_errno != 0) {
866 		_reduce_vol_complete_req(req, req->reduce_errno);
867 		return;
868 	}
869 
870 	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
871 	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
872 	for (i = 0; i < req->iovcnt; i++) {
873 		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
874 		buf += req->iov[i].iov_len;
875 	}
876 	_reduce_vol_complete_req(req, 0);
877 }
878 
879 static void
880 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
881 {
882 	struct spdk_reduce_vol *vol = req->vol;
883 	uint64_t chunk;
884 
885 	chunk = req->offset / vol->logical_blocks_per_chunk;
886 	req->chunk_map_index = vol->pm_logical_map[chunk];
887 	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
888 
889 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
890 	_issue_backing_ops(req, vol, next_fn, false /* read */);
891 }
892 
893 static bool
894 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
895 		    uint64_t length)
896 {
897 	uint64_t size = 0;
898 	int i;
899 
900 	for (i = 0; i < iovcnt; i++) {
901 		size += iov[i].iov_len;
902 	}
903 
904 	return size == (length * vol->params.logical_block_size);
905 }
906 
907 void
908 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
909 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
910 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
911 {
912 	struct spdk_reduce_vol_request *req;
913 	uint64_t chunk;
914 	int i;
915 
916 	if (length == 0) {
917 		cb_fn(cb_arg, 0);
918 		return;
919 	}
920 
921 	if (_request_spans_chunk_boundary(vol, offset, length)) {
922 		cb_fn(cb_arg, -EINVAL);
923 		return;
924 	}
925 
926 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
927 		cb_fn(cb_arg, -EINVAL);
928 		return;
929 	}
930 
931 	chunk = offset / vol->logical_blocks_per_chunk;
932 	if (vol->pm_logical_map[chunk] == REDUCE_EMPTY_MAP_ENTRY) {
933 		/*
934 		 * This chunk hasn't been allocated.  So treat the data as all
935 		 * zeroes for this chunk - do the memset and immediately complete
936 		 * the operation.
937 		 */
938 		for (i = 0; i < iovcnt; i++) {
939 			memset(iov[i].iov_base, 0, iov[i].iov_len);
940 		}
941 		cb_fn(cb_arg, 0);
942 		return;
943 	}
944 
945 	req = TAILQ_FIRST(&vol->free_requests);
946 	if (req == NULL) {
947 		cb_fn(cb_arg, -ENOMEM);
948 		return;
949 	}
950 
951 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
952 	req->vol = vol;
953 	req->iov = iov;
954 	req->iovcnt = iovcnt;
955 	req->offset = offset;
956 	req->length = length;
957 	req->cb_fn = cb_fn;
958 	req->cb_arg = cb_arg;
959 
960 	_reduce_vol_read_chunk(req, _read_read_done);
961 }
962 
963 void
964 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
965 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
966 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
967 {
968 	struct spdk_reduce_vol_request *req;
969 	uint64_t chunk, chunk_offset;
970 	uint32_t lbsize, lb_per_chunk;
971 	int i;
972 	uint8_t *buf;
973 
974 	if (length == 0) {
975 		cb_fn(cb_arg, 0);
976 		return;
977 	}
978 
979 	if (_request_spans_chunk_boundary(vol, offset, length)) {
980 		cb_fn(cb_arg, -EINVAL);
981 		return;
982 	}
983 
984 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
985 		cb_fn(cb_arg, -EINVAL);
986 		return;
987 	}
988 
989 	req = TAILQ_FIRST(&vol->free_requests);
990 	if (req == NULL) {
991 		cb_fn(cb_arg, -ENOMEM);
992 		return;
993 	}
994 
995 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
996 	req->vol = vol;
997 	req->iov = iov;
998 	req->iovcnt = iovcnt;
999 	req->offset = offset;
1000 	req->length = length;
1001 	req->cb_fn = cb_fn;
1002 	req->cb_arg = cb_arg;
1003 
1004 	chunk = offset / vol->logical_blocks_per_chunk;
1005 	if (vol->pm_logical_map[chunk] != REDUCE_EMPTY_MAP_ENTRY) {
1006 		/* Read old chunk, then overwrite with data from this write operation.
1007 		 * TODO: bypass reading old chunk if this write operation overwrites
1008 		 * the entire chunk.
1009 		 */
1010 		_reduce_vol_read_chunk(req, _write_read_done);
1011 		return;
1012 	}
1013 
1014 	buf = req->buf;
1015 	lbsize = vol->params.logical_block_size;
1016 	lb_per_chunk = vol->logical_blocks_per_chunk;
1017 	/* Note: we must zero out parts of req->buf not specified by this write operation. */
1018 	chunk_offset = offset % lb_per_chunk;
1019 	if (chunk_offset != 0) {
1020 		memset(buf, 0, chunk_offset * lbsize);
1021 		buf += chunk_offset * lbsize;
1022 	}
1023 	for (i = 0; i < iovcnt; i++) {
1024 		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
1025 		buf += iov[i].iov_len;
1026 	}
1027 	chunk_offset += length;
1028 	if (chunk_offset != lb_per_chunk) {
1029 		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
1030 	}
1031 	_reduce_vol_write_chunk(req, _write_complete_req);
1032 }
1033 
1034 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
1035