xref: /spdk/lib/reduce/reduce.c (revision 9c2aea2ad581e78982146835ecab3b12107d25d6)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/env.h"
38 #include "spdk/string.h"
39 #include "spdk/bit_array.h"
40 #include "spdk_internal/log.h"
41 
42 #include "libpmem.h"
43 
44 /* Always round up the size of the PM region to the nearest cacheline. */
45 #define REDUCE_PM_SIZE_ALIGNMENT	64
46 
47 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
48 
49 /* Offset into the backing device where the persistent memory file's path is stored. */
50 #define REDUCE_BACKING_DEV_PATH_OFFSET	4096
51 
52 #define REDUCE_EMPTY_MAP_ENTRY	-1ULL
53 
54 #define REDUCE_NUM_VOL_REQUESTS	256
55 
56 /* Structure written to offset 0 of both the pm file and the backing device. */
57 struct spdk_reduce_vol_superblock {
58 	uint8_t				signature[8];
59 	struct spdk_reduce_vol_params	params;
60 	uint8_t				reserved[4048];
61 };
62 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
63 
64 #define REDUCE_PATH_MAX 4096
65 
66 /**
67  * Describes a persistent memory file used to hold metadata associated with a
68  *  compressed volume.
69  */
70 struct spdk_reduce_pm_file {
71 	char			path[REDUCE_PATH_MAX];
72 	void			*pm_buf;
73 	int			pm_is_pmem;
74 	uint64_t		size;
75 };
76 
77 struct spdk_reduce_vol_request {
78 	uint8_t					*buf;
79 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
80 };
81 
82 struct spdk_reduce_vol {
83 	struct spdk_reduce_vol_params		params;
84 	uint32_t				backing_io_units_per_chunk;
85 	uint32_t				logical_blocks_per_chunk;
86 	struct spdk_reduce_pm_file		pm_file;
87 	struct spdk_reduce_backing_dev		*backing_dev;
88 	struct spdk_reduce_vol_superblock	*backing_super;
89 	struct spdk_reduce_vol_superblock	*pm_super;
90 	uint64_t				*pm_logical_map;
91 	uint64_t				*pm_chunk_maps;
92 
93 	struct spdk_bit_array			*allocated_chunk_maps;
94 	struct spdk_bit_array			*allocated_backing_io_units;
95 
96 	struct spdk_reduce_vol_request		*request_mem;
97 	TAILQ_HEAD(, spdk_reduce_vol_request)	requests;
98 	uint8_t					*bufspace;
99 };
100 
101 /*
102  * Allocate extra metadata chunks and corresponding backing io units to account for
103  *  outstanding IO in worst case scenario where logical map is completely allocated
104  *  and no data can be compressed.  We need extra chunks in this case to handle
105  *  in-flight writes since reduce never writes data in place.
106  */
107 #define REDUCE_NUM_EXTRA_CHUNKS 128
108 
109 static void
110 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
111 {
112 	if (vol->pm_file.pm_is_pmem) {
113 		pmem_persist(addr, len);
114 	} else {
115 		pmem_msync(addr, len);
116 	}
117 }
118 
119 static inline uint64_t
120 divide_round_up(uint64_t num, uint64_t divisor)
121 {
122 	return (num + divisor - 1) / divisor;
123 }
124 
125 static uint64_t
126 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
127 {
128 	uint64_t chunks_in_logical_map, logical_map_size;
129 
130 	chunks_in_logical_map = vol_size / chunk_size;
131 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
132 
133 	/* Round up to next cacheline. */
134 	return divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * REDUCE_PM_SIZE_ALIGNMENT;
135 }
136 
137 static uint64_t
138 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
139 {
140 	uint64_t num_chunks;
141 
142 	num_chunks = vol_size / chunk_size;
143 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
144 
145 	return num_chunks;
146 }
147 
148 static uint64_t
149 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
150 {
151 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
152 
153 	num_chunks = _get_total_chunks(vol_size, chunk_size);
154 	io_units_per_chunk = chunk_size / backing_io_unit_size;
155 	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);
156 
157 	return divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * REDUCE_PM_SIZE_ALIGNMENT;
158 }
159 
160 static int
161 _validate_vol_params(struct spdk_reduce_vol_params *params)
162 {
163 	if (params->vol_size == 0 || params->chunk_size == 0 ||
164 	    params->backing_io_unit_size == 0 || params->logical_block_size == 0) {
165 		return -EINVAL;
166 	}
167 
168 	/* Chunk size must be an even multiple of the backing io unit size. */
169 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
170 		return -EINVAL;
171 	}
172 
173 	/* Chunk size must be an even multiple of the logical block size. */
174 	if ((params->chunk_size % params->logical_block_size) != 0) {
175 		return -1;
176 	}
177 
178 	/* Volume size must be an even multiple of the chunk size. */
179 	if ((params->vol_size % params->chunk_size) != 0) {
180 		return -EINVAL;
181 	}
182 
183 	return 0;
184 }
185 
186 int64_t
187 spdk_reduce_get_pm_file_size(struct spdk_reduce_vol_params *params)
188 {
189 	uint64_t total_pm_size;
190 	int rc;
191 
192 	rc = _validate_vol_params(params);
193 	if (rc != 0) {
194 		return rc;
195 	}
196 
197 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
198 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
199 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
200 			 params->backing_io_unit_size);
201 	return total_pm_size;
202 }
203 
204 int64_t
205 spdk_reduce_get_backing_device_size(struct spdk_reduce_vol_params *params)
206 {
207 	uint64_t total_backing_size, num_chunks;
208 	int rc;
209 
210 	rc = _validate_vol_params(params);
211 	if (rc != 0) {
212 		return rc;
213 	}
214 
215 	num_chunks = _get_total_chunks(params->vol_size, params->chunk_size);
216 	total_backing_size = num_chunks * params->chunk_size;
217 	total_backing_size += sizeof(struct spdk_reduce_vol_superblock);
218 
219 	return total_backing_size;
220 }
221 
222 const struct spdk_uuid *
223 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
224 {
225 	return &vol->params.uuid;
226 }
227 
228 static void
229 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
230 {
231 	/* Superblock is at the beginning of the pm file. */
232 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
233 
234 	/* Logical map immediately follows the super block. */
235 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
236 
237 	/* Chunks maps follow the logical map. */
238 	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
239 }
240 
241 /* We need 2 iovs during load - one for the superblock, another for the path */
242 #define LOAD_IOV_COUNT	2
243 
244 struct reduce_init_load_ctx {
245 	struct spdk_reduce_vol			*vol;
246 	struct spdk_reduce_vol_cb_args		backing_cb_args;
247 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
248 	void					*cb_arg;
249 	struct iovec				iov[LOAD_IOV_COUNT];
250 	void					*path;
251 };
252 
253 static int
254 _allocate_vol_requests(struct spdk_reduce_vol *vol)
255 {
256 	struct spdk_reduce_vol_request *req;
257 	int i;
258 
259 	vol->bufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
260 	if (vol->bufspace == NULL) {
261 		return -ENOMEM;
262 	}
263 
264 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
265 	if (vol->request_mem == NULL) {
266 		free(vol->bufspace);
267 		return -ENOMEM;
268 	}
269 
270 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
271 		req = &vol->request_mem[i];
272 		TAILQ_INSERT_HEAD(&vol->requests, req, tailq);
273 		req->buf = vol->bufspace + i * vol->params.chunk_size;
274 	}
275 
276 	return 0;
277 }
278 
279 static void
280 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
281 {
282 	if (ctx != NULL) {
283 		spdk_dma_free(ctx->path);
284 		free(ctx);
285 	}
286 
287 	if (vol != NULL) {
288 		spdk_dma_free(vol->backing_super);
289 		spdk_bit_array_free(&vol->allocated_chunk_maps);
290 		spdk_bit_array_free(&vol->allocated_backing_io_units);
291 		free(vol->request_mem);
292 		spdk_dma_free(vol->bufspace);
293 		free(vol);
294 	}
295 }
296 
297 static void
298 _init_write_super_cpl(void *cb_arg, int ziperrno)
299 {
300 	struct reduce_init_load_ctx *init_ctx = cb_arg;
301 	int rc;
302 
303 	rc = _allocate_vol_requests(init_ctx->vol);
304 	if (rc != 0) {
305 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
306 		_init_load_cleanup(init_ctx->vol, init_ctx);
307 		return;
308 	}
309 
310 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, ziperrno);
311 	/* Only clean up the ctx - the vol has been passed to the application
312 	 *  for use now that initialization was successful.
313 	 */
314 	_init_load_cleanup(NULL, init_ctx);
315 }
316 
317 static void
318 _init_write_path_cpl(void *cb_arg, int ziperrno)
319 {
320 	struct reduce_init_load_ctx *init_ctx = cb_arg;
321 	struct spdk_reduce_vol *vol = init_ctx->vol;
322 
323 	init_ctx->iov[0].iov_base = vol->backing_super;
324 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
325 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
326 	init_ctx->backing_cb_args.cb_arg = init_ctx;
327 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
328 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
329 				 &init_ctx->backing_cb_args);
330 }
331 
332 static int
333 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
334 {
335 	uint64_t total_chunks, total_backing_io_units;
336 
337 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
338 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
339 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
340 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
341 
342 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
343 		return -ENOMEM;
344 	}
345 
346 	/* Set backing io unit bits associated with metadata. */
347 	spdk_bit_array_set(vol->allocated_backing_io_units, 0);
348 	spdk_bit_array_set(vol->allocated_backing_io_units, 1);
349 
350 	return 0;
351 }
352 
353 void
354 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
355 		     struct spdk_reduce_backing_dev *backing_dev,
356 		     const char *pm_file_dir,
357 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
358 {
359 	struct spdk_reduce_vol *vol;
360 	struct reduce_init_load_ctx *init_ctx;
361 	int64_t size, size_needed;
362 	size_t mapped_len;
363 	int dir_len, max_dir_len, rc;
364 
365 	/* We need to append a path separator and the UUID to the supplied
366 	 * path.
367 	 */
368 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
369 	dir_len = strnlen(pm_file_dir, max_dir_len);
370 	/* Strip trailing slash if the user provided one - we will add it back
371 	 * later when appending the filename.
372 	 */
373 	if (pm_file_dir[dir_len - 1] == '/') {
374 		dir_len--;
375 	}
376 	if (dir_len == max_dir_len) {
377 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
378 		cb_fn(cb_arg, NULL, -EINVAL);
379 		return;
380 	}
381 
382 	rc = _validate_vol_params(params);
383 	if (rc != 0) {
384 		SPDK_ERRLOG("invalid vol params\n");
385 		cb_fn(cb_arg, NULL, rc);
386 		return;
387 	}
388 
389 	size_needed = spdk_reduce_get_backing_device_size(params);
390 	size = backing_dev->blockcnt * backing_dev->blocklen;
391 	if (size_needed > size) {
392 		SPDK_ERRLOG("backing device size %" PRIi64 " but %" PRIi64 " needed\n",
393 			    size, size_needed);
394 		cb_fn(cb_arg, NULL, -EINVAL);
395 		return;
396 	}
397 
398 	if (size_needed > size) {
399 		SPDK_ERRLOG("pm file size %" PRIi64 " but %" PRIi64 " needed\n",
400 			    size, size_needed);
401 		cb_fn(cb_arg, NULL, -EINVAL);
402 		return;
403 	}
404 
405 	if (backing_dev->close == NULL || backing_dev->readv == NULL ||
406 	    backing_dev->writev == NULL || backing_dev->unmap == NULL) {
407 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
408 		cb_fn(cb_arg, NULL, -EINVAL);
409 		return;
410 	}
411 
412 	vol = calloc(1, sizeof(*vol));
413 	if (vol == NULL) {
414 		cb_fn(cb_arg, NULL, -ENOMEM);
415 		return;
416 	}
417 
418 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
419 	if (vol->backing_super == NULL) {
420 		cb_fn(cb_arg, NULL, -ENOMEM);
421 		_init_load_cleanup(vol, NULL);
422 		return;
423 	}
424 
425 	init_ctx = calloc(1, sizeof(*init_ctx));
426 	if (init_ctx == NULL) {
427 		cb_fn(cb_arg, NULL, -ENOMEM);
428 		_init_load_cleanup(vol, NULL);
429 		return;
430 	}
431 
432 	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
433 	if (init_ctx->path == NULL) {
434 		cb_fn(cb_arg, NULL, -ENOMEM);
435 		_init_load_cleanup(vol, init_ctx);
436 		return;
437 	}
438 
439 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
440 		spdk_uuid_generate(&params->uuid);
441 	}
442 
443 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
444 	vol->pm_file.path[dir_len] = '/';
445 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
446 			    &params->uuid);
447 	vol->pm_file.size = spdk_reduce_get_pm_file_size(params);
448 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
449 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
450 					    &mapped_len, &vol->pm_file.pm_is_pmem);
451 	if (vol->pm_file.pm_buf == NULL) {
452 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
453 			    vol->pm_file.path, strerror(errno));
454 		cb_fn(cb_arg, NULL, -errno);
455 		_init_load_cleanup(vol, init_ctx);
456 		return;
457 	}
458 
459 	if (vol->pm_file.size != mapped_len) {
460 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
461 			    vol->pm_file.size, mapped_len);
462 		cb_fn(cb_arg, NULL, -ENOMEM);
463 		_init_load_cleanup(vol, init_ctx);
464 		return;
465 	}
466 
467 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
468 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
469 	memcpy(&vol->params, params, sizeof(*params));
470 
471 	rc = _allocate_bit_arrays(vol);
472 	if (rc != 0) {
473 		cb_fn(cb_arg, NULL, rc);
474 		_init_load_cleanup(vol, init_ctx);
475 		return;
476 	}
477 
478 	vol->backing_dev = backing_dev;
479 
480 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
481 	       sizeof(vol->backing_super->signature));
482 	memcpy(&vol->backing_super->params, params, sizeof(*params));
483 
484 	_initialize_vol_pm_pointers(vol);
485 
486 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
487 	/* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY.
488 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
489 	 */
490 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
491 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
492 
493 	init_ctx->vol = vol;
494 	init_ctx->cb_fn = cb_fn;
495 	init_ctx->cb_arg = cb_arg;
496 
497 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
498 	init_ctx->iov[0].iov_base = init_ctx->path;
499 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
500 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
501 	init_ctx->backing_cb_args.cb_arg = init_ctx;
502 	/* Write path to offset 4K on backing device - just after where the super
503 	 *  block will be written.  We wait until this is committed before writing the
504 	 *  super block to guarantee we don't get the super block written without the
505 	 *  the path if the system crashed in the middle of a write operation.
506 	 */
507 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
508 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
509 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
510 				 &init_ctx->backing_cb_args);
511 }
512 
513 static void
514 _load_read_super_and_path_cpl(void *cb_arg, int ziperrno)
515 {
516 	struct reduce_init_load_ctx *load_ctx = cb_arg;
517 	struct spdk_reduce_vol *vol = load_ctx->vol;
518 	int64_t size, size_needed;
519 	size_t mapped_len;
520 	int rc;
521 
522 	if (memcmp(vol->backing_super->signature,
523 		   SPDK_REDUCE_SIGNATURE,
524 		   sizeof(vol->backing_super->signature)) != 0) {
525 		/* This backing device isn't a libreduce backing device. */
526 		rc = -EILSEQ;
527 		goto error;
528 	}
529 
530 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
531 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
532 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
533 
534 	rc = _allocate_bit_arrays(vol);
535 	if (rc != 0) {
536 		goto error;
537 	}
538 
539 	size_needed = spdk_reduce_get_backing_device_size(&vol->params);
540 	size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
541 	if (size_needed > size) {
542 		SPDK_ERRLOG("backing device size %" PRIi64 " but %" PRIi64 " expected\n",
543 			    size, size_needed);
544 		rc = -EILSEQ;
545 		goto error;
546 	}
547 
548 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
549 	vol->pm_file.size = spdk_reduce_get_pm_file_size(&vol->params);
550 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
551 					    &vol->pm_file.pm_is_pmem);
552 	if (vol->pm_file.pm_buf == NULL) {
553 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
554 		rc = -errno;
555 		goto error;
556 	}
557 
558 	if (vol->pm_file.size != mapped_len) {
559 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
560 			    vol->pm_file.size, mapped_len);
561 		rc = -ENOMEM;
562 		goto error;
563 	}
564 
565 	rc = _allocate_vol_requests(vol);
566 	if (rc != 0) {
567 		goto error;
568 	}
569 
570 	_initialize_vol_pm_pointers(vol);
571 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
572 	/* Only clean up the ctx - the vol has been passed to the application
573 	 *  for use now that volume load was successful.
574 	 */
575 	_init_load_cleanup(NULL, load_ctx);
576 	return;
577 
578 error:
579 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
580 	_init_load_cleanup(vol, load_ctx);
581 }
582 
583 void
584 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
585 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
586 {
587 	struct spdk_reduce_vol *vol;
588 	struct reduce_init_load_ctx *load_ctx;
589 
590 	if (backing_dev->close == NULL || backing_dev->readv == NULL ||
591 	    backing_dev->writev == NULL || backing_dev->unmap == NULL) {
592 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
593 		cb_fn(cb_arg, NULL, -EINVAL);
594 		return;
595 	}
596 
597 	vol = calloc(1, sizeof(*vol));
598 	if (vol == NULL) {
599 		cb_fn(cb_arg, NULL, -ENOMEM);
600 		return;
601 	}
602 
603 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
604 	if (vol->backing_super == NULL) {
605 		_init_load_cleanup(vol, NULL);
606 		cb_fn(cb_arg, NULL, -ENOMEM);
607 		return;
608 	}
609 
610 	vol->backing_dev = backing_dev;
611 
612 	load_ctx = calloc(1, sizeof(*load_ctx));
613 	if (load_ctx == NULL) {
614 		_init_load_cleanup(vol, NULL);
615 		cb_fn(cb_arg, NULL, -ENOMEM);
616 		return;
617 	}
618 
619 	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
620 	if (load_ctx->path == NULL) {
621 		_init_load_cleanup(vol, load_ctx);
622 		cb_fn(cb_arg, NULL, -ENOMEM);
623 		return;
624 	}
625 
626 	load_ctx->vol = vol;
627 	load_ctx->cb_fn = cb_fn;
628 	load_ctx->cb_arg = cb_arg;
629 
630 	load_ctx->iov[0].iov_base = vol->backing_super;
631 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
632 	load_ctx->iov[1].iov_base = load_ctx->path;
633 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
634 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
635 	load_ctx->backing_cb_args.cb_arg = load_ctx;
636 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
637 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
638 				vol->backing_dev->blocklen,
639 				&load_ctx->backing_cb_args);
640 }
641 
642 void
643 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
644 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
645 {
646 	if (vol == NULL) {
647 		/* This indicates a programming error. */
648 		assert(false);
649 		cb_fn(cb_arg, -EINVAL);
650 		return;
651 	}
652 
653 	pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
654 
655 	vol->backing_dev->close(vol->backing_dev);
656 
657 	_init_load_cleanup(vol, NULL);
658 	cb_fn(cb_arg, 0);
659 }
660 
661 SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
662