/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/reduce.h"
#include "spdk/env.h"
#include "spdk/string.h"
#include "spdk/bit_array.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#include "libpmem.h"

/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

#define REDUCE_EMPTY_MAP_ENTRY	-1ULL

#define REDUCE_NUM_VOL_REQUESTS	256

/* Structure written to offset 0 of both the pm file and the backing device. */
struct spdk_reduce_vol_superblock {
	uint8_t				signature[8];
	struct spdk_reduce_vol_params	params;
	uint8_t				reserved[4048];
};
SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");

#define SPDK_REDUCE_SIGNATURE "SPDKREDU"
/* Subtract 1 since the null terminator in SPDK_REDUCE_SIGNATURE is not stored in the signature field. */
SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");

#define REDUCE_PATH_MAX 4096

/**
 * Describes a persistent memory file used to hold metadata associated with a
 *  compressed volume.
 */
struct spdk_reduce_pm_file {
	char			path[REDUCE_PATH_MAX];
	void			*pm_buf;
	int			pm_is_pmem;
	uint64_t		size;
};

struct spdk_reduce_vol_request {
	/**
	 *  Scratch buffer used for read/modify/write operations on
	 *  I/Os less than a full chunk size, and as the intermediate
	 *  buffer for compress/decompress operations.
	 */
	uint8_t					*buf;
	struct iovec				*buf_iov;
	struct iovec				*iov;
	struct spdk_reduce_vol			*vol;
	int					reduce_errno;
	int					iovcnt;
	int					num_backing_ops;
	uint64_t				offset;
	uint64_t				length;
	uint64_t				chunk_map_index;
	uint64_t				*chunk;
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
};

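/*
 * In-memory state for an open compressed volume: cached parameters, the mapped
 *  persistent memory metadata file, the backing device, allocation bitmaps, and
 *  the preallocated request pool.
 */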
struct spdk_reduce_vol {
	struct spdk_reduce_vol_params		params;
	uint32_t				backing_io_units_per_chunk;
	uint32_t				backing_lba_per_io_unit;
	uint32_t				logical_blocks_per_chunk;
	struct spdk_reduce_pm_file		pm_file;
	struct spdk_reduce_backing_dev		*backing_dev;
	struct spdk_reduce_vol_superblock	*backing_super;
	struct spdk_reduce_vol_superblock	*pm_super;
	uint64_t				*pm_logical_map;
	uint64_t				*pm_chunk_maps;

	struct spdk_bit_array			*allocated_chunk_maps;
	struct spdk_bit_array			*allocated_backing_io_units;

	struct spdk_reduce_vol_request		*request_mem;
	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;

	/* Single contiguous buffer used for all request buffers for this volume. */
	uint8_t					*reqbufspace;
	struct iovec				*buf_iov_mem;
};

/*
 * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding I/O in the worst-case scenario where the logical map is completely
 *  allocated and no data can be compressed.  We need extra chunks in this case to
 *  handle in-flight writes since reduce never writes data in place.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128

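/* Flush a range of volume metadata to persistence, using pmem_persist() when the
 *  metadata file is backed by real persistent memory and falling back to msync()
 *  otherwise.
 */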
static void
_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
{
	if (vol->pm_file.pm_is_pmem) {
		pmem_persist(addr, len);
	} else {
		pmem_msync(addr, len);
	}
}

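/* Size of the logical map region in the pm file: one 64-bit chunk map index per
 *  logical chunk, rounded up to a cacheline boundary.
 */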
static uint64_t
_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t chunks_in_logical_map, logical_map_size;

	chunks_in_logical_map = vol_size / chunk_size;
	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);

	/* Round up to next cacheline. */
	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t
_get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
{
	uint64_t num_chunks;

	num_chunks = vol_size / chunk_size;
	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;

	return num_chunks;
}

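/* Size of the chunk map region in the pm file: one 64-bit backing io unit index per
 *  io unit in every chunk (including the extra chunks reserved for in-flight writes),
 *  rounded up to a cacheline boundary.
 */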
static uint64_t
_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
{
	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;

	num_chunks = _get_total_chunks(vol_size, chunk_size);
	io_units_per_chunk = chunk_size / backing_io_unit_size;
	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);

	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
	       REDUCE_PM_SIZE_ALIGNMENT;
}

static uint64_t *
_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
{
	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));

	return vol->pm_chunk_maps + (chunk_map_index * vol->backing_io_units_per_chunk);
}

static int
_validate_vol_params(struct spdk_reduce_vol_params *params)
{
	if (params->vol_size > 0) {
		/**
		 * The caller should not pass in a vol size - it gets calculated by libreduce
		 *  from values in this structure plus the size of the backing device.
		 */
		return -EINVAL;
	}

	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
	    params->logical_block_size == 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the backing io unit size. */
	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
		return -EINVAL;
	}

	/* Chunk size must be an even multiple of the logical block size. */
	if ((params->chunk_size % params->logical_block_size) != 0) {
		return -EINVAL;
	}

	return 0;
}

static uint64_t
_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
{
	uint64_t num_chunks;

	num_chunks = backing_dev_size / chunk_size;
	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
		return 0;
	}

	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
	return num_chunks * chunk_size;
}

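/*
 * Total size of the persistent memory metadata file.  The pm file layout is:
 *
 *   [ superblock (4096 bytes) ][ logical map ][ chunk maps ]
 *
 * The logical map and chunk map regions are each sized with cacheline-aligned
 *  padding (see the helpers above).
 */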
static uint64_t
_get_pm_file_size(struct spdk_reduce_vol_params *params)
{
	uint64_t total_pm_size;

	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
			 params->backing_io_unit_size);
	return total_pm_size;
}

const struct spdk_uuid *
spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
{
	return &vol->params.uuid;
}

static void
_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
{
	/* Superblock is at the beginning of the pm file. */
	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;

	/* Logical map immediately follows the super block. */
	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);

	/* Chunk maps follow the logical map. */
	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
}

/* We need 2 iovs during load - one for the superblock, another for the path */
#define LOAD_IOV_COUNT	2

struct reduce_init_load_ctx {
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	spdk_reduce_vol_op_with_handle_complete	cb_fn;
	void					*cb_arg;
	struct iovec				iov[LOAD_IOV_COUNT];
	void					*path;
};

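/*
 * Preallocate the request pool for a volume: REDUCE_NUM_VOL_REQUESTS request
 *  structures, one chunk-sized scratch buffer per request carved out of a single
 *  contiguous DMA-able allocation, and one iovec per backing io unit per request.
 */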
static int
_allocate_vol_requests(struct spdk_reduce_vol *vol)
{
	struct spdk_reduce_vol_request *req;
	int i;

	vol->reqbufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
	if (vol->reqbufspace == NULL) {
		return -ENOMEM;
	}

	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
	if (vol->request_mem == NULL) {
		spdk_dma_free(vol->reqbufspace);
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
	if (vol->buf_iov_mem == NULL) {
		free(vol->request_mem);
		spdk_dma_free(vol->reqbufspace);
		vol->request_mem = NULL;
		vol->reqbufspace = NULL;
		return -ENOMEM;
	}

	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
		req = &vol->request_mem[i];
		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
		req->buf = vol->reqbufspace + i * vol->params.chunk_size;
	}

	return 0;
}

static void
_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
{
	if (ctx != NULL) {
		spdk_dma_free(ctx->path);
		free(ctx);
	}

	if (vol != NULL) {
		pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
		spdk_dma_free(vol->backing_super);
		spdk_bit_array_free(&vol->allocated_chunk_maps);
		spdk_bit_array_free(&vol->allocated_backing_io_units);
		free(vol->request_mem);
		free(vol->buf_iov_mem);
		spdk_dma_free(vol->reqbufspace);
		free(vol);
	}
}

static void
_init_write_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	int rc;

	rc = _allocate_vol_requests(init_ctx->vol);
	if (rc != 0) {
		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
		_init_load_cleanup(init_ctx->vol, init_ctx);
		return;
	}

	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that initialization was successful.
	 */
	_init_load_cleanup(NULL, init_ctx);
}

static void
_init_write_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *init_ctx = cb_arg;
	struct spdk_reduce_vol *vol = init_ctx->vol;

	init_ctx->iov[0].iov_base = vol->backing_super;
	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

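/*
 * Allocate the in-memory bitmaps tracking which chunk maps and backing io units are
 *  in use.  The io units holding the superblock and pm file path at the start of the
 *  backing device are marked as allocated so data writes never land on the metadata.
 */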
static int
_allocate_bit_arrays(struct spdk_reduce_vol *vol)
{
	uint64_t total_chunks, total_backing_io_units;
	uint32_t i, num_metadata_io_units;

	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);

	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
		return -ENOMEM;
	}

	/* Set backing io unit bits associated with metadata. */
	num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen;
	for (i = 0; i < num_metadata_io_units; i++) {
		spdk_bit_array_set(vol->allocated_backing_io_units, i);
	}

	return 0;
}

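/*
 * Create a new compressed volume on top of the given backing device.  The sequence is:
 *   1. Validate parameters and derive the volume size from the backing device size.
 *   2. Create and map the persistent memory metadata file (<pm_file_dir>/<uuid>).
 *   3. Initialize the pm metadata (superblock, empty logical map and chunk maps).
 *   4. Write the pm file path to offset 4K of the backing device, then write the
 *      superblock to offset 0 once the path write has completed.
 * The user callback is invoked with the new volume handle once the superblock write
 *  completes and the request pool has been allocated.
 */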
void
spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
		     struct spdk_reduce_backing_dev *backing_dev,
		     const char *pm_file_dir,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *init_ctx;
	uint64_t backing_dev_size;
	size_t mapped_len;
	int dir_len, max_dir_len, rc;

	/* We need to append a path separator and the UUID to the supplied
	 * path.
	 */
	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
	dir_len = strnlen(pm_file_dir, max_dir_len);
	/* Strip trailing slash if the user provided one - we will add it back
	 * later when appending the filename.
	 */
	if (dir_len > 0 && pm_file_dir[dir_len - 1] == '/') {
		dir_len--;
	}
	if (dir_len == max_dir_len) {
		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	rc = _validate_vol_params(params);
	if (rc != 0) {
		SPDK_ERRLOG("invalid vol params\n");
		cb_fn(cb_arg, NULL, rc);
		return;
	}

	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
	if (params->vol_size == 0) {
		SPDK_ERRLOG("backing device is too small\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
	if (vol->backing_super == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, NULL);
		return;
	}

	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
	if (init_ctx->path == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
		spdk_uuid_generate(&params->uuid);
	}

	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
	vol->pm_file.path[dir_len] = '/';
	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
			    &params->uuid);
	vol->pm_file.size = _get_pm_file_size(params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
					    &mapped_len, &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
			    vol->pm_file.path, strerror(errno));
		cb_fn(cb_arg, NULL, -errno);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		cb_fn(cb_arg, NULL, -ENOMEM);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
	memcpy(&vol->params, params, sizeof(*params));

	vol->backing_dev = backing_dev;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		cb_fn(cb_arg, NULL, rc);
		_init_load_cleanup(vol, init_ctx);
		return;
	}

	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
	       sizeof(vol->backing_super->signature));
	memcpy(&vol->backing_super->params, params, sizeof(*params));

	_initialize_vol_pm_pointers(vol);

	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
	/* Writing 0xFF's is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
	 */
	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);

	init_ctx->vol = vol;
	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_arg = cb_arg;

	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
	init_ctx->iov[0].iov_base = init_ctx->path;
	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
	init_ctx->backing_cb_args.cb_arg = init_ctx;
	/* Write path to offset 4K on backing device - just after where the super
	 *  block will be written.  We wait until this is committed before writing the
	 *  super block to guarantee we don't get the super block written without the
	 *  path if the system crashed in the middle of a write operation.
	 */
	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
				 &init_ctx->backing_cb_args);
}

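/*
 * Completion for the combined superblock + pm file path read issued by
 *  spdk_reduce_vol_load().  Validates the signature, re-maps the pm metadata file,
 *  and rebuilds the in-memory chunk map and backing io unit bitmaps by walking the
 *  persisted logical map.
 */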
static void
_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_init_load_ctx *load_ctx = cb_arg;
	struct spdk_reduce_vol *vol = load_ctx->vol;
	uint64_t backing_dev_size;
	uint64_t i, num_chunks;
	uint64_t *chunk;
	size_t mapped_len;
	uint32_t j;
	int rc;

	if (memcmp(vol->backing_super->signature,
		   SPDK_REDUCE_SIGNATURE,
		   sizeof(vol->backing_super->signature)) != 0) {
		/* This backing device isn't a libreduce backing device. */
		rc = -EILSEQ;
		goto error;
	}

	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;

	rc = _allocate_bit_arrays(vol);
	if (rc != 0) {
		goto error;
	}

	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
		SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
			    backing_dev_size);
		rc = -EILSEQ;
		goto error;
	}

	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
	vol->pm_file.size = _get_pm_file_size(&vol->params);
	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
					    &vol->pm_file.pm_is_pmem);
	if (vol->pm_file.pm_buf == NULL) {
		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
		rc = -errno;
		goto error;
	}

	if (vol->pm_file.size != mapped_len) {
		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
			    vol->pm_file.size, mapped_len);
		rc = -ENOMEM;
		goto error;
	}

	rc = _allocate_vol_requests(vol);
	if (rc != 0) {
		goto error;
	}

	_initialize_vol_pm_pointers(vol);

	num_chunks = vol->params.vol_size / vol->params.chunk_size;
	for (i = 0; i < num_chunks; i++) {
		if (vol->pm_logical_map[i] == REDUCE_EMPTY_MAP_ENTRY) {
			continue;
		}
		spdk_bit_array_set(vol->allocated_chunk_maps, i);
		chunk = _reduce_vol_get_chunk_map(vol, i);
		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
			if (chunk[j] != REDUCE_EMPTY_MAP_ENTRY) {
				spdk_bit_array_set(vol->allocated_backing_io_units, chunk[j]);
			}
		}
	}

	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
	/* Only clean up the ctx - the vol has been passed to the application
	 *  for use now that volume load was successful.
	 */
	_init_load_cleanup(NULL, load_ctx);
	return;

error:
	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
	_init_load_cleanup(vol, load_ctx);
}

void
spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol *vol;
	struct reduce_init_load_ctx *load_ctx;

	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
	    backing_dev->unmap == NULL) {
		SPDK_ERRLOG("backing_dev function pointer not specified\n");
		cb_fn(cb_arg, NULL, -EINVAL);
		return;
	}

	vol = calloc(1, sizeof(*vol));
	if (vol == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
	if (vol->backing_super == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	vol->backing_dev = backing_dev;

	load_ctx = calloc(1, sizeof(*load_ctx));
	if (load_ctx == NULL) {
		_init_load_cleanup(vol, NULL);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
	if (load_ctx->path == NULL) {
		_init_load_cleanup(vol, load_ctx);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	load_ctx->vol = vol;
	load_ctx->cb_fn = cb_fn;
	load_ctx->cb_arg = cb_arg;

	load_ctx->iov[0].iov_base = vol->backing_super;
	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
	load_ctx->iov[1].iov_base = load_ctx->path;
	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
	load_ctx->backing_cb_args.cb_arg = load_ctx;
	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
				vol->backing_dev->blocklen,
				&load_ctx->backing_cb_args);
}

void
spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	if (vol == NULL) {
		/* This indicates a programming error. */
		assert(false);
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	_init_load_cleanup(vol, NULL);
	cb_fn(cb_arg, 0);
}

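/*
 * Context for spdk_reduce_vol_destroy().  Destroy works by loading the volume,
 *  zeroing the superblock on the backing device (so it will no longer be recognized
 *  as a libreduce volume), unloading it, and finally unlinking its pm metadata file.
 */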
struct reduce_destroy_ctx {
	spdk_reduce_vol_op_complete		cb_fn;
	void					*cb_arg;
	struct spdk_reduce_vol			*vol;
	struct spdk_reduce_vol_superblock	*super;
	struct iovec				iov;
	struct spdk_reduce_vol_cb_args		backing_cb_args;
	int					reduce_errno;
	char					pm_path[REDUCE_PATH_MAX];
};

static void
destroy_unload_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (destroy_ctx->reduce_errno == 0) {
		if (unlink(destroy_ctx->pm_path)) {
			SPDK_ERRLOG("%s could not be unlinked: %s\n",
				    destroy_ctx->pm_path, strerror(errno));
		}
	}

	/* Even if the unload somehow failed, we still pass the destroy_ctx
	 * reduce_errno since that indicates whether or not the volume was
	 * actually destroyed.
	 */
	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
	spdk_dma_free(destroy_ctx->super);
	free(destroy_ctx);
}

static void
_destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
	struct spdk_reduce_vol *vol = destroy_ctx->vol;

	destroy_ctx->reduce_errno = reduce_errno;
	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
}

static void
destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct reduce_destroy_ctx *destroy_ctx = cb_arg;

	if (reduce_errno != 0) {
		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
		spdk_dma_free(destroy_ctx->super);
		free(destroy_ctx);
		return;
	}

	destroy_ctx->vol = vol;
	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
	destroy_ctx->iov.iov_base = destroy_ctx->super;
	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
				 &destroy_ctx->backing_cb_args);
}

void
spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct reduce_destroy_ctx *destroy_ctx;

	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
	if (destroy_ctx == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
	if (destroy_ctx->super == NULL) {
		free(destroy_ctx);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	destroy_ctx->cb_fn = cb_fn;
	destroy_ctx->cb_arg = cb_arg;
	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
}

static bool
_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
{
	uint64_t start_chunk, end_chunk;

	start_chunk = offset / vol->logical_blocks_per_chunk;
	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;

	return (start_chunk != end_chunk);
}

typedef void (*reduce_request_fn)(void *_req, int reduce_errno);

static void
_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
{
	req->cb_fn(req->cb_arg, reduce_errno);
	TAILQ_INSERT_HEAD(&req->vol->free_requests, req, tailq);
}

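/*
 * Final completion for a chunk write.  Once all backing writes for the chunk have
 *  completed, free the old chunk map and backing io units (if the chunk was previously
 *  allocated), persist the new chunk map, and only then persist the updated logical
 *  map entry so a crash can never leave the logical map pointing at an unwritten chunk.
 */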
static void
_write_complete_req(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t logical_map_index, old_chunk_map_index;
	uint64_t *old_chunk;
	uint32_t i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	logical_map_index = req->offset / vol->logical_blocks_per_chunk;

	old_chunk_map_index = vol->pm_logical_map[logical_map_index];
	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
				break;
			}
			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
		}
		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
	}

	/*
	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
	 * becomes invalid as soon as we update the logical map, since the logical map will no
	 * longer reference it.
	 */

	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);

	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;

	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));

	_reduce_vol_complete_req(req, 0);
}

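/*
 * Split the request's chunk buffer into one iovec per backing io unit and issue a
 *  read or write to the backing device for each unit in req->chunk.  next_fn runs
 *  once per completion and is responsible for counting down req->num_backing_ops.
 */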
static void
_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
		   reduce_request_fn next_fn, bool is_write)
{
	uint32_t i;

	req->num_backing_ops = vol->backing_io_units_per_chunk;
	req->backing_cb_args.cb_fn = next_fn;
	req->backing_cb_args.cb_arg = req;
	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->buf_iov[i].iov_base = req->buf + i * vol->params.backing_io_unit_size;
		req->buf_iov[i].iov_len = vol->params.backing_io_unit_size;
		if (is_write) {
			vol->backing_dev->writev(vol->backing_dev, &req->buf_iov[i], 1,
						 req->chunk[i] * vol->backing_lba_per_io_unit,
						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
		} else {
			vol->backing_dev->readv(vol->backing_dev, &req->buf_iov[i], 1,
						req->chunk[i] * vol->backing_lba_per_io_unit,
						vol->backing_lba_per_io_unit, &req->backing_cb_args);
		}
	}
}

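/*
 * Write out a full chunk.  A new chunk map and new backing io units are always
 *  allocated (libreduce never overwrites data in place); the old chunk map, if any,
 *  is released in _write_complete_req() after the write finishes.
 */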
static void
_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint32_t i;

	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);

	/* TODO: fail if no chunk map found - but really this should not happen if we
	 * size the number of requests similarly to the number of extra chunk maps
	 */
	assert(req->chunk_map_index != UINT32_MAX);
	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);

	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
		req->chunk[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
		/* TODO: fail if no backing block found - but really this should also not
		 * happen (see comment above).
		 */
		assert(req->chunk[i] != UINT32_MAX);
		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
	}

	_issue_backing_ops(req, vol, next_fn, true /* write */);
}

static void
_write_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}

	_reduce_vol_write_chunk(req, _write_complete_req);
}

static void
_read_read_done(void *_req, int reduce_errno)
{
	struct spdk_reduce_vol_request *req = _req;
	uint64_t chunk_offset;
	uint8_t *buf;
	int i;

	if (reduce_errno != 0) {
		req->reduce_errno = reduce_errno;
	}

	assert(req->num_backing_ops > 0);
	if (--req->num_backing_ops > 0) {
		return;
	}

	if (req->reduce_errno != 0) {
		_reduce_vol_complete_req(req, req->reduce_errno);
		return;
	}

	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
	for (i = 0; i < req->iovcnt; i++) {
		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
		buf += req->iov[i].iov_len;
	}
	_reduce_vol_complete_req(req, 0);
}

static void
_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
{
	struct spdk_reduce_vol *vol = req->vol;
	uint64_t chunk;

	chunk = req->offset / vol->logical_blocks_per_chunk;
	req->chunk_map_index = vol->pm_logical_map[chunk];
	assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);

	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
	_issue_backing_ops(req, vol, next_fn, false /* read */);
}

static bool
_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
		    uint64_t length)
{
	uint64_t size = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size += iov[i].iov_len;
	}

	return size == (length * vol->params.logical_block_size);
}

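/*
 * Read 'length' logical blocks starting at logical block 'offset'.  The request must
 *  not span a chunk boundary, the iovecs must add up to exactly
 *  length * logical_block_size bytes, and chunks that have never been written read
 *  back as zeroes.  A minimal caller sketch (illustrative only - the buffer, callback,
 *  and block size shown here are assumptions, not part of this API):
 *
 *	struct iovec iov = { .iov_base = data_buf, .iov_len = 8 * 512 };
 *
 *	spdk_reduce_vol_readv(vol, &iov, 1, 0, 8, read_done_cb, read_ctx);
 *
 *  spdk_reduce_vol_writev() below follows the same rules for offset, length, and iovecs.
 */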
void
spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk;
	int i;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] == REDUCE_EMPTY_MAP_ENTRY) {
		/*
		 * This chunk hasn't been allocated.  So treat the data as all
		 * zeroes for this chunk - do the memset and immediately complete
		 * the operation.
		 */
		for (i = 0; i < iovcnt; i++) {
			memset(iov[i].iov_base, 0, iov[i].iov_len);
		}
		cb_fn(cb_arg, 0);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	_reduce_vol_read_chunk(req, _read_read_done);
}

void
spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
{
	struct spdk_reduce_vol_request *req;
	uint64_t chunk, chunk_offset;
	uint32_t lbsize, lb_per_chunk;
	int i;
	uint8_t *buf;

	if (length == 0) {
		cb_fn(cb_arg, 0);
		return;
	}

	if (_request_spans_chunk_boundary(vol, offset, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = TAILQ_FIRST(&vol->free_requests);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&vol->free_requests, req, tailq);
	req->vol = vol;
	req->iov = iov;
	req->iovcnt = iovcnt;
	req->offset = offset;
	req->length = length;
	req->cb_fn = cb_fn;
	req->cb_arg = cb_arg;

	chunk = offset / vol->logical_blocks_per_chunk;
	if (vol->pm_logical_map[chunk] != REDUCE_EMPTY_MAP_ENTRY) {
		/* Read old chunk, then overwrite with data from this write operation.
		 * TODO: bypass reading old chunk if this write operation overwrites
		 * the entire chunk.
		 */
		_reduce_vol_read_chunk(req, _write_read_done);
		return;
	}

	buf = req->buf;
	lbsize = vol->params.logical_block_size;
	lb_per_chunk = vol->logical_blocks_per_chunk;
	/* Note: we must zero out parts of req->buf not specified by this write operation. */
	chunk_offset = offset % lb_per_chunk;
	if (chunk_offset != 0) {
		memset(buf, 0, chunk_offset * lbsize);
		buf += chunk_offset * lbsize;
	}
	for (i = 0; i < iovcnt; i++) {
		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
		buf += iov[i].iov_len;
	}
	chunk_offset += length;
	if (chunk_offset != lb_per_chunk) {
		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
	}
	_reduce_vol_write_chunk(req, _write_complete_req);
}

SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)