/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk_cunit.h"

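/* Include the file under test directly so these tests can exercise its
 * static helpers (e.g. _get_pm_file_size()) and internal structures.
 */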
#include "reduce/reduce.c"
#include "spdk_internal/mock.h"
#include "common/lib/test_env.c"

static struct spdk_reduce_vol *g_vol;
static int g_reduce_errno;
static char *g_volatile_pm_buf;
static size_t g_volatile_pm_buf_len;
static char *g_persistent_pm_buf;
static size_t g_persistent_pm_buf_len;
static char *g_backing_dev_buf;
static char g_path[REDUCE_PATH_MAX];
static char *g_decomp_buf;
static int g_decompressed_len;

#define TEST_MD_PATH "/tmp"

enum ut_reduce_bdev_io_type {
	UT_REDUCE_IO_READV = 1,
	UT_REDUCE_IO_WRITEV = 2,
	UT_REDUCE_IO_UNMAP = 3,
};

struct ut_reduce_bdev_io {
	enum ut_reduce_bdev_io_type type;
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *iov;
	int iovcnt;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_reduce_vol_cb_args *args;
	TAILQ_ENTRY(ut_reduce_bdev_io)	link;
};

static bool g_defer_bdev_io = false;
static TAILQ_HEAD(, ut_reduce_bdev_io) g_pending_bdev_io =
	TAILQ_HEAD_INITIALIZER(g_pending_bdev_io);
static uint32_t g_pending_bdev_io_count = 0;

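/* Mock pmem model: pmem_map_file() hands libreduce g_volatile_pm_buf, while
 * g_persistent_pm_buf holds the "durable" copy.  pmem_persist() and
 * pmem_msync() copy the given range from the volatile buffer into the
 * persistent one, so unload/load tests can verify exactly what was persisted.
 */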
static void
sync_pm_buf(const void *addr, size_t length)
{
	uint64_t offset = (char *)addr - g_volatile_pm_buf;

	memcpy(&g_persistent_pm_buf[offset], addr, length);
}

int
pmem_msync(const void *addr, size_t length)
{
	sync_pm_buf(addr, length);
	return 0;
}

void
pmem_persist(const void *addr, size_t len)
{
	sync_pm_buf(addr, len);
}

static void
get_pm_file_size(void)
{
	struct spdk_reduce_vol_params params;
	uint64_t pm_size, expected_pm_size;

	params.backing_io_unit_size = 4096;
	params.chunk_size = 4096 * 4;
	params.vol_size = 4096 * 4 * 100;

	pm_size = _get_pm_file_size(&params);
	expected_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	/* 100 chunks in logical map * 8 bytes per chunk */
	expected_pm_size += 100 * sizeof(uint64_t);
	/* 100 chunks * (chunk struct size + 4 backing io units per chunk * 8 bytes per backing io unit) */
	expected_pm_size += 100 * (sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t));
	/* reduce also allocates some extra chunks for in-flight writes when the logical map
	 * is full.  REDUCE_NUM_EXTRA_CHUNKS is a private #define in reduce.c.  Here we need that
	 * number of chunks times (chunk struct size + 4 backing io units per chunk * 8 bytes per
	 * backing io unit).
	 */
	expected_pm_size += REDUCE_NUM_EXTRA_CHUNKS *
			    (sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t));
	/* reduce will add some padding so numbers may not match exactly.  Make sure
	 * they are close though.
	 */
	CU_ASSERT((pm_size - expected_pm_size) <= REDUCE_PM_SIZE_ALIGNMENT);
}

static void
get_vol_size(void)
{
	uint64_t chunk_size, backing_dev_size;

	chunk_size = 16 * 1024;
	backing_dev_size = 16 * 1024 * 1000;
	CU_ASSERT(_get_vol_size(chunk_size, backing_dev_size) < backing_dev_size);
}

void *
pmem_map_file(const char *path, size_t len, int flags, mode_t mode,
	      size_t *mapped_lenp, int *is_pmemp)
{
	CU_ASSERT(g_volatile_pm_buf == NULL);
	snprintf(g_path, sizeof(g_path), "%s", path);
	*is_pmemp = 1;

	if (g_persistent_pm_buf == NULL) {
		g_persistent_pm_buf = calloc(1, len);
		g_persistent_pm_buf_len = len;
		SPDK_CU_ASSERT_FATAL(g_persistent_pm_buf != NULL);
	}

	*mapped_lenp = g_persistent_pm_buf_len;
	g_volatile_pm_buf = calloc(1, g_persistent_pm_buf_len);
	SPDK_CU_ASSERT_FATAL(g_volatile_pm_buf != NULL);
	memcpy(g_volatile_pm_buf, g_persistent_pm_buf, g_persistent_pm_buf_len);
	g_volatile_pm_buf_len = g_persistent_pm_buf_len;

	return g_volatile_pm_buf;
}

int
pmem_unmap(void *addr, size_t len)
{
	CU_ASSERT(addr == g_volatile_pm_buf);
	CU_ASSERT(len == g_volatile_pm_buf_len);
	free(g_volatile_pm_buf);
	g_volatile_pm_buf = NULL;
	g_volatile_pm_buf_len = 0;

	return 0;
}

static void
persistent_pm_buf_destroy(void)
{
	CU_ASSERT(g_persistent_pm_buf != NULL);
	free(g_persistent_pm_buf);
	g_persistent_pm_buf = NULL;
	g_persistent_pm_buf_len = 0;
}

static void
unlink_cb(void)
{
	persistent_pm_buf_destroy();
}

static void
init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	g_vol = vol;
	g_reduce_errno = reduce_errno;
}

static void
load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	g_vol = vol;
	g_reduce_errno = reduce_errno;
}

static void
unload_cb(void *cb_arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
init_failure(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};

	backing_dev.blocklen = 512;
	/* This blockcnt is too small for a reduce vol - there needs to be
	 *  enough space for at least REDUCE_NUM_EXTRA_CHUNKS + 1 chunks.
	 */
	backing_dev.blockcnt = 20;

	params.vol_size = 0;
	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = backing_dev.blocklen;
	params.logical_block_size = 512;

	/* backing_dev has an invalid size.  This should fail. */
	g_vol = NULL;
	g_reduce_errno = 0;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EINVAL);
	SPDK_CU_ASSERT_FATAL(g_vol == NULL);

	/* backing_dev now has a valid size, but it still has null function
	 *  pointers.  This should fail.
	 */
	backing_dev.blockcnt = 20000;

	g_vol = NULL;
	g_reduce_errno = 0;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EINVAL);
	SPDK_CU_ASSERT_FATAL(g_vol == NULL);
}

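/* The mock backing device is a flat in-memory buffer: the readv/writev/unmap
 * helpers below operate on g_backing_dev_buf at byte offset lba * blocklen.
 */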
static void
backing_dev_readv_execute(struct spdk_reduce_backing_dev *backing_dev,
			  struct iovec *iov, int iovcnt,
			  uint64_t lba, uint32_t lba_count,
			  struct spdk_reduce_vol_cb_args *args)
{
	char *offset;
	int i;

	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
	for (i = 0; i < iovcnt; i++) {
		memcpy(iov[i].iov_base, offset, iov[i].iov_len);
		offset += iov[i].iov_len;
	}
	args->cb_fn(args->cb_arg, 0);
}

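/* Queue a backing-device I/O instead of executing it immediately.  Deferred
 * I/O is drained later by backing_dev_io_execute().
 */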
static void
backing_dev_insert_io(enum ut_reduce_bdev_io_type type, struct spdk_reduce_backing_dev *backing_dev,
		      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
		      struct spdk_reduce_vol_cb_args *args)
{
	struct ut_reduce_bdev_io *ut_bdev_io;

	ut_bdev_io = calloc(1, sizeof(*ut_bdev_io));
	SPDK_CU_ASSERT_FATAL(ut_bdev_io != NULL);

	ut_bdev_io->type = type;
	ut_bdev_io->backing_dev = backing_dev;
	ut_bdev_io->iov = iov;
	ut_bdev_io->iovcnt = iovcnt;
	ut_bdev_io->lba = lba;
	ut_bdev_io->lba_count = lba_count;
	ut_bdev_io->args = args;
	TAILQ_INSERT_TAIL(&g_pending_bdev_io, ut_bdev_io, link);
	g_pending_bdev_io_count++;
}

static void
backing_dev_readv(struct spdk_reduce_backing_dev *backing_dev, struct iovec *iov, int iovcnt,
		  uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	if (g_defer_bdev_io == false) {
		CU_ASSERT(g_pending_bdev_io_count == 0);
		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
		backing_dev_readv_execute(backing_dev, iov, iovcnt, lba, lba_count, args);
		return;
	}

	backing_dev_insert_io(UT_REDUCE_IO_READV, backing_dev, iov, iovcnt, lba, lba_count, args);
}

static void
backing_dev_writev_execute(struct spdk_reduce_backing_dev *backing_dev,
			   struct iovec *iov, int iovcnt,
			   uint64_t lba, uint32_t lba_count,
			   struct spdk_reduce_vol_cb_args *args)
{
	char *offset;
	int i;

	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
	for (i = 0; i < iovcnt; i++) {
		memcpy(offset, iov[i].iov_base, iov[i].iov_len);
		offset += iov[i].iov_len;
	}
	args->cb_fn(args->cb_arg, 0);
}

static void
backing_dev_writev(struct spdk_reduce_backing_dev *backing_dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	if (g_defer_bdev_io == false) {
		CU_ASSERT(g_pending_bdev_io_count == 0);
		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
		backing_dev_writev_execute(backing_dev, iov, iovcnt, lba, lba_count, args);
		return;
	}

	backing_dev_insert_io(UT_REDUCE_IO_WRITEV, backing_dev, iov, iovcnt, lba, lba_count, args);
}

static void
backing_dev_unmap_execute(struct spdk_reduce_backing_dev *backing_dev,
			  uint64_t lba, uint32_t lba_count,
			  struct spdk_reduce_vol_cb_args *args)
{
	char *offset;

	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
	memset(offset, 0, lba_count * backing_dev->blocklen);
	args->cb_fn(args->cb_arg, 0);
}

static void
backing_dev_unmap(struct spdk_reduce_backing_dev *backing_dev,
		  uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	if (g_defer_bdev_io == false) {
		CU_ASSERT(g_pending_bdev_io_count == 0);
		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
		backing_dev_unmap_execute(backing_dev, lba, lba_count, args);
		return;
	}

	backing_dev_insert_io(UT_REDUCE_IO_UNMAP, backing_dev, NULL, 0, lba, lba_count, args);
}

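/* Complete up to `count` pending deferred I/Os in FIFO order.  A count of 0
 * drains the entire queue.
 */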
static void
backing_dev_io_execute(uint32_t count)
{
	struct ut_reduce_bdev_io *ut_bdev_io;
	uint32_t done = 0;

	CU_ASSERT(g_defer_bdev_io == true);
	while (!TAILQ_EMPTY(&g_pending_bdev_io) && (count == 0 || done < count)) {
		ut_bdev_io = TAILQ_FIRST(&g_pending_bdev_io);
		TAILQ_REMOVE(&g_pending_bdev_io, ut_bdev_io, link);
		g_pending_bdev_io_count--;
		switch (ut_bdev_io->type) {
		case UT_REDUCE_IO_READV:
			backing_dev_readv_execute(ut_bdev_io->backing_dev,
						  ut_bdev_io->iov, ut_bdev_io->iovcnt,
						  ut_bdev_io->lba, ut_bdev_io->lba_count,
						  ut_bdev_io->args);
			break;
		case UT_REDUCE_IO_WRITEV:
			backing_dev_writev_execute(ut_bdev_io->backing_dev,
						   ut_bdev_io->iov, ut_bdev_io->iovcnt,
						   ut_bdev_io->lba, ut_bdev_io->lba_count,
						   ut_bdev_io->args);
			break;
		case UT_REDUCE_IO_UNMAP:
			backing_dev_unmap_execute(ut_bdev_io->backing_dev,
						  ut_bdev_io->lba, ut_bdev_io->lba_count,
						  ut_bdev_io->args);
			break;
		default:
			CU_ASSERT(false);
			break;
		}
		free(ut_bdev_io);
		done++;
	}
}

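/* Toy run-length encoder used as the "compression" algorithm for these tests.
 * Output is a sequence of (count, value) byte pairs, so each run of up to
 * UINT8_MAX identical bytes costs 2 output bytes.  *compressed_len carries
 * the output buffer capacity in and the compressed size out.
 */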
static int
ut_compress(char *outbuf, uint32_t *compressed_len, char *inbuf, uint32_t inbuflen)
{
	uint32_t len = 0;
	uint8_t count;
	char last;

	while (true) {
		if (inbuflen == 0) {
			*compressed_len = len;
			return 0;
		}

		if (*compressed_len < (len + 2)) {
			return -ENOSPC;
		}

		last = *inbuf;
		count = 1;
		inbuflen--;
		inbuf++;

		while (inbuflen > 0 && *inbuf == last && count < UINT8_MAX) {
			count++;
			inbuflen--;
			inbuf++;
		}

		outbuf[len] = count;
		outbuf[len + 1] = last;
		len += 2;
	}
}

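/* Inverse of ut_compress(): expand (count, value) pairs into outbuf.
 * *decompressed_len carries the output buffer capacity in and the
 * decompressed size out.
 */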
static int
ut_decompress(uint8_t *outbuf, uint32_t *decompressed_len, uint8_t *inbuf, uint32_t inbuflen)
{
	uint32_t len = 0;

	SPDK_CU_ASSERT_FATAL(inbuflen % 2 == 0);

	while (true) {
		if (inbuflen == 0) {
			*decompressed_len = len;
			return 0;
		}

		if ((len + inbuf[0]) > *decompressed_len) {
			return -ENOSPC;
		}

		memset(outbuf, inbuf[1], inbuf[0]);
		outbuf += inbuf[0];
		len += inbuf[0];
		inbuflen -= 2;
		inbuf += 2;
	}
}

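/* Fill `data` with init_val repeated `repeat` times, then init_val + 1, and
 * so on (uint8_t arithmetic wraps at 256), producing runs of a known length.
 */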
static void
ut_build_data_buffer(uint8_t *data, uint32_t data_len, uint8_t init_val, uint32_t repeat)
{
	uint32_t _repeat = repeat;

	SPDK_CU_ASSERT_FATAL(repeat > 0);

	while (data_len > 0) {
		*data = init_val;
		data++;
		data_len--;
		_repeat--;
		if (_repeat == 0) {
			init_val++;
			_repeat = repeat;
		}
	}
}

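/* Backing-dev compress callback: gather src_iov into g_decomp_buf,
 * RLE-compress into dst_iov[0], and report the compressed length (or a
 * negative errno) through the completion callback.
 */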
static void
backing_dev_compress(struct spdk_reduce_backing_dev *backing_dev,
		     struct iovec *src_iov, int src_iovcnt,
		     struct iovec *dst_iov, int dst_iovcnt,
		     struct spdk_reduce_vol_cb_args *args)
{
	uint32_t compressed_len;
	uint64_t total_length = 0;
	char *buf = g_decomp_buf;
	int rc, i;

	CU_ASSERT(dst_iovcnt == 1);

	for (i = 0; i < src_iovcnt; i++) {
		memcpy(buf, src_iov[i].iov_base, src_iov[i].iov_len);
		buf += src_iov[i].iov_len;
		total_length += src_iov[i].iov_len;
	}

	compressed_len = dst_iov[0].iov_len;
	rc = ut_compress(dst_iov[0].iov_base, &compressed_len,
			 g_decomp_buf, total_length);

	args->cb_fn(args->cb_arg, rc ? rc : (int)compressed_len);
}

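/* Backing-dev decompress callback: decompress src_iov[0] into g_decomp_buf,
 * then scatter the result across dst_iov, reporting the decompressed length
 * (or a negative errno) through the completion callback.
 */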
static void
backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
		       struct iovec *src_iov, int src_iovcnt,
		       struct iovec *dst_iov, int dst_iovcnt,
		       struct spdk_reduce_vol_cb_args *args)
{
	uint32_t decompressed_len = 0;
	char *buf = g_decomp_buf;
	int rc, i;

	CU_ASSERT(src_iovcnt == 1);

	for (i = 0; i < dst_iovcnt; i++) {
		decompressed_len += dst_iov[i].iov_len;
	}

	rc = ut_decompress(g_decomp_buf, &decompressed_len,
			   src_iov[0].iov_base, src_iov[0].iov_len);

	for (i = 0; i < dst_iovcnt; i++) {
		memcpy(dst_iov[i].iov_base, buf, dst_iov[i].iov_len);
		buf += dst_iov[i].iov_len;
	}

	args->cb_fn(args->cb_arg, rc ? rc : (int)decompressed_len);
}

static void
backing_dev_destroy(struct spdk_reduce_backing_dev *backing_dev)
{
	/* We don't free this during backing_dev_close so that we can test init/unload/load
	 *  scenarios.
	 */
	free(g_backing_dev_buf);
	free(g_decomp_buf);
	g_backing_dev_buf = NULL;
	g_decomp_buf = NULL;
}

static void
backing_dev_init(struct spdk_reduce_backing_dev *backing_dev, struct spdk_reduce_vol_params *params,
		 uint32_t backing_blocklen)
{
	int64_t size;

	size = 4 * 1024 * 1024;
	backing_dev->blocklen = backing_blocklen;
	backing_dev->blockcnt = size / backing_dev->blocklen;
	backing_dev->readv = backing_dev_readv;
	backing_dev->writev = backing_dev_writev;
	backing_dev->unmap = backing_dev_unmap;
	backing_dev->compress = backing_dev_compress;
	backing_dev->decompress = backing_dev_decompress;
	backing_dev->sgl_in = true;
	backing_dev->sgl_out = true;

	g_decomp_buf = calloc(1, params->chunk_size);
	SPDK_CU_ASSERT_FATAL(g_decomp_buf != NULL);

	g_backing_dev_buf = calloc(1, size);
	SPDK_CU_ASSERT_FATAL(g_backing_dev_buf != NULL);
}

static void
init_md(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_vol_params *persistent_params;
	struct spdk_reduce_backing_dev backing_dev = {};
	struct spdk_uuid uuid;
	uint64_t *entry;

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	/* Confirm that reduce persisted the params to metadata. */
	CU_ASSERT(memcmp(g_persistent_pm_buf, SPDK_REDUCE_SIGNATURE, 8) == 0);
	persistent_params = (struct spdk_reduce_vol_params *)(g_persistent_pm_buf + 8);
	CU_ASSERT(memcmp(persistent_params, &params, sizeof(params)) == 0);
	/* Now confirm that contents of pm_file after the superblock have been initialized
	 *  to REDUCE_EMPTY_MAP_ENTRY.
	 */
	entry = (uint64_t *)(g_persistent_pm_buf + sizeof(struct spdk_reduce_vol_superblock));
	while (entry != (uint64_t *)(g_persistent_pm_buf + g_vol->pm_file.size)) {
		CU_ASSERT(*entry == REDUCE_EMPTY_MAP_ENTRY);
		entry++;
	}

	/* Check that the pm file path was constructed correctly.  It should be in
	 * the form:
	 * TEST_MD_PATH + "/" + <uuid string>
	 */
	CU_ASSERT(strncmp(&g_path[0], TEST_MD_PATH, strlen(TEST_MD_PATH)) == 0);
	CU_ASSERT(g_path[strlen(TEST_MD_PATH)] == '/');
	CU_ASSERT(spdk_uuid_parse(&uuid, &g_path[strlen(TEST_MD_PATH) + 1]) == 0);
	CU_ASSERT(spdk_uuid_compare(&uuid, spdk_reduce_vol_get_uuid(g_vol)) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(g_volatile_pm_buf == NULL);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
_init_backing_dev(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_vol_params *persistent_params;
	struct spdk_reduce_backing_dev backing_dev = {};

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	memset(g_path, 0, sizeof(g_path));
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(strncmp(TEST_MD_PATH, g_path, strlen(TEST_MD_PATH)) == 0);
	/* Confirm that libreduce persisted the params to the backing device. */
	CU_ASSERT(memcmp(g_backing_dev_buf, SPDK_REDUCE_SIGNATURE, 8) == 0);
	persistent_params = (struct spdk_reduce_vol_params *)(g_backing_dev_buf + 8);
	CU_ASSERT(memcmp(persistent_params, &params, sizeof(params)) == 0);
	/* Confirm that the path to the persistent memory metadata file was persisted to
	 *  the backing device.
	 */
	CU_ASSERT(strncmp(g_path,
			  g_backing_dev_buf + REDUCE_BACKING_DEV_PATH_OFFSET,
			  REDUCE_PATH_MAX) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
init_backing_dev(void)
{
	_init_backing_dev(512);
	_init_backing_dev(4096);
}

static void
_load(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	char pmem_file_path[REDUCE_PATH_MAX];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(strncmp(TEST_MD_PATH, g_path, strlen(TEST_MD_PATH)) == 0);
	memcpy(pmem_file_path, g_path, sizeof(pmem_file_path));

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	memset(g_path, 0, sizeof(g_path));
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(strncmp(g_path, pmem_file_path, sizeof(pmem_file_path)) == 0);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
load(void)
{
	_load(512);
	_load(4096);
}

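/* Return the chunk map index stored in the logical map for the chunk that
 * contains the given logical offset.
 */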
static uint64_t
_vol_get_chunk_map_index(struct spdk_reduce_vol *vol, uint64_t offset)
{
	uint64_t logical_map_index = offset / vol->logical_blocks_per_chunk;

	return vol->pm_logical_map[logical_map_index];
}

static void
write_cb(void *arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
read_cb(void *arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
_write_maps(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov;
	const int bufsize = 16 * 1024; /* chunk size */
	char buf[bufsize];
	uint32_t num_lbas, i;
	uint64_t old_chunk0_map_index, new_chunk0_map_index;
	struct spdk_reduce_chunk_map *old_chunk0_map, *new_chunk0_map;

	params.chunk_size = bufsize;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	num_lbas = bufsize / params.logical_block_size;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	for (i = 0; i < g_vol->params.vol_size / g_vol->params.chunk_size; i++) {
		CU_ASSERT(_vol_get_chunk_map_index(g_vol, i) == REDUCE_EMPTY_MAP_ENTRY);
	}

	ut_build_data_buffer(buf, bufsize, 0x00, 1);
	iov.iov_base = buf;
	iov.iov_len = bufsize;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, num_lbas, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	old_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
	CU_ASSERT(old_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == true);

	old_chunk0_map = _reduce_vol_get_chunk_map(g_vol, old_chunk0_map_index);
	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(old_chunk0_map->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY);
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
					     old_chunk0_map->io_unit_index[i]) == true);
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, num_lbas, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	new_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
	CU_ASSERT(new_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
	CU_ASSERT(new_chunk0_map_index != old_chunk0_map_index);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, new_chunk0_map_index) == true);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == false);

	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
					     old_chunk0_map->io_unit_index[i]) == false);
	}

	new_chunk0_map = _reduce_vol_get_chunk_map(g_vol, new_chunk0_map_index);
	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(new_chunk0_map->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY);
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
					     new_chunk0_map->io_unit_index[i]) == true);
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
write_maps(void)
{
	_write_maps(512);
	_write_maps(4096);
}

static void
_read_write(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov;
	char buf[16 * 1024]; /* chunk size */
	char compare_buf[16 * 1024];
	uint32_t i;

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* Write 0xAA to 2 512-byte logical blocks, starting at LBA 2. */
	memset(buf, 0xAA, 2 * params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = 2 * params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 2, 2, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	memset(compare_buf, 0xAA, sizeof(compare_buf));
	for (i = 0; i < params.chunk_size / params.logical_block_size; i++) {
		memset(buf, 0xFF, params.logical_block_size);
		iov.iov_base = buf;
		iov.iov_len = params.logical_block_size;
		g_reduce_errno = -1;
		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
		CU_ASSERT(g_reduce_errno == 0);

		switch (i) {
		case 2:
		case 3:
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		default:
			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
			break;
		}
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	/* Overwrite what we just wrote with 0xCC */
	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	memset(buf, 0xCC, 2 * params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = 2 * params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 2, 2, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	memset(compare_buf, 0xCC, sizeof(compare_buf));
	for (i = 0; i < params.chunk_size / params.logical_block_size; i++) {
		memset(buf, 0xFF, params.logical_block_size);
		iov.iov_base = buf;
		iov.iov_len = params.logical_block_size;
		g_reduce_errno = -1;
		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
		CU_ASSERT(g_reduce_errno == 0);

		switch (i) {
		case 2:
		case 3:
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		default:
			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
			break;
		}
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	/* Write 0xBB to 2 512-byte logical blocks, starting at LBA 37.
	 * This is writing into the second chunk of the volume.  This also
	 * enables implicitly checking that we reloaded the bit arrays
	 * correctly - making sure we don't use the first chunk map again
	 * for this new write - the first chunk map was already used by the
	 * write from before we unloaded and reloaded.
	 */
	memset(buf, 0xBB, 2 * params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = 2 * params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 37, 2, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	for (i = 0; i < 2 * params.chunk_size / params.logical_block_size; i++) {
		memset(buf, 0xFF, params.logical_block_size);
		iov.iov_base = buf;
		iov.iov_len = params.logical_block_size;
		g_reduce_errno = -1;
		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
		CU_ASSERT(g_reduce_errno == 0);

		switch (i) {
		case 2:
		case 3:
			memset(compare_buf, 0xCC, sizeof(compare_buf));
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		case 37:
		case 38:
			memset(compare_buf, 0xBB, sizeof(compare_buf));
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		default:
			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
			break;
		}
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
read_write(void)
{
	_read_write(512);
	_read_write(4096);
}

static void
_readv_writev(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov[REDUCE_MAX_IOVECS + 1];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, iov, REDUCE_MAX_IOVECS + 1, 2, REDUCE_MAX_IOVECS + 1, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EINVAL);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
readv_writev(void)
{
	_readv_writev(512);
	_readv_writev(4096);
}

static void
destroy_cb(void *ctx, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
destroy(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_reduce_errno = -1;
	MOCK_CLEAR(spdk_malloc);
	MOCK_CLEAR(spdk_zmalloc);
	spdk_reduce_vol_destroy(&backing_dev, destroy_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_reduce_errno = 0;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EILSEQ);

	backing_dev_destroy(&backing_dev);
}

/* This test primarily checks that the reduce unit test infrastructure for asynchronous
 * backing device I/O operations is working correctly.
 */
static void
defer_bdev_io(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	const uint32_t logical_block_size = 512;
	struct iovec iov;
	char buf[logical_block_size];
	char compare_buf[logical_block_size];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = logical_block_size;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* Write 0xAA to 1 512-byte logical block. */
	memset(buf, 0xAA, params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = params.logical_block_size;
	g_reduce_errno = -100;
	g_defer_bdev_io = true;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
	/* Callback should not have executed, so this should still equal -100. */
	CU_ASSERT(g_reduce_errno == -100);
	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
	/* We wrote to just 512 bytes of one chunk which was previously unallocated.  This
	 * should result in 1 pending I/O since the rest of this chunk will be zeroes and
	 * very compressible.
	 */
	CU_ASSERT(g_pending_bdev_io_count == 1);

	backing_dev_io_execute(0);
	CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
	CU_ASSERT(g_reduce_errno == 0);

	g_defer_bdev_io = false;
	memset(compare_buf, 0xAA, sizeof(compare_buf));
	memset(buf, 0xFF, sizeof(buf));
	iov.iov_base = buf;
	iov.iov_len = params.logical_block_size;
	g_reduce_errno = -100;
	spdk_reduce_vol_readv(g_vol, &iov, 1, 0, 1, read_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(memcmp(buf, compare_buf, sizeof(buf)) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
overlapped(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	const uint32_t logical_block_size = 512;
	struct iovec iov;
	char buf[2 * logical_block_size];
	char compare_buf[2 * logical_block_size];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = logical_block_size;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* Write 0xAA to 1 512-byte logical block. */
	memset(buf, 0xAA, logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = logical_block_size;
	g_reduce_errno = -100;
	g_defer_bdev_io = true;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
	/* Callback should not have executed, so this should still equal -100. */
	CU_ASSERT(g_reduce_errno == -100);
	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
	/* We wrote to just 512 bytes of one chunk which was previously unallocated.  This
	 * should result in 1 pending I/O since the rest of this chunk will be zeroes and
	 * very compressible.
	 */
	CU_ASSERT(g_pending_bdev_io_count == 1);

	/* Now do an overlapped I/O to the same chunk. */
	spdk_reduce_vol_writev(g_vol, &iov, 1, 1, 1, write_cb, NULL);
	/* Callback should not have executed, so this should still equal -100. */
	CU_ASSERT(g_reduce_errno == -100);
	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
	/* The second I/O overlaps with the first one.  So we should only see pending bdev_io
	 * related to the first I/O here - the second one won't start until the first one is completed.
	 */
	CU_ASSERT(g_pending_bdev_io_count == 1);

	backing_dev_io_execute(0);
	CU_ASSERT(g_reduce_errno == 0);

	g_defer_bdev_io = false;
	memset(compare_buf, 0xAA, sizeof(compare_buf));
	memset(buf, 0xFF, sizeof(buf));
	iov.iov_base = buf;
	iov.iov_len = 2 * logical_block_size;
	g_reduce_errno = -100;
	spdk_reduce_vol_readv(g_vol, &iov, 1, 0, 2, read_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(memcmp(buf, compare_buf, 2 * logical_block_size) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

#define BUFSIZE 4096

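/* Exercise the RLE helpers directly: verify run splitting at UINT8_MAX,
 * round-trip fidelity, and -ENOSPC when incompressible input (no repeated
 * bytes) would need two output bytes per input byte.
 */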
static void
compress_algorithm(void)
{
	uint8_t original_data[BUFSIZE];
	uint8_t compressed_data[BUFSIZE];
	uint8_t decompressed_data[BUFSIZE];
	uint32_t compressed_len, decompressed_len;
	int rc;

	ut_build_data_buffer(original_data, BUFSIZE, 0xAA, BUFSIZE);
	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, UINT8_MAX);
	CU_ASSERT(rc == 0);
	CU_ASSERT(compressed_len == 2);
	CU_ASSERT(compressed_data[0] == UINT8_MAX);
	CU_ASSERT(compressed_data[1] == 0xAA);

	decompressed_len = sizeof(decompressed_data);
	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
	CU_ASSERT(rc == 0);
	CU_ASSERT(decompressed_len == UINT8_MAX);
	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);

	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, UINT8_MAX + 1);
	CU_ASSERT(rc == 0);
	CU_ASSERT(compressed_len == 4);
	CU_ASSERT(compressed_data[0] == UINT8_MAX);
	CU_ASSERT(compressed_data[1] == 0xAA);
	CU_ASSERT(compressed_data[2] == 1);
	CU_ASSERT(compressed_data[3] == 0xAA);

	decompressed_len = sizeof(decompressed_data);
	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
	CU_ASSERT(rc == 0);
	CU_ASSERT(decompressed_len == UINT8_MAX + 1);
	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);

	ut_build_data_buffer(original_data, BUFSIZE, 0x00, 1);
	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, 2048);
	CU_ASSERT(rc == 0);
	CU_ASSERT(compressed_len == 4096);
	CU_ASSERT(compressed_data[0] == 1);
	CU_ASSERT(compressed_data[1] == 0);
	CU_ASSERT(compressed_data[4094] == 1);
	CU_ASSERT(compressed_data[4095] == 0xFF);

	decompressed_len = sizeof(decompressed_data);
	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
	CU_ASSERT(rc == 0);
	CU_ASSERT(decompressed_len == 2048);
	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);

	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, 2049);
	CU_ASSERT(rc == -ENOSPC);
}

static void
test_prepare_compress_chunk(void)
{
	struct spdk_reduce_vol vol = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct spdk_reduce_vol_request req = {};
	char decomp_buffer[16 * 1024] = {};
	char comp_buffer[16 * 1024] = {};
	char user_buffer[16 * 1024] = {};
	struct iovec user_iov[2] = {};
	size_t user_buffer_iov_len = 8192;
	size_t remainder_bytes;
	size_t offset_bytes;
	size_t memcmp_offset;
	uint32_t i;

	vol.params.chunk_size = 16 * 1024;
	vol.params.backing_io_unit_size = 4096;
	vol.params.logical_block_size = 512;
	backing_dev_init(&backing_dev, &vol.params, 512);
	vol.backing_dev = &backing_dev;
	vol.logical_blocks_per_chunk = vol.params.chunk_size / vol.params.logical_block_size;

	req.vol = &vol;
	req.decomp_buf = decomp_buffer;
	req.comp_buf = comp_buffer;
	req.iov = user_iov;
	req.iovcnt = 2;
	req.offset = 0;

	/* Part 1 - backing dev supports sgl_in */
	/* Test 1 - the user's buffers total exactly chunk_size */
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
	}

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 2);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 2);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}

	/* Test 2 - the user's buffers total less than chunk_size, no offset */
	user_buffer_iov_len = 4096;
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
	}

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 3);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[i].iov_base == req.decomp_buf + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 3);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[i].iov_base == g_zero_buf + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);

	/* Test 3 - the user's buffers total less than chunk_size, non-zero offset */
	user_buffer_iov_len = 4096;
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 4);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[3].iov_base == req.decomp_buf + offset_bytes + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 4);
	CU_ASSERT(req.decomp_iov[0].iov_base == g_zero_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[3].iov_base == g_zero_buf + offset_bytes + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);

	/* Part 2 - backing dev doesn't support sgl_in */
	/* Test 1 - the user's buffers total exactly chunk_size;
	 * the user's buffers are copied */
	vol.backing_dev->sgl_in = false;
	req.offset = 0;
	user_buffer_iov_len = 8192;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base, req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + req.iov[0].iov_len, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base, req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + req.iov[0].iov_len, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);

	/* Test 2 - a single user buffer of exactly chunk_size;
	 * the user's buffer is not copied */
	req.iov[0].iov_base = user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);

	/* Test 3 - the user's buffers total less than chunk_size, no offset;
	 * the user's buffers are copied */
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	user_buffer_iov_len = 4096;
	req.iovcnt = 2;
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf + memcmp_offset,
			 remainder_bytes) == 0);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf + memcmp_offset,
			 remainder_bytes) == 0);

	/* Test 4 - the user's buffers total less than chunk_size, non-zero offset;
	 * the user's buffers are copied */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf, offset_bytes) == 0);
	memcmp_offset += offset_bytes;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf + memcmp_offset,
			 remainder_bytes) == 0);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf, offset_bytes) == 0);
	memcmp_offset += offset_bytes;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf + memcmp_offset,
			 remainder_bytes) == 0);
}

static void
_reduce_vol_op_complete(void *ctx, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
dummy_backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
			     struct iovec *src_iov, int src_iovcnt,
			     struct iovec *dst_iov, int dst_iovcnt,
			     struct spdk_reduce_vol_cb_args *args)
{
	args->cb_fn(args->cb_arg, g_decompressed_len);
}

static void
test_reduce_decompress_chunk(void)
{
	struct spdk_reduce_vol vol = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct spdk_reduce_vol_request req = {};
	char decomp_buffer[16 * 1024] = {};
	char comp_buffer[16 * 1024] = {};
	char user_buffer[16 * 1024] = {};
	struct iovec user_iov[2] = {};
	struct iovec comp_buf_iov = {};
	struct spdk_reduce_chunk_map chunk = {};
	size_t user_buffer_iov_len = 8192;
	size_t remainder_bytes;
	size_t offset_bytes;
	uint32_t i;

	vol.params.chunk_size = 16 * 1024;
	vol.params.backing_io_unit_size = 4096;
	vol.params.logical_block_size = 512;
	backing_dev_init(&backing_dev, &vol.params, 512);
	backing_dev.decompress = dummy_backing_dev_decompress;
	vol.backing_dev = &backing_dev;
	vol.logical_blocks_per_chunk = vol.params.chunk_size / vol.params.logical_block_size;
	TAILQ_INIT(&vol.executing_requests);
	TAILQ_INIT(&vol.queued_requests);
	TAILQ_INIT(&vol.free_requests);

	chunk.compressed_size = user_buffer_iov_len / 2;
	req.chunk = &chunk;
	req.vol = &vol;
	req.decomp_buf = decomp_buffer;
	req.comp_buf = comp_buffer;
	req.comp_buf_iov = &comp_buf_iov;
	req.iov = user_iov;
	req.iovcnt = 2;
	req.offset = 0;
	req.cb_fn = _reduce_vol_op_complete;

	/* Part 1 - backing dev supports sgl_out */
	/* Test 1 - the user's buffers total exactly chunk_size */
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0, req.iov[i].iov_len);
	}
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;
	g_decompressed_len = vol.params.chunk_size;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 2);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 2 - the user's buffers total less than chunk_size, no offset */
1587 	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
1588 	g_reduce_errno = -1;
1589 	user_buffer_iov_len = 4096;
1590 	for (i = 0; i < 2; i++) {
1591 		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
1592 		req.iov[i].iov_len = user_buffer_iov_len;
1593 		memset(req.iov[i].iov_base, 0, req.iov[i].iov_len);
1594 	}
1595 	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
1596 
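	/* The two 4 KiB user iovecs cover only half of the 16 KiB chunk, so a
	 * third iovec pointing into decomp_buf is expected to catch the
	 * trailing remainder of the chunk. */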
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 3);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[i].iov_base == req.decomp_buf + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 3 - the user's buffers cover less than chunk_size, non-zero offset */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

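	/* With a non-zero logical offset the expected layout is four iovecs:
	 * a decomp_buf prefix covering the offset, the two user iovecs, then
	 * a decomp_buf tail covering the remainder of the chunk. */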
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 4);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[3].iov_base == req.decomp_buf + offset_bytes + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Part 2 - backing dev doesn't support sgl_out */
	/* Test 1 - the total length of the user's buffers equals chunk_size;
	 * the user's buffers are copied */
	vol.backing_dev->sgl_out = false;
	req.offset = 0;
	user_buffer_iov_len = 8192;

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

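	/* Without sgl_out the decompressed chunk must land in the single
	 * decomp_buf iovec and then be copied out into the user's buffers. */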
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base, req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.iov[1].iov_base, req.decomp_iov[0].iov_base + req.iov[0].iov_len,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 2 - a single user buffer whose length equals chunk_size;
	 * the user's buffer is not copied */
	req.iov[0].iov_base = user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

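	/* A single user buffer that spans the whole chunk can serve as the
	 * destination directly even without sgl_out, so no copy is expected. */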
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);

	/* Test 3 - the user's buffers cover less than chunk_size, no offset;
	 * the user's buffers are copied */
	user_buffer_iov_len = 4096;
	req.iovcnt = 2;
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

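	/* Same expectation as Part 2 Test 1: decompression into the single
	 * decomp_buf iovec, followed by a copy into the (now 4 KiB) user
	 * buffers when the request completes. */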
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.iov[1].iov_base, req.decomp_iov[0].iov_base + req.iov[0].iov_len,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 4 - the user's buffers cover less than chunk_size, non-zero offset;
	 * the user's buffers are copied */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;

	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

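	/* After completion, the data starting offset_bytes into decomp_buf
	 * must have been copied out to the user's buffers, which the memcmp
	 * checks below verify. */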
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + offset_bytes, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + offset_bytes + req.iov[0].iov_len,
			 req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);
}

int
main(int argc, char **argv)
{
	CU_pSuite	suite = NULL;
	unsigned int	num_failures;

	CU_set_error_action(CUEA_ABORT);
	CU_initialize_registry();

	suite = CU_add_suite("reduce", NULL, NULL);

	CU_ADD_TEST(suite, get_pm_file_size);
	CU_ADD_TEST(suite, get_vol_size);
	CU_ADD_TEST(suite, init_failure);
	CU_ADD_TEST(suite, init_md);
	CU_ADD_TEST(suite, init_backing_dev);
	CU_ADD_TEST(suite, load);
	CU_ADD_TEST(suite, write_maps);
	CU_ADD_TEST(suite, read_write);
	CU_ADD_TEST(suite, readv_writev);
	CU_ADD_TEST(suite, destroy);
	CU_ADD_TEST(suite, defer_bdev_io);
	CU_ADD_TEST(suite, overlapped);
	CU_ADD_TEST(suite, compress_algorithm);
	CU_ADD_TEST(suite, test_prepare_compress_chunk);
	CU_ADD_TEST(suite, test_reduce_decompress_chunk);

	g_unlink_path = g_path;
	g_unlink_callback = unlink_cb;

	CU_basic_set_mode(CU_BRM_VERBOSE);
	CU_basic_run_tests();
	num_failures = CU_get_number_of_failures();
	CU_cleanup_registry();
	return num_failures;
}