/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk_internal/cunit.h"

#include "reduce/reduce.c"
#include "spdk_internal/mock.h"
#define UNIT_TEST_NO_VTOPHYS
#include "common/lib/test_env.c"
#undef UNIT_TEST_NO_VTOPHYS

static struct spdk_reduce_vol *g_vol;
static int g_reduce_errno;
static char *g_volatile_pm_buf;
static size_t g_volatile_pm_buf_len;
static char *g_persistent_pm_buf;
static size_t g_persistent_pm_buf_len;
static char *g_backing_dev_buf;
static char g_path[REDUCE_PATH_MAX];
static char *g_decomp_buf;
static int g_decompressed_len;

#define TEST_MD_PATH "/tmp"

uint64_t
spdk_vtophys(const void *buf, uint64_t *size)
{
	/* Add 1 to the buf address before rounding up, so that a buf that is already
	 * 2MB-aligned maps to the end of its page rather than to itself.
	 */
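	/* For example (hypothetical addresses): buf = 0x200000 with *size = 4MB
	 * clamps *size to 2MB, while buf = 0x3FFE00 clamps *size to 0x200 bytes.
	 */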
	const uint8_t *page_2mb_end = (const uint8_t *)SPDK_ALIGN_CEIL((uintptr_t)buf + 1, VALUE_2MB);
	uint64_t bytes_to_page_end = page_2mb_end - (const uint8_t *)buf;
	uint64_t _size;

	if (*size) {
		_size = *size;
		_size = spdk_min(_size, bytes_to_page_end);
		*size = _size;
	}

	return (uintptr_t)buf;
}

enum ut_reduce_bdev_io_type {
	UT_REDUCE_IO_READV = 1,
	UT_REDUCE_IO_WRITEV = 2,
	UT_REDUCE_IO_UNMAP = 3,
};

struct ut_reduce_bdev_io {
	enum ut_reduce_bdev_io_type type;
	struct spdk_reduce_backing_dev *backing_dev;
	struct iovec *iov;
	int iovcnt;
	uint64_t lba;
	uint32_t lba_count;
	struct spdk_reduce_vol_cb_args *args;
	TAILQ_ENTRY(ut_reduce_bdev_io)	link;
};

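/* When g_defer_bdev_io is true, backing-device reads, writes, and unmaps are
 * queued on g_pending_bdev_io instead of executing inline; tests then drain
 * the queue with backing_dev_io_execute() to observe in-flight state.
 */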
static bool g_defer_bdev_io = false;
static TAILQ_HEAD(, ut_reduce_bdev_io) g_pending_bdev_io =
	TAILQ_HEAD_INITIALIZER(g_pending_bdev_io);
static uint32_t g_pending_bdev_io_count = 0;

static void
sync_pm_buf(const void *addr, size_t length)
{
	uint64_t offset = (char *)addr - g_volatile_pm_buf;

	memcpy(&g_persistent_pm_buf[offset], addr, length);
}

int
pmem_msync(const void *addr, size_t length)
{
	sync_pm_buf(addr, length);
	return 0;
}

void
pmem_persist(const void *addr, size_t len)
{
	sync_pm_buf(addr, len);
}

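/* A rough sketch of the persistent-memory file layout assumed below
 * (see _get_pm_file_size() in reduce.c):
 *
 *   [superblock][logical map: one uint64_t per chunk]
 *   [chunk maps: one struct spdk_reduce_chunk_map plus one uint64_t io-unit
 *    index per backing io unit, for each chunk and each extra chunk]
 *   [padding up to REDUCE_PM_SIZE_ALIGNMENT]
 */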
static void
get_pm_file_size(void)
{
	struct spdk_reduce_vol_params params;
	uint64_t pm_size, expected_pm_size;

	params.backing_io_unit_size = 4096;
	params.chunk_size = 4096 * 4;
	params.vol_size = 4096 * 4 * 100;

	pm_size = _get_pm_file_size(&params);
	expected_pm_size = sizeof(struct spdk_reduce_vol_superblock);
	/* 100 chunks in logical map * 8 bytes per chunk */
	expected_pm_size += 100 * sizeof(uint64_t);
	/* 100 chunks * (chunk struct size + 4 backing io units per chunk * 8 bytes per backing io unit) */
	expected_pm_size += 100 * (sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t));
	/* reduce also allocates some extra chunks for in-flight writes when the logical map
	 * is full.  REDUCE_NUM_EXTRA_CHUNKS is a private #define in reduce.c.  Here we need
	 * that number of chunks times (chunk struct size + 4 backing io units per chunk *
	 * 8 bytes per backing io unit).
	 */
	expected_pm_size += REDUCE_NUM_EXTRA_CHUNKS *
			    (sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t));
	/* reduce will add some padding so numbers may not match exactly.  Make sure
	 * they are close though.
	 */
	CU_ASSERT((pm_size - expected_pm_size) <= REDUCE_PM_SIZE_ALIGNMENT);
}

static void
get_vol_size(void)
{
	uint64_t chunk_size, backing_dev_size;

	chunk_size = 16 * 1024;
	backing_dev_size = 16 * 1024 * 1000;
	CU_ASSERT(_get_vol_size(chunk_size, backing_dev_size) < backing_dev_size);
}

void *
pmem_map_file(const char *path, size_t len, int flags, mode_t mode,
	      size_t *mapped_lenp, int *is_pmemp)
{
	CU_ASSERT(g_volatile_pm_buf == NULL);
	snprintf(g_path, sizeof(g_path), "%s", path);
	*is_pmemp = 1;

	if (g_persistent_pm_buf == NULL) {
		g_persistent_pm_buf = calloc(1, len);
		g_persistent_pm_buf_len = len;
		SPDK_CU_ASSERT_FATAL(g_persistent_pm_buf != NULL);
	}

	*mapped_lenp = g_persistent_pm_buf_len;
	g_volatile_pm_buf = calloc(1, g_persistent_pm_buf_len);
	SPDK_CU_ASSERT_FATAL(g_volatile_pm_buf != NULL);
	memcpy(g_volatile_pm_buf, g_persistent_pm_buf, g_persistent_pm_buf_len);
	g_volatile_pm_buf_len = g_persistent_pm_buf_len;

	return g_volatile_pm_buf;
}

int
pmem_unmap(void *addr, size_t len)
{
	CU_ASSERT(addr == g_volatile_pm_buf);
	CU_ASSERT(len == g_volatile_pm_buf_len);
	free(g_volatile_pm_buf);
	g_volatile_pm_buf = NULL;
	g_volatile_pm_buf_len = 0;

	return 0;
}

static void
persistent_pm_buf_destroy(void)
{
	CU_ASSERT(g_persistent_pm_buf != NULL);
	free(g_persistent_pm_buf);
	g_persistent_pm_buf = NULL;
	g_persistent_pm_buf_len = 0;
}

static void
unlink_cb(void)
{
	persistent_pm_buf_destroy();
}

static void
init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	g_vol = vol;
	g_reduce_errno = reduce_errno;
}

static void
load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	g_vol = vol;
	g_reduce_errno = reduce_errno;
}

static void
unload_cb(void *cb_arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
init_failure(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};

	backing_dev.blocklen = 512;
	/* This blockcnt is too small for a reduce vol - there needs to be
	 *  enough space for at least REDUCE_NUM_EXTRA_CHUNKS + 1 chunks.
	 */
	backing_dev.blockcnt = 20;

	params.vol_size = 0;
	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = backing_dev.blocklen;
	params.logical_block_size = 512;

	/* backing_dev has an invalid size.  This should fail. */
	g_vol = NULL;
	g_reduce_errno = 0;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EINVAL);
	SPDK_CU_ASSERT_FATAL(g_vol == NULL);

	/* backing_dev now has valid size, but backing_dev still has null
	 *  function pointers.  This should fail.
	 */
	backing_dev.blockcnt = 20000;

	g_vol = NULL;
	g_reduce_errno = 0;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EINVAL);
	SPDK_CU_ASSERT_FATAL(g_vol == NULL);
}

static void
backing_dev_readv_execute(struct spdk_reduce_backing_dev *backing_dev,
			  struct iovec *iov, int iovcnt,
			  uint64_t lba, uint32_t lba_count,
			  struct spdk_reduce_vol_cb_args *args)
{
	char *offset;
	int i;

	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
	for (i = 0; i < iovcnt; i++) {
		memcpy(iov[i].iov_base, offset, iov[i].iov_len);
		offset += iov[i].iov_len;
	}
	args->cb_fn(args->cb_arg, 0);
}

static void
backing_dev_insert_io(enum ut_reduce_bdev_io_type type, struct spdk_reduce_backing_dev *backing_dev,
		      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
		      struct spdk_reduce_vol_cb_args *args)
{
	struct ut_reduce_bdev_io *ut_bdev_io;

	ut_bdev_io = calloc(1, sizeof(*ut_bdev_io));
	SPDK_CU_ASSERT_FATAL(ut_bdev_io != NULL);

	ut_bdev_io->type = type;
	ut_bdev_io->backing_dev = backing_dev;
	ut_bdev_io->iov = iov;
	ut_bdev_io->iovcnt = iovcnt;
	ut_bdev_io->lba = lba;
	ut_bdev_io->lba_count = lba_count;
	ut_bdev_io->args = args;
	TAILQ_INSERT_TAIL(&g_pending_bdev_io, ut_bdev_io, link);
	g_pending_bdev_io_count++;
}

static void
backing_dev_readv(struct spdk_reduce_backing_dev *backing_dev, struct iovec *iov, int iovcnt,
		  uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	if (g_defer_bdev_io == false) {
		CU_ASSERT(g_pending_bdev_io_count == 0);
		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
		backing_dev_readv_execute(backing_dev, iov, iovcnt, lba, lba_count, args);
		return;
	}

	backing_dev_insert_io(UT_REDUCE_IO_READV, backing_dev, iov, iovcnt, lba, lba_count, args);
}

static void
backing_dev_writev_execute(struct spdk_reduce_backing_dev *backing_dev,
			   struct iovec *iov, int iovcnt,
			   uint64_t lba, uint32_t lba_count,
			   struct spdk_reduce_vol_cb_args *args)
{
	char *offset;
	int i;

	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
	for (i = 0; i < iovcnt; i++) {
		memcpy(offset, iov[i].iov_base, iov[i].iov_len);
		offset += iov[i].iov_len;
	}
	args->cb_fn(args->cb_arg, 0);
}

static void
backing_dev_writev(struct spdk_reduce_backing_dev *backing_dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	if (g_defer_bdev_io == false) {
		CU_ASSERT(g_pending_bdev_io_count == 0);
		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
		backing_dev_writev_execute(backing_dev, iov, iovcnt, lba, lba_count, args);
		return;
	}

	backing_dev_insert_io(UT_REDUCE_IO_WRITEV, backing_dev, iov, iovcnt, lba, lba_count, args);
}

static void
backing_dev_unmap_execute(struct spdk_reduce_backing_dev *backing_dev,
			  uint64_t lba, uint32_t lba_count,
			  struct spdk_reduce_vol_cb_args *args)
{
	char *offset;

	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
	memset(offset, 0, lba_count * backing_dev->blocklen);
	args->cb_fn(args->cb_arg, 0);
}

static void
backing_dev_unmap(struct spdk_reduce_backing_dev *backing_dev,
		  uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	if (g_defer_bdev_io == false) {
		CU_ASSERT(g_pending_bdev_io_count == 0);
		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
		backing_dev_unmap_execute(backing_dev, lba, lba_count, args);
		return;
	}

	backing_dev_insert_io(UT_REDUCE_IO_UNMAP, backing_dev, NULL, 0, lba, lba_count, args);
}

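/* Execute up to `count` deferred backing-dev I/Os in FIFO order; a count of 0
 * drains the entire pending queue.
 */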
static void
backing_dev_io_execute(uint32_t count)
{
	struct ut_reduce_bdev_io *ut_bdev_io;
	uint32_t done = 0;

	CU_ASSERT(g_defer_bdev_io == true);
	while (!TAILQ_EMPTY(&g_pending_bdev_io) && (count == 0 || done < count)) {
		ut_bdev_io = TAILQ_FIRST(&g_pending_bdev_io);
		TAILQ_REMOVE(&g_pending_bdev_io, ut_bdev_io, link);
		g_pending_bdev_io_count--;
		switch (ut_bdev_io->type) {
		case UT_REDUCE_IO_READV:
			backing_dev_readv_execute(ut_bdev_io->backing_dev,
						  ut_bdev_io->iov, ut_bdev_io->iovcnt,
						  ut_bdev_io->lba, ut_bdev_io->lba_count,
						  ut_bdev_io->args);
			break;
		case UT_REDUCE_IO_WRITEV:
			backing_dev_writev_execute(ut_bdev_io->backing_dev,
						   ut_bdev_io->iov, ut_bdev_io->iovcnt,
						   ut_bdev_io->lba, ut_bdev_io->lba_count,
						   ut_bdev_io->args);
			break;
		case UT_REDUCE_IO_UNMAP:
			backing_dev_unmap_execute(ut_bdev_io->backing_dev,
						  ut_bdev_io->lba, ut_bdev_io->lba_count,
						  ut_bdev_io->args);
			break;
		default:
			CU_ASSERT(false);
			break;
		}
		free(ut_bdev_io);
		done++;
	}
}

static void
backing_dev_submit_io(struct spdk_reduce_backing_io *backing_io)
{
	switch (backing_io->backing_io_type) {
	case SPDK_REDUCE_BACKING_IO_WRITE:
		backing_dev_writev(backing_io->dev, backing_io->iov, backing_io->iovcnt,
				   backing_io->lba, backing_io->lba_count, backing_io->backing_cb_args);
		break;
	case SPDK_REDUCE_BACKING_IO_READ:
		backing_dev_readv(backing_io->dev, backing_io->iov, backing_io->iovcnt,
				  backing_io->lba, backing_io->lba_count, backing_io->backing_cb_args);
		break;
	case SPDK_REDUCE_BACKING_IO_UNMAP:
		backing_dev_unmap(backing_io->dev, backing_io->lba, backing_io->lba_count,
				  backing_io->backing_cb_args);
		break;
	default:
		CU_ASSERT(false);
		break;
	}
}

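/* Toy run-length encoding used as the unit-test "compression" algorithm:
 * the output is a sequence of 2-byte (count, value) pairs, with count in
 * [1, 255].  For example, 300 bytes of 0x00 compress to {0xFF, 0x00, 0x2D, 0x00}.
 */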
static int
ut_compress(char *outbuf, uint32_t *compressed_len, char *inbuf, uint32_t inbuflen)
{
	uint32_t len = 0;
	uint8_t count;
	char last;

	while (true) {
		if (inbuflen == 0) {
			*compressed_len = len;
			return 0;
		}

		if (*compressed_len < (len + 2)) {
			return -ENOSPC;
		}

		last = *inbuf;
		count = 1;
		inbuflen--;
		inbuf++;

		while (inbuflen > 0 && *inbuf == last && count < UINT8_MAX) {
			count++;
			inbuflen--;
			inbuf++;
		}

		outbuf[len] = count;
		outbuf[len + 1] = last;
		len += 2;
	}
}

static int
ut_decompress(uint8_t *outbuf, uint32_t *decompressed_len, uint8_t *inbuf, uint32_t inbuflen)
{
	uint32_t len = 0;

	SPDK_CU_ASSERT_FATAL(inbuflen % 2 == 0);

	while (true) {
		if (inbuflen == 0) {
			*decompressed_len = len;
			return 0;
		}

		if ((len + inbuf[0]) > *decompressed_len) {
			return -ENOSPC;
		}

		memset(outbuf, inbuf[1], inbuf[0]);
		outbuf += inbuf[0];
		len += inbuf[0];
		inbuflen -= 2;
		inbuf += 2;
	}
}

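/* Fill `data` with a pattern whose byte value increments every `repeat` bytes
 * (wrapping at 256); e.g. repeat == 2 yields 0, 0, 1, 1, 2, 2, ...
 */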
static void
ut_build_data_buffer(uint8_t *data, uint32_t data_len, uint8_t init_val, uint32_t repeat)
{
	uint32_t _repeat = repeat;

	SPDK_CU_ASSERT_FATAL(repeat > 0);

	while (data_len > 0) {
		*data = init_val;
		data++;
		data_len--;
		_repeat--;
		if (_repeat == 0) {
			init_val++;
			_repeat = repeat;
		}
	}
}

static void
backing_dev_compress(struct spdk_reduce_backing_dev *backing_dev,
		     struct iovec *src_iov, int src_iovcnt,
		     struct iovec *dst_iov, int dst_iovcnt,
		     struct spdk_reduce_vol_cb_args *args)
{
	uint32_t compressed_len;
	uint64_t total_length = 0;
	char *buf = g_decomp_buf;
	int rc, i;

	CU_ASSERT(dst_iovcnt == 1);

	for (i = 0; i < src_iovcnt; i++) {
		memcpy(buf, src_iov[i].iov_base, src_iov[i].iov_len);
		buf += src_iov[i].iov_len;
		total_length += src_iov[i].iov_len;
	}

	compressed_len = dst_iov[0].iov_len;
	rc = ut_compress(dst_iov[0].iov_base, &compressed_len,
			 g_decomp_buf, total_length);

	args->output_size = compressed_len;

	args->cb_fn(args->cb_arg, rc);
}

static void
backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
		       struct iovec *src_iov, int src_iovcnt,
		       struct iovec *dst_iov, int dst_iovcnt,
		       struct spdk_reduce_vol_cb_args *args)
{
	uint32_t decompressed_len = 0;
	char *buf = g_decomp_buf;
	int rc, i;

	CU_ASSERT(src_iovcnt == 1);

	for (i = 0; i < dst_iovcnt; i++) {
		decompressed_len += dst_iov[i].iov_len;
	}

	rc = ut_decompress(g_decomp_buf, &decompressed_len,
			   src_iov[0].iov_base, src_iov[0].iov_len);

	for (i = 0; i < dst_iovcnt; i++) {
		memcpy(dst_iov[i].iov_base, buf, dst_iov[i].iov_len);
		buf += dst_iov[i].iov_len;
	}

	args->output_size = decompressed_len;

	args->cb_fn(args->cb_arg, rc);
}

static void
backing_dev_destroy(struct spdk_reduce_backing_dev *backing_dev)
{
	/* We don't free this during backing_dev_close so that we can test init/unload/load
	 *  scenarios.
	 */
	free(g_backing_dev_buf);
	free(g_decomp_buf);
	g_backing_dev_buf = NULL;
	/* Clear the decomp buf pointer too so it never dangles between tests. */
	g_decomp_buf = NULL;
}

static void
backing_dev_init(struct spdk_reduce_backing_dev *backing_dev, struct spdk_reduce_vol_params *params,
		 uint32_t backing_blocklen)
{
	int64_t size;

	size = 4 * 1024 * 1024;
	backing_dev->blocklen = backing_blocklen;
	backing_dev->blockcnt = size / backing_dev->blocklen;
	backing_dev->submit_backing_io = backing_dev_submit_io;
	backing_dev->compress = backing_dev_compress;
	backing_dev->decompress = backing_dev_decompress;
	backing_dev->sgl_in = true;
	backing_dev->sgl_out = true;

	g_decomp_buf = calloc(1, params->chunk_size);
	SPDK_CU_ASSERT_FATAL(g_decomp_buf != NULL);

	g_backing_dev_buf = calloc(1, size);
	SPDK_CU_ASSERT_FATAL(g_backing_dev_buf != NULL);
}

static void
init_md(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_vol_params *persistent_params;
	struct spdk_reduce_backing_dev backing_dev = {};
	struct spdk_uuid uuid;
	uint64_t *entry;

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	/* Confirm that reduce persisted the params to metadata. */
	CU_ASSERT(memcmp(g_persistent_pm_buf, SPDK_REDUCE_SIGNATURE, 8) == 0);
	persistent_params = (struct spdk_reduce_vol_params *)(g_persistent_pm_buf + 8);
	CU_ASSERT(memcmp(persistent_params, &params, sizeof(params)) == 0);
	/* Now confirm that contents of pm_file after the superblock have been initialized
	 *  to REDUCE_EMPTY_MAP_ENTRY.
	 */
	entry = (uint64_t *)(g_persistent_pm_buf + sizeof(struct spdk_reduce_vol_superblock));
	while (entry != (uint64_t *)(g_persistent_pm_buf + g_vol->pm_file.size)) {
		CU_ASSERT(*entry == REDUCE_EMPTY_MAP_ENTRY);
		entry++;
	}

	/* Check that the pm file path was constructed correctly.  It should be in
	 * the form:
	 * TEST_MD_PATH + "/" + <uuid string>
	 */
	CU_ASSERT(strncmp(&g_path[0], TEST_MD_PATH, strlen(TEST_MD_PATH)) == 0);
	CU_ASSERT(g_path[strlen(TEST_MD_PATH)] == '/');
	CU_ASSERT(spdk_uuid_parse(&uuid, &g_path[strlen(TEST_MD_PATH) + 1]) == 0);
	CU_ASSERT(spdk_uuid_compare(&uuid, spdk_reduce_vol_get_uuid(g_vol)) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(g_volatile_pm_buf == NULL);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
_init_backing_dev(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_vol_params *persistent_params;
	struct spdk_reduce_backing_dev backing_dev = {};

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	memset(g_path, 0, sizeof(g_path));
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(strncmp(TEST_MD_PATH, g_path, strlen(TEST_MD_PATH)) == 0);
	/* Confirm that libreduce persisted the params to the backing device. */
	CU_ASSERT(memcmp(g_backing_dev_buf, SPDK_REDUCE_SIGNATURE, 8) == 0);
	persistent_params = (struct spdk_reduce_vol_params *)(g_backing_dev_buf + 8);
	CU_ASSERT(memcmp(persistent_params, &params, sizeof(params)) == 0);
	/* Confirm that the path to the persistent memory metadata file was persisted to
	 *  the backing device.
	 */
	CU_ASSERT(strncmp(g_path,
			  g_backing_dev_buf + REDUCE_BACKING_DEV_PATH_OFFSET,
			  REDUCE_PATH_MAX) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
init_backing_dev(void)
{
	_init_backing_dev(512);
	_init_backing_dev(4096);
}

static void
_load(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	char pmem_file_path[REDUCE_PATH_MAX];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(strncmp(TEST_MD_PATH, g_path, strlen(TEST_MD_PATH)) == 0);
	memcpy(pmem_file_path, g_path, sizeof(pmem_file_path));

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	memset(g_path, 0, sizeof(g_path));
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(strncmp(g_path, pmem_file_path, sizeof(pmem_file_path)) == 0);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
load(void)
{
	_load(512);
	_load(4096);
}

static uint64_t
_vol_get_chunk_map_index(struct spdk_reduce_vol *vol, uint64_t offset)
{
	uint64_t logical_map_index = offset / vol->logical_blocks_per_chunk;

	return vol->pm_logical_map[logical_map_index];
}

static void
write_cb(void *arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
read_cb(void *arg, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

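/* Verify that overwriting a previously written chunk allocates a brand new
 * chunk map and backing io units, and releases the old ones back to their
 * bit arrays.
 */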
static void
_write_maps(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov;
	const int bufsize = 16 * 1024; /* chunk size */
	char buf[bufsize];
	uint32_t num_lbas, i;
	uint64_t old_chunk0_map_index, new_chunk0_map_index;
	struct spdk_reduce_chunk_map *old_chunk0_map, *new_chunk0_map;

	params.chunk_size = bufsize;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	num_lbas = bufsize / params.logical_block_size;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	for (i = 0; i < g_vol->params.vol_size / g_vol->params.chunk_size; i++) {
		CU_ASSERT(_vol_get_chunk_map_index(g_vol, i) == REDUCE_EMPTY_MAP_ENTRY);
	}

	ut_build_data_buffer(buf, bufsize, 0x00, 1);
	iov.iov_base = buf;
	iov.iov_len = bufsize;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, num_lbas, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	old_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
	CU_ASSERT(old_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == true);

	old_chunk0_map = _reduce_vol_get_chunk_map(g_vol, old_chunk0_map_index);
	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(old_chunk0_map->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY);
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
					     old_chunk0_map->io_unit_index[i]) == true);
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, num_lbas, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	new_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
	CU_ASSERT(new_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
	CU_ASSERT(new_chunk0_map_index != old_chunk0_map_index);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, new_chunk0_map_index) == true);
	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == false);

	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
					     old_chunk0_map->io_unit_index[i]) == false);
	}

	new_chunk0_map = _reduce_vol_get_chunk_map(g_vol, new_chunk0_map_index);
	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
		CU_ASSERT(new_chunk0_map->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY);
		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
					     new_chunk0_map->io_unit_index[i]) == true);
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
write_maps(void)
{
	_write_maps(512);
	_write_maps(4096);
}

static void
_read_write(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov;
	char buf[16 * 1024]; /* chunk size */
	char compare_buf[16 * 1024];
	uint32_t i;

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* Write 0xAA to 2 512-byte logical blocks, starting at LBA 2. */
	memset(buf, 0xAA, 2 * params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = 2 * params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 2, 2, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	memset(compare_buf, 0xAA, sizeof(compare_buf));
	for (i = 0; i < params.chunk_size / params.logical_block_size; i++) {
		memset(buf, 0xFF, params.logical_block_size);
		iov.iov_base = buf;
		iov.iov_len = params.logical_block_size;
		g_reduce_errno = -1;
		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
		CU_ASSERT(g_reduce_errno == 0);

		switch (i) {
		case 2:
		case 3:
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		default:
			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
			break;
		}
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	/* Overwrite what we just wrote with 0xCC */
	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	memset(buf, 0xCC, 2 * params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = 2 * params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 2, 2, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	memset(compare_buf, 0xCC, sizeof(compare_buf));
	for (i = 0; i < params.chunk_size / params.logical_block_size; i++) {
		memset(buf, 0xFF, params.logical_block_size);
		iov.iov_base = buf;
		iov.iov_len = params.logical_block_size;
		g_reduce_errno = -1;
		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
		CU_ASSERT(g_reduce_errno == 0);

		switch (i) {
		case 2:
		case 3:
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		default:
			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
			break;
		}
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);

	g_reduce_errno = -1;

	/* Write 0xBB to 2 512-byte logical blocks, starting at LBA 37.
	 * This is writing into the second chunk of the volume.  This also
	 * enables implicitly checking that we reloaded the bit arrays
	 * correctly - making sure we don't use the first chunk map again
	 * for this new write - the first chunk map was already used by the
	 * write from before we unloaded and reloaded.
	 */
	memset(buf, 0xBB, 2 * params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = 2 * params.logical_block_size;
	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 37, 2, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	for (i = 0; i < 2 * params.chunk_size / params.logical_block_size; i++) {
		memset(buf, 0xFF, params.logical_block_size);
		iov.iov_base = buf;
		iov.iov_len = params.logical_block_size;
		g_reduce_errno = -1;
		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
		CU_ASSERT(g_reduce_errno == 0);

		switch (i) {
		case 2:
		case 3:
			memset(compare_buf, 0xCC, sizeof(compare_buf));
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		case 37:
		case 38:
			memset(compare_buf, 0xBB, sizeof(compare_buf));
			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
			break;
		default:
			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
			break;
		}
	}

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
read_write(void)
{
	_read_write(512);
	_read_write(4096);
}

static void
_readv_writev(uint32_t backing_blocklen)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct iovec iov[REDUCE_MAX_IOVECS + 1];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, backing_blocklen);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	g_reduce_errno = -1;
	spdk_reduce_vol_writev(g_vol, iov, REDUCE_MAX_IOVECS + 1, 2, REDUCE_MAX_IOVECS + 1, write_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EINVAL);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
readv_writev(void)
{
	_readv_writev(512);
	_readv_writev(4096);
}

static void
destroy_cb(void *ctx, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

static void
destroy(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 512;
	params.logical_block_size = 512;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_reduce_errno = -1;
	MOCK_CLEAR(spdk_malloc);
	MOCK_CLEAR(spdk_zmalloc);
	spdk_reduce_vol_destroy(&backing_dev, destroy_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	g_reduce_errno = 0;
	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
	CU_ASSERT(g_reduce_errno == -EILSEQ);

	backing_dev_destroy(&backing_dev);
}

/* This test primarily checks that the reduce unit test infrastructure for asynchronous
 * backing device I/O operations is working correctly.
 */
static void
defer_bdev_io(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	const uint32_t logical_block_size = 512;
	struct iovec iov;
	char buf[logical_block_size];
	char compare_buf[logical_block_size];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = logical_block_size;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* Write 0xAA to 1 512-byte logical block. */
	memset(buf, 0xAA, params.logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = params.logical_block_size;
	g_reduce_errno = -100;
	g_defer_bdev_io = true;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
	/* Callback should not have executed, so this should still equal -100. */
	CU_ASSERT(g_reduce_errno == -100);
	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
	/* We wrote to just 512 bytes of one chunk which was previously unallocated.  This
	 * should result in 1 pending I/O since the rest of this chunk will be zeroes and
	 * very compressible.
	 */
	CU_ASSERT(g_pending_bdev_io_count == 1);

	backing_dev_io_execute(0);
	CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
	CU_ASSERT(g_reduce_errno == 0);

	g_defer_bdev_io = false;
	memset(compare_buf, 0xAA, sizeof(compare_buf));
	memset(buf, 0xFF, sizeof(buf));
	iov.iov_base = buf;
	iov.iov_len = params.logical_block_size;
	g_reduce_errno = -100;
	spdk_reduce_vol_readv(g_vol, &iov, 1, 0, 1, read_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(memcmp(buf, compare_buf, sizeof(buf)) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

static void
overlapped(void)
{
	struct spdk_reduce_vol_params params = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	const uint32_t logical_block_size = 512;
	struct iovec iov;
	char buf[2 * logical_block_size];
	char compare_buf[2 * logical_block_size];

	params.chunk_size = 16 * 1024;
	params.backing_io_unit_size = 4096;
	params.logical_block_size = logical_block_size;
	spdk_uuid_generate(&params.uuid);

	backing_dev_init(&backing_dev, &params, 512);

	g_vol = NULL;
	g_reduce_errno = -1;
	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	SPDK_CU_ASSERT_FATAL(g_vol != NULL);

	/* Write 0xAA to 1 512-byte logical block. */
	memset(buf, 0xAA, logical_block_size);
	iov.iov_base = buf;
	iov.iov_len = logical_block_size;
	g_reduce_errno = -100;
	g_defer_bdev_io = true;
	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
	/* Callback should not have executed, so this should still equal -100. */
	CU_ASSERT(g_reduce_errno == -100);
	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
	/* We wrote to just 512 bytes of one chunk which was previously unallocated.  This
	 * should result in 1 pending I/O since the rest of this chunk will be zeroes and
	 * very compressible.
	 */
	CU_ASSERT(g_pending_bdev_io_count == 1);

	/* Now do an overlapped I/O to the same chunk. */
	spdk_reduce_vol_writev(g_vol, &iov, 1, 1, 1, write_cb, NULL);
	/* Callback should not have executed, so this should still equal -100. */
	CU_ASSERT(g_reduce_errno == -100);
	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
	/* The second I/O overlaps with the first one.  So we should only see pending bdev_io
	 * related to the first I/O here - the second one won't start until the first one is completed.
	 */
	CU_ASSERT(g_pending_bdev_io_count == 1);

	backing_dev_io_execute(0);
	CU_ASSERT(g_reduce_errno == 0);

	g_defer_bdev_io = false;
	memset(compare_buf, 0xAA, sizeof(compare_buf));
	memset(buf, 0xFF, sizeof(buf));
	iov.iov_base = buf;
	iov.iov_len = 2 * logical_block_size;
	g_reduce_errno = -100;
	spdk_reduce_vol_readv(g_vol, &iov, 1, 0, 2, read_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(memcmp(buf, compare_buf, 2 * logical_block_size) == 0);

	g_reduce_errno = -1;
	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
	CU_ASSERT(g_reduce_errno == 0);

	persistent_pm_buf_destroy();
	backing_dev_destroy(&backing_dev);
}

#define BUFSIZE 4096

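/* Worked example for the RLE scheme above: 255 bytes of 0xAA compress to
 * {0xFF, 0xAA}, and 256 bytes compress to {0xFF, 0xAA, 0x01, 0xAA} because a
 * run length must fit in one byte.
 */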
static void
compress_algorithm(void)
{
	uint8_t original_data[BUFSIZE];
	uint8_t compressed_data[BUFSIZE];
	uint8_t decompressed_data[BUFSIZE];
	uint32_t compressed_len, decompressed_len;
	int rc;

	ut_build_data_buffer(original_data, BUFSIZE, 0xAA, BUFSIZE);
	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, UINT8_MAX);
	CU_ASSERT(rc == 0);
	CU_ASSERT(compressed_len == 2);
	CU_ASSERT(compressed_data[0] == UINT8_MAX);
	CU_ASSERT(compressed_data[1] == 0xAA);

	decompressed_len = sizeof(decompressed_data);
	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
	CU_ASSERT(rc == 0);
	CU_ASSERT(decompressed_len == UINT8_MAX);
	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);

	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, UINT8_MAX + 1);
	CU_ASSERT(rc == 0);
	CU_ASSERT(compressed_len == 4);
	CU_ASSERT(compressed_data[0] == UINT8_MAX);
	CU_ASSERT(compressed_data[1] == 0xAA);
	CU_ASSERT(compressed_data[2] == 1);
	CU_ASSERT(compressed_data[3] == 0xAA);

	decompressed_len = sizeof(decompressed_data);
	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
	CU_ASSERT(rc == 0);
	CU_ASSERT(decompressed_len == UINT8_MAX + 1);
	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);

	ut_build_data_buffer(original_data, BUFSIZE, 0x00, 1);
	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, 2048);
	CU_ASSERT(rc == 0);
	CU_ASSERT(compressed_len == 4096);
	CU_ASSERT(compressed_data[0] == 1);
	CU_ASSERT(compressed_data[1] == 0);
	CU_ASSERT(compressed_data[4094] == 1);
	CU_ASSERT(compressed_data[4095] == 0xFF);

	decompressed_len = sizeof(decompressed_data);
	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
	CU_ASSERT(rc == 0);
	CU_ASSERT(decompressed_len == 2048);
	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);

	compressed_len = sizeof(compressed_data);
	rc = ut_compress(compressed_data, &compressed_len, original_data, 2049);
	CU_ASSERT(rc == -ENOSPC);
}

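/* _prepare_compress_chunk() builds req->decomp_iov for compression.  When the
 * backing dev supports SGL input, the iovs can point directly at the user's
 * buffers (padded with decomp_buf or g_zero_buf); otherwise everything is
 * copied into the single contiguous req->decomp_buf first.
 */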
static void
test_prepare_compress_chunk(void)
{
	struct spdk_reduce_vol vol = {};
	struct spdk_reduce_backing_dev backing_dev = {};
	struct spdk_reduce_vol_request req = {};
	void *buf;
	char *buffer_end, *aligned_user_buffer, *unaligned_user_buffer;
	char decomp_buffer[16 * 1024] = {};
	char comp_buffer[16 * 1024] = {};
	struct iovec user_iov[2] = {};
	size_t user_buffer_iov_len = 8192;
	size_t remainder_bytes;
	size_t offset_bytes;
	size_t memcmp_offset;
	uint32_t i;

	vol.params.chunk_size = 16 * 1024;
	vol.params.backing_io_unit_size = 4096;
	vol.params.logical_block_size = 512;
	backing_dev_init(&backing_dev, &vol.params, 512);
	vol.backing_dev = &backing_dev;
	vol.logical_blocks_per_chunk = vol.params.chunk_size / vol.params.logical_block_size;

	/* Allocate 1 extra byte to test the case where a buffer crosses a huge page boundary. */
	SPDK_CU_ASSERT_FATAL(posix_memalign(&buf, VALUE_2MB, VALUE_2MB + 1) == 0);
	buffer_end = (char *)buf + VALUE_2MB + 1;
	aligned_user_buffer = (char *)buf;
	memset(aligned_user_buffer, 0xc, vol.params.chunk_size);
	unaligned_user_buffer = buffer_end - vol.params.chunk_size;
	memset(unaligned_user_buffer, 0xc, vol.params.chunk_size);

	req.vol = &vol;
	req.decomp_buf = decomp_buffer;
	req.comp_buf = comp_buffer;
	req.iov = user_iov;
	req.iovcnt = 2;
	req.offset = 0;

	/* Part 1 - backing dev supports sgl_in */
	/* Test 1 - user buffer lengths sum to chunk_size */
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
	}

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 2);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 2);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}

	/* Test 2 - user buffers are smaller than chunk_size, no offset */
	user_buffer_iov_len = 4096;
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
	}

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 3);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[i].iov_base == req.decomp_buf + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 3);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[i].iov_base == g_zero_buf + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);

	/* Test 3 - user buffers are smaller than chunk_size, non-zero offset */
	user_buffer_iov_len = 4096;
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 4);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[3].iov_base == req.decomp_buf + offset_bytes + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 4);
	CU_ASSERT(req.decomp_iov[0].iov_base == g_zero_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[3].iov_base == g_zero_buf + offset_bytes + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);

	/* Part 2 - backing dev doesn't support sgl_in */
	/* Test 1 - user buffer lengths sum to chunk_size;
	 * user buffers are copied */
	vol.backing_dev->sgl_in = false;
	req.offset = 0;
	user_buffer_iov_len = 8192;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base, req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + req.iov[0].iov_len, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base, req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + req.iov[0].iov_len, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);

	/* Test 2 - a single user buffer equals chunk_size but is not aligned;
	 * the user buffer is copied */
	req.iov[0].iov_base = unaligned_user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);

	/* Test 3 - a single aligned user buffer equals chunk_size;
	 * the user buffer is not copied */
	req.iov[0].iov_base = aligned_user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);

	/* Test 4 - user buffers are smaller than chunk_size, no offset;
	 * user buffers are copied */
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	user_buffer_iov_len = 4096;
	req.iovcnt = 2;
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf + memcmp_offset,
			 remainder_bytes) == 0);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf + memcmp_offset,
			 remainder_bytes) == 0);

	/* Test 5 - user buffers are smaller than chunk_size, non-zero offset;
	 * user buffers are copied */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf, offset_bytes) == 0);
	memcmp_offset += offset_bytes;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf + memcmp_offset,
			 remainder_bytes) == 0);

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);

	_prepare_compress_chunk(&req, true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	memcmp_offset = 0;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf, offset_bytes) == 0);
	memcmp_offset += offset_bytes;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	memcmp_offset += req.iov[0].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	memcmp_offset += req.iov[1].iov_len;
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf + memcmp_offset,
			 remainder_bytes) == 0);

	backing_dev_destroy(&backing_dev);
	free(buf);
}

static void
_reduce_vol_op_complete(void *ctx, int reduce_errno)
{
	g_reduce_errno = reduce_errno;
}

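/* Decompress stub that completes immediately and reports g_decompressed_len
 * as the output size; the destination buffers are left untouched.
 */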
static void
dummy_backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
			     struct iovec *src_iov, int src_iovcnt,
			     struct iovec *dst_iov, int dst_iovcnt,
			     struct spdk_reduce_vol_cb_args *args)
{
	args->output_size = g_decompressed_len;
	args->cb_fn(args->cb_arg, 0);
}

1577 static void
1578 test_reduce_decompress_chunk(void)
1579 {
1580 	struct spdk_reduce_vol vol = {};
1581 	struct spdk_reduce_backing_dev backing_dev = {};
1582 	struct spdk_reduce_vol_request req = {};
1583 	void *buf;
1584 	char *buffer_end, *aligned_user_buffer, *unaligned_user_buffer;
1585 	char decomp_buffer[16 * 1024] = {};
1586 	char comp_buffer[16 * 1024] = {};
1587 	struct iovec user_iov[2] = {};
1588 	struct iovec comp_buf_iov = {};
1589 	struct spdk_reduce_chunk_map chunk = {};
1590 	size_t user_buffer_iov_len = 8192;
1591 	size_t remainder_bytes;
1592 	size_t offset_bytes;
1593 	uint32_t i;
1594 
1595 	vol.params.chunk_size = 16 * 1024;
1596 	vol.params.backing_io_unit_size = 4096;
1597 	vol.params.logical_block_size = 512;
1598 	backing_dev_init(&backing_dev, &vol.params, 512);
1599 	backing_dev.decompress = dummy_backing_dev_decompress;
1600 	vol.backing_dev = &backing_dev;
1601 	vol.logical_blocks_per_chunk = vol.params.chunk_size / vol.params.logical_block_size;
	RB_INIT(&vol.executing_requests);
	TAILQ_INIT(&vol.queued_requests);
	TAILQ_INIT(&vol.free_requests);

	/* Allocate 1 extra byte to test the case where a buffer crosses a huge page boundary */
	SPDK_CU_ASSERT_FATAL(posix_memalign(&buf, VALUE_2MB, VALUE_2MB + 1) == 0);
	buffer_end = (char *)buf + VALUE_2MB + 1;
	aligned_user_buffer = (char *)buf;
	unaligned_user_buffer = buffer_end - vol.params.chunk_size;

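	/* Hand-build a request; a compressed_size smaller than chunk_size marks the chunk as holding compressed data */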
	chunk.compressed_size = user_buffer_iov_len / 2;
	req.chunk = &chunk;
	req.vol = &vol;
	req.decomp_buf = decomp_buffer;
	req.comp_buf = comp_buffer;
	req.comp_buf_iov = &comp_buf_iov;
	req.iov = user_iov;
	req.iovcnt = 2;
	req.offset = 0;
	req.cb_fn = _reduce_vol_op_complete;

	/* Part 1 - backing dev supports sgl_out */
	/* Test 1 - total length of the user's buffers equals chunk_size */
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0, req.iov[i].iov_len);
	}
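	/* The request must sit in the executing tree; successful completion removes it and returns it to free_requests */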
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;
	g_decompressed_len = vol.params.chunk_size;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 2);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(RB_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 2 - user's buffers are smaller than chunk_size, no offset */
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;
	user_buffer_iov_len = 4096;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0, req.iov[i].iov_len);
	}
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 3);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[i].iov_base == req.decomp_buf + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);
	CU_ASSERT(RB_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 3 - user's buffers are smaller than chunk_size, non-zero offset (in logical blocks) */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 4);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[3].iov_base == req.decomp_buf + offset_bytes + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);
	CU_ASSERT(RB_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Part 2 - backing dev doesn't support sgl_out */
	/* Test 1 - total length of the user's buffers equals chunk_size;
	 * the user's buffers are copied */
	vol.backing_dev->sgl_out = false;
	req.offset = 0;
	user_buffer_iov_len = 8192;

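	/* Distinct fill patterns (0xa for decomp_buf, 0xb/0xc for the user's buffers) let the memcmp checks below detect misplaced data */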
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base, req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.iov[1].iov_base, req.decomp_iov[0].iov_base + req.iov[0].iov_len,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(RB_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 2 - a single user's buffer whose length equals chunk_size but is not aligned;
	 * the user's buffer is copied */
	memset(unaligned_user_buffer, 0xc, vol.params.chunk_size);
	req.iov[0].iov_base = unaligned_user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base,
			 req.iov[0].iov_len) == 0);

	/* Test 3 - a single, aligned user's buffer whose length equals chunk_size;
	 * the user's buffer is not copied */
	req.iov[0].iov_base = aligned_user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);

	/* Test 4 - user's buffers are smaller than chunk_size, no offset;
	 * the user's buffers are copied */
	user_buffer_iov_len = 4096;
	req.iovcnt = 2;
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.iov[1].iov_base, req.decomp_iov[0].iov_base + req.iov[0].iov_len,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(RB_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 5 - user's buffers are smaller than chunk_size, non-zero offset;
	 * the user's buffers are copied */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;

	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	RB_INSERT(executing_req_tree, &vol.executing_requests, &req);
	g_reduce_errno = -1;

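	/* Lay out the chunk through the compress-prepare path first, then issue the decompress against the same request */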
	_prepare_compress_chunk(&req, false);
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + offset_bytes, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + offset_bytes + req.iov[0].iov_len,
			 req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(RB_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	backing_dev_destroy(&backing_dev);
	free(buf);
}

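/*
 * Allocate request structures for several chunk/io-unit geometries and verify
 * that the params validate and the allocations succeed.
 */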
static void
test_allocate_vol_requests(void)
{
	struct spdk_reduce_vol *vol;
	struct spdk_reduce_backing_dev backing_dev = {};
	/* include chunk_sizes that are not powers of 2 */
	uint32_t chunk_sizes[] = {8192, 8320, 16384, 16416, 32768};
	uint32_t io_unit_sizes[] = {512, 520, 4096, 4104, 4096};
	uint32_t i;

	/* the bdev compress module can specify how large user_ctx_size needs to be */
	backing_dev.user_ctx_size = 64;
	for (i = 0; i < SPDK_COUNTOF(chunk_sizes); i++) {
		vol = calloc(1, sizeof(*vol));
		SPDK_CU_ASSERT_FATAL(vol);

		vol->params.chunk_size = chunk_sizes[i];
		vol->params.logical_block_size = io_unit_sizes[i];
		vol->params.backing_io_unit_size = io_unit_sizes[i];
		vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
		vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
		vol->backing_dev = &backing_dev;

		CU_ASSERT(_validate_vol_params(&vol->params) == 0);
		CU_ASSERT(_allocate_vol_requests(vol) == 0);
		_init_load_cleanup(vol, NULL);
	}
}

int
main(int argc, char **argv)
{
	CU_pSuite	suite = NULL;
	unsigned int	num_failures;

	CU_initialize_registry();

	suite = CU_add_suite("reduce", NULL, NULL);

	CU_ADD_TEST(suite, get_pm_file_size);
	CU_ADD_TEST(suite, get_vol_size);
	CU_ADD_TEST(suite, init_failure);
	CU_ADD_TEST(suite, init_md);
	CU_ADD_TEST(suite, init_backing_dev);
	CU_ADD_TEST(suite, load);
	CU_ADD_TEST(suite, write_maps);
	CU_ADD_TEST(suite, read_write);
	CU_ADD_TEST(suite, readv_writev);
	CU_ADD_TEST(suite, destroy);
	CU_ADD_TEST(suite, defer_bdev_io);
	CU_ADD_TEST(suite, overlapped);
	CU_ADD_TEST(suite, compress_algorithm);
	CU_ADD_TEST(suite, test_prepare_compress_chunk);
	CU_ADD_TEST(suite, test_reduce_decompress_chunk);
	CU_ADD_TEST(suite, test_allocate_vol_requests);

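	/* Point the mocked unlink() at our path buffer and callback so vol destroy can be observed */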
	g_unlink_path = g_path;
	g_unlink_callback = unlink_cb;

	num_failures = spdk_ut_run_tests(argc, argv, NULL);
	CU_cleanup_registry();
	return num_failures;
}