1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk_internal/cunit.h"
10 
11 #include "reduce/reduce.c"
12 #include "spdk_internal/mock.h"
13 #define UNIT_TEST_NO_VTOPHYS
14 #include "common/lib/test_env.c"
15 #undef UNIT_TEST_NO_VTOPHYS
16 
17 static struct spdk_reduce_vol *g_vol;
18 static int g_reduce_errno;
19 static char *g_volatile_pm_buf;
20 static size_t g_volatile_pm_buf_len;
21 static char *g_persistent_pm_buf;
22 static size_t g_persistent_pm_buf_len;
23 static char *g_backing_dev_buf;
24 static char g_path[REDUCE_PATH_MAX];
25 static char *g_decomp_buf;
26 static int g_decompressed_len;
27 
28 #define TEST_MD_PATH "/tmp"
29 
30 uint64_t
31 spdk_vtophys(const void *buf, uint64_t *size)
32 {
33 	/* Add 1 to the buf address so that when buf is exactly page-aligned we still compute the end of its 2MB page rather than buf itself. */
34 	const uint8_t *page_2mb_end = (const uint8_t *)SPDK_ALIGN_CEIL((uintptr_t)buf + 1, VALUE_2MB);
35 	uint64_t bytes_to_page_end = page_2mb_end - (const uint8_t *)buf;
36 	uint64_t _size;
37 
38 	if (*size) {
39 		_size = *size;
40 		_size = spdk_min(_size, bytes_to_page_end);
41 		*size = _size;
42 	}
43 
44 	return (uintptr_t)buf;
45 }
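
/* A worked example of the clamp above (illustrative values only): with
 * buf == VALUE_2MB (page-aligned) and *size == 4 * VALUE_2MB, page_2mb_end is
 * 2 * VALUE_2MB, so *size is clamped to VALUE_2MB.  With buf == VALUE_2MB - 4096
 * and *size == 8192, *size is clamped to 4096, i.e. the bytes remaining before
 * the 2MB boundary.
 */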
46 
47 enum ut_reduce_bdev_io_type {
48 	UT_REDUCE_IO_READV = 1,
49 	UT_REDUCE_IO_WRITEV = 2,
50 	UT_REDUCE_IO_UNMAP = 3,
51 };
52 
53 struct ut_reduce_bdev_io {
54 	enum ut_reduce_bdev_io_type type;
55 	struct spdk_reduce_backing_dev *backing_dev;
56 	struct iovec *iov;
57 	int iovcnt;
58 	uint64_t lba;
59 	uint32_t lba_count;
60 	struct spdk_reduce_vol_cb_args *args;
61 	TAILQ_ENTRY(ut_reduce_bdev_io)	link;
62 };
63 
64 static bool g_defer_bdev_io = false;
65 static TAILQ_HEAD(, ut_reduce_bdev_io) g_pending_bdev_io =
66 	TAILQ_HEAD_INITIALIZER(g_pending_bdev_io);
67 static uint32_t g_pending_bdev_io_count = 0;
68 
69 static void
70 sync_pm_buf(const void *addr, size_t length)
71 {
72 	uint64_t offset = (char *)addr - g_volatile_pm_buf;
73 
74 	memcpy(&g_persistent_pm_buf[offset], addr, length);
75 }
76 
77 int
78 pmem_msync(const void *addr, size_t length)
79 {
80 	sync_pm_buf(addr, length);
81 	return 0;
82 }
83 
84 void
85 pmem_persist(const void *addr, size_t len)
86 {
87 	sync_pm_buf(addr, len);
88 }
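
/* Persistence model used by these pmem stubs: pmem_map_file() hands reduce a
 * volatile copy (g_volatile_pm_buf) of the persistent buffer
 * (g_persistent_pm_buf).  Writes land in the volatile copy and are only
 * mirrored to the persistent copy when reduce calls pmem_persist() or
 * pmem_msync(), letting the tests verify that reduce persists metadata at the
 * right points.
 */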
89 
90 static void
91 get_pm_file_size(void)
92 {
93 	struct spdk_reduce_vol_params params;
94 	uint64_t pm_size, expected_pm_size;
95 
96 	params.backing_io_unit_size = 4096;
97 	params.chunk_size = 4096 * 4;
98 	params.vol_size = 4096 * 4 * 100;
99 
100 	pm_size = _get_pm_file_size(&params);
101 	expected_pm_size = sizeof(struct spdk_reduce_vol_superblock);
102 	/* 100 chunks in logical map * 8 bytes per chunk */
103 	expected_pm_size += 100 * sizeof(uint64_t);
104 	/* 100 chunks * (chunk struct size + 4 backing io units per chunk * 8 bytes per backing io unit) */
105 	expected_pm_size += 100 * (sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t));
106 	/* reduce also allocates some extra chunks for in-flight writes when the logical map is full.
107 	 * REDUCE_NUM_EXTRA_CHUNKS is a private #define in reduce.c.  Here we need that number of chunks
108 	 * times (chunk struct size + 4 backing io units per chunk * 8 bytes per backing io unit).
109 	 */
110 	expected_pm_size += REDUCE_NUM_EXTRA_CHUNKS *
111 			    (sizeof(struct spdk_reduce_chunk_map) + 4 * sizeof(uint64_t));
112 	/* reduce will add some padding so numbers may not match exactly.  Make sure
113 	 * they are close though.
114 	 */
115 	CU_ASSERT((pm_size - expected_pm_size) <= REDUCE_PM_SIZE_ALIGNMENT);
116 }
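
/* Sanity-checking the arithmetic above with the chosen params: vol_size /
 * chunk_size = 100 chunks and chunk_size / backing_io_unit_size = 4 io units
 * per chunk, so expected_pm_size = sizeof(superblock) + 100 * 8 +
 * (100 + REDUCE_NUM_EXTRA_CHUNKS) * (sizeof(struct spdk_reduce_chunk_map) + 4 * 8)
 * before reduce rounds the total up to REDUCE_PM_SIZE_ALIGNMENT.
 */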
117 
118 static void
119 get_vol_size(void)
120 {
121 	uint64_t chunk_size, backing_dev_size;
122 
123 	chunk_size = 16 * 1024;
124 	backing_dev_size = 16 * 1024 * 1000;
125 	CU_ASSERT(_get_vol_size(chunk_size, backing_dev_size) < backing_dev_size);
126 }
127 
128 void *
129 pmem_map_file(const char *path, size_t len, int flags, mode_t mode,
130 	      size_t *mapped_lenp, int *is_pmemp)
131 {
132 	CU_ASSERT(g_volatile_pm_buf == NULL);
133 	snprintf(g_path, sizeof(g_path), "%s", path);
134 	*is_pmemp = 1;
135 
136 	if (g_persistent_pm_buf == NULL) {
137 		g_persistent_pm_buf = calloc(1, len);
138 		g_persistent_pm_buf_len = len;
139 		SPDK_CU_ASSERT_FATAL(g_persistent_pm_buf != NULL);
140 	}
141 
142 	*mapped_lenp = g_persistent_pm_buf_len;
143 	g_volatile_pm_buf = calloc(1, g_persistent_pm_buf_len);
144 	SPDK_CU_ASSERT_FATAL(g_volatile_pm_buf != NULL);
145 	memcpy(g_volatile_pm_buf, g_persistent_pm_buf, g_persistent_pm_buf_len);
146 	g_volatile_pm_buf_len = g_persistent_pm_buf_len;
147 
148 	return g_volatile_pm_buf;
149 }
150 
151 int
152 pmem_unmap(void *addr, size_t len)
153 {
154 	CU_ASSERT(addr == g_volatile_pm_buf);
155 	CU_ASSERT(len == g_volatile_pm_buf_len);
156 	free(g_volatile_pm_buf);
157 	g_volatile_pm_buf = NULL;
158 	g_volatile_pm_buf_len = 0;
159 
160 	return 0;
161 }
162 
163 static void
164 persistent_pm_buf_destroy(void)
165 {
166 	CU_ASSERT(g_persistent_pm_buf != NULL);
167 	free(g_persistent_pm_buf);
168 	g_persistent_pm_buf = NULL;
169 	g_persistent_pm_buf_len = 0;
170 }
171 
172 static void
173 unlink_cb(void)
174 {
175 	persistent_pm_buf_destroy();
176 }
177 
178 static void
179 init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
180 {
181 	g_vol = vol;
182 	g_reduce_errno = reduce_errno;
183 }
184 
185 static void
186 load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
187 {
188 	g_vol = vol;
189 	g_reduce_errno = reduce_errno;
190 }
191 
192 static void
193 unload_cb(void *cb_arg, int reduce_errno)
194 {
195 	g_reduce_errno = reduce_errno;
196 }
197 
198 static void
199 init_failure(void)
200 {
201 	struct spdk_reduce_vol_params params = {};
202 	struct spdk_reduce_backing_dev backing_dev = {};
203 
204 	backing_dev.blocklen = 512;
205 	/* This blockcnt is too small for a reduce vol - there needs to be
206 	 *  enough space for at least REDUCE_NUM_EXTRA_CHUNKS + 1 chunks.
207 	 */
208 	backing_dev.blockcnt = 20;
209 
210 	params.vol_size = 0;
211 	params.chunk_size = 16 * 1024;
212 	params.backing_io_unit_size = backing_dev.blocklen;
213 	params.logical_block_size = 512;
214 
215 	/* backing_dev has an invalid size.  This should fail. */
216 	g_vol = NULL;
217 	g_reduce_errno = 0;
218 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
219 	CU_ASSERT(g_reduce_errno == -EINVAL);
220 	SPDK_CU_ASSERT_FATAL(g_vol == NULL);
221 
222 	/* backing_dev now has a valid size, but its function pointers are still
223 	 *  NULL.  This should also fail.
224 	 */
225 	backing_dev.blockcnt = 20000;
226 
227 	g_vol = NULL;
228 	g_reduce_errno = 0;
229 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
230 	CU_ASSERT(g_reduce_errno == -EINVAL);
231 	SPDK_CU_ASSERT_FATAL(g_vol == NULL);
232 }
233 
234 static void
235 backing_dev_readv_execute(struct spdk_reduce_backing_dev *backing_dev,
236 			  struct iovec *iov, int iovcnt,
237 			  uint64_t lba, uint32_t lba_count,
238 			  struct spdk_reduce_vol_cb_args *args)
239 {
240 	char *offset;
241 	int i;
242 
243 	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
244 	for (i = 0; i < iovcnt; i++) {
245 		memcpy(iov[i].iov_base, offset, iov[i].iov_len);
246 		offset += iov[i].iov_len;
247 	}
248 	args->cb_fn(args->cb_arg, 0);
249 }
250 
251 static void
252 backing_dev_insert_io(enum ut_reduce_bdev_io_type type, struct spdk_reduce_backing_dev *backing_dev,
253 		      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
254 		      struct spdk_reduce_vol_cb_args *args)
255 {
256 	struct ut_reduce_bdev_io *ut_bdev_io;
257 
258 	ut_bdev_io = calloc(1, sizeof(*ut_bdev_io));
259 	SPDK_CU_ASSERT_FATAL(ut_bdev_io != NULL);
260 
261 	ut_bdev_io->type = type;
262 	ut_bdev_io->backing_dev = backing_dev;
263 	ut_bdev_io->iov = iov;
264 	ut_bdev_io->iovcnt = iovcnt;
265 	ut_bdev_io->lba = lba;
266 	ut_bdev_io->lba_count = lba_count;
267 	ut_bdev_io->args = args;
268 	TAILQ_INSERT_TAIL(&g_pending_bdev_io, ut_bdev_io, link);
269 	g_pending_bdev_io_count++;
270 }
271 
272 static void
273 backing_dev_readv(struct spdk_reduce_backing_dev *backing_dev, struct iovec *iov, int iovcnt,
274 		  uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
275 {
276 	if (g_defer_bdev_io == false) {
277 		CU_ASSERT(g_pending_bdev_io_count == 0);
278 		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
279 		backing_dev_readv_execute(backing_dev, iov, iovcnt, lba, lba_count, args);
280 		return;
281 	}
282 
283 	backing_dev_insert_io(UT_REDUCE_IO_READV, backing_dev, iov, iovcnt, lba, lba_count, args);
284 }
285 
286 static void
287 backing_dev_writev_execute(struct spdk_reduce_backing_dev *backing_dev,
288 			   struct iovec *iov, int iovcnt,
289 			   uint64_t lba, uint32_t lba_count,
290 			   struct spdk_reduce_vol_cb_args *args)
291 {
292 	char *offset;
293 	int i;
294 
295 	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
296 	for (i = 0; i < iovcnt; i++) {
297 		memcpy(offset, iov[i].iov_base, iov[i].iov_len);
298 		offset += iov[i].iov_len;
299 	}
300 	args->cb_fn(args->cb_arg, 0);
301 }
302 
303 static void
304 backing_dev_writev(struct spdk_reduce_backing_dev *backing_dev, struct iovec *iov, int iovcnt,
305 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
306 {
307 	if (g_defer_bdev_io == false) {
308 		CU_ASSERT(g_pending_bdev_io_count == 0);
309 		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
310 		backing_dev_writev_execute(backing_dev, iov, iovcnt, lba, lba_count, args);
311 		return;
312 	}
313 
314 	backing_dev_insert_io(UT_REDUCE_IO_WRITEV, backing_dev, iov, iovcnt, lba, lba_count, args);
315 }
316 
317 static void
318 backing_dev_unmap_execute(struct spdk_reduce_backing_dev *backing_dev,
319 			  uint64_t lba, uint32_t lba_count,
320 			  struct spdk_reduce_vol_cb_args *args)
321 {
322 	char *offset;
323 
324 	offset = g_backing_dev_buf + lba * backing_dev->blocklen;
325 	memset(offset, 0, lba_count * backing_dev->blocklen);
326 	args->cb_fn(args->cb_arg, 0);
327 }
328 
329 static void
330 backing_dev_unmap(struct spdk_reduce_backing_dev *backing_dev,
331 		  uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
332 {
333 	if (g_defer_bdev_io == false) {
334 		CU_ASSERT(g_pending_bdev_io_count == 0);
335 		CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
336 		backing_dev_unmap_execute(backing_dev, lba, lba_count, args);
337 		return;
338 	}
339 
340 	backing_dev_insert_io(UT_REDUCE_IO_UNMAP, backing_dev, NULL, 0, lba, lba_count, args);
341 }
342 
343 static void
344 backing_dev_io_execute(uint32_t count)
345 {
346 	struct ut_reduce_bdev_io *ut_bdev_io;
347 	uint32_t done = 0;
348 
349 	CU_ASSERT(g_defer_bdev_io == true);
350 	while (!TAILQ_EMPTY(&g_pending_bdev_io) && (count == 0 || done < count)) {
351 		ut_bdev_io = TAILQ_FIRST(&g_pending_bdev_io);
352 		TAILQ_REMOVE(&g_pending_bdev_io, ut_bdev_io, link);
353 		g_pending_bdev_io_count--;
354 		switch (ut_bdev_io->type) {
355 		case UT_REDUCE_IO_READV:
356 			backing_dev_readv_execute(ut_bdev_io->backing_dev,
357 						  ut_bdev_io->iov, ut_bdev_io->iovcnt,
358 						  ut_bdev_io->lba, ut_bdev_io->lba_count,
359 						  ut_bdev_io->args);
360 			break;
361 		case UT_REDUCE_IO_WRITEV:
362 			backing_dev_writev_execute(ut_bdev_io->backing_dev,
363 						   ut_bdev_io->iov, ut_bdev_io->iovcnt,
364 						   ut_bdev_io->lba, ut_bdev_io->lba_count,
365 						   ut_bdev_io->args);
366 			break;
367 		case UT_REDUCE_IO_UNMAP:
368 			backing_dev_unmap_execute(ut_bdev_io->backing_dev,
369 						  ut_bdev_io->lba, ut_bdev_io->lba_count,
370 						  ut_bdev_io->args);
371 			break;
372 		default:
373 			CU_ASSERT(false);
374 			break;
375 		}
376 		free(ut_bdev_io);
377 		done++;
378 	}
379 }
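
/* Typical use of the deferral machinery (a sketch mirroring the defer_bdev_io
 * and overlapped tests below): set g_defer_bdev_io = true before submitting a
 * reduce I/O, assert on g_pending_bdev_io_count, then call
 * backing_dev_io_execute(0) to drain all pending backing I/O or
 * backing_dev_io_execute(1) to complete exactly one at a time.
 */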
380 
381 static int
382 ut_compress(char *outbuf, uint32_t *compressed_len, char *inbuf, uint32_t inbuflen)
383 {
384 	uint32_t len = 0;
385 	uint8_t count;
386 	char last;
387 
388 	while (true) {
389 		if (inbuflen == 0) {
390 			*compressed_len = len;
391 			return 0;
392 		}
393 
394 		if (*compressed_len < (len + 2)) {
395 			return -ENOSPC;
396 		}
397 
398 		last = *inbuf;
399 		count = 1;
400 		inbuflen--;
401 		inbuf++;
402 
403 		while (inbuflen > 0 && *inbuf == last && count < UINT8_MAX) {
404 			count++;
405 			inbuflen--;
406 			inbuf++;
407 		}
408 
409 		outbuf[len] = count;
410 		outbuf[len + 1] = last;
411 		len += 2;
412 	}
413 }
414 
415 static int
416 ut_decompress(uint8_t *outbuf, uint32_t *decompressed_len, uint8_t *inbuf, uint32_t inbuflen)
417 {
418 	uint32_t len = 0;
419 
420 	SPDK_CU_ASSERT_FATAL(inbuflen % 2 == 0);
421 
422 	while (true) {
423 		if (inbuflen == 0) {
424 			*decompressed_len = len;
425 			return 0;
426 		}
427 
428 		if ((len + inbuf[0]) > *decompressed_len) {
429 			return -ENOSPC;
430 		}
431 
432 		memset(outbuf, inbuf[1], inbuf[0]);
433 		outbuf += inbuf[0];
434 		len += inbuf[0];
435 		inbuflen -= 2;
436 		inbuf += 2;
437 	}
438 }
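
/* Example of the (count, value) run-length encoding implemented above:
 * ut_compress() turns the 4-byte input "AAAB" into {0x03, 'A', 0x01, 'B'} and
 * ut_decompress() restores the original bytes.  The worst case is input with
 * no repeated bytes, where every byte expands to a 2-byte pair - which is why
 * compressing 2049 non-repeating bytes into a 4096-byte buffer returns
 * -ENOSPC in compress_algorithm() below.
 */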
439 
440 static void
441 ut_build_data_buffer(uint8_t *data, uint32_t data_len, uint8_t init_val, uint32_t repeat)
442 {
443 	uint32_t _repeat = repeat;
444 
445 	SPDK_CU_ASSERT_FATAL(repeat > 0);
446 
447 	while (data_len > 0) {
448 		*data = init_val;
449 		data++;
450 		data_len--;
451 		_repeat--;
452 		if (_repeat == 0) {
453 			init_val++;
454 			_repeat = repeat;
455 		}
456 	}
457 }
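
/* Example: ut_build_data_buffer(data, 8, 0, 2) produces 0 0 1 1 2 2 3 3 - each
 * value repeats 'repeat' times before incrementing (wrapping modulo 256), so
 * repeat == 1 yields the worst-case 0 1 2 3 ... pattern for the RLE scheme
 * above.
 */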
458 
459 static void
460 backing_dev_compress(struct spdk_reduce_backing_dev *backing_dev,
461 		     struct iovec *src_iov, int src_iovcnt,
462 		     struct iovec *dst_iov, int dst_iovcnt,
463 		     struct spdk_reduce_vol_cb_args *args)
464 {
465 	uint32_t compressed_len;
466 	uint64_t total_length = 0;
467 	char *buf = g_decomp_buf;
468 	int rc, i;
469 
470 	CU_ASSERT(dst_iovcnt == 1);
471 
472 	for (i = 0; i < src_iovcnt; i++) {
473 		memcpy(buf, src_iov[i].iov_base, src_iov[i].iov_len);
474 		buf += src_iov[i].iov_len;
475 		total_length += src_iov[i].iov_len;
476 	}
477 
478 	compressed_len = dst_iov[0].iov_len;
479 	rc = ut_compress(dst_iov[0].iov_base, &compressed_len,
480 			 g_decomp_buf, total_length);
481 
482 	args->output_size = compressed_len;
483 
484 	args->cb_fn(args->cb_arg, rc);
485 }
486 
487 static void
488 backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
489 		       struct iovec *src_iov, int src_iovcnt,
490 		       struct iovec *dst_iov, int dst_iovcnt,
491 		       struct spdk_reduce_vol_cb_args *args)
492 {
493 	uint32_t decompressed_len = 0;
494 	char *buf = g_decomp_buf;
495 	int rc, i;
496 
497 	CU_ASSERT(src_iovcnt == 1);
498 
499 	for (i = 0; i < dst_iovcnt; i++) {
500 		decompressed_len += dst_iov[i].iov_len;
501 	}
502 
503 	rc = ut_decompress(g_decomp_buf, &decompressed_len,
504 			   src_iov[0].iov_base, src_iov[0].iov_len);
505 
506 	for (i = 0; i < dst_iovcnt; i++) {
507 		memcpy(dst_iov[i].iov_base, buf, dst_iov[i].iov_len);
508 		buf += dst_iov[i].iov_len;
509 	}
510 
511 	args->output_size = decompressed_len;
512 
513 	args->cb_fn(args->cb_arg, rc);
514 }
515 
516 static void
517 backing_dev_destroy(struct spdk_reduce_backing_dev *backing_dev)
518 {
519 	/* We don't free this when the volume is unloaded, so that we can test
520 	 *  init/unload/load scenarios.
521 	 */
522 	free(g_backing_dev_buf);
523 	free(g_decomp_buf);
524 	g_backing_dev_buf = NULL;
525 }
526 
527 static void
528 backing_dev_init(struct spdk_reduce_backing_dev *backing_dev, struct spdk_reduce_vol_params *params,
529 		 uint32_t backing_blocklen)
530 {
531 	int64_t size;
532 
533 	size = 4 * 1024 * 1024;
534 	backing_dev->blocklen = backing_blocklen;
535 	backing_dev->blockcnt = size / backing_dev->blocklen;
536 	backing_dev->readv = backing_dev_readv;
537 	backing_dev->writev = backing_dev_writev;
538 	backing_dev->unmap = backing_dev_unmap;
539 	backing_dev->compress = backing_dev_compress;
540 	backing_dev->decompress = backing_dev_decompress;
541 	backing_dev->sgl_in = true;
542 	backing_dev->sgl_out = true;
543 
544 	g_decomp_buf = calloc(1, params->chunk_size);
545 	SPDK_CU_ASSERT_FATAL(g_decomp_buf != NULL);
546 
547 	g_backing_dev_buf = calloc(1, size);
548 	SPDK_CU_ASSERT_FATAL(g_backing_dev_buf != NULL);
549 }
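
/* For reference: with the 4MB buffer above, backing_blocklen == 512 gives
 * blockcnt == 8192 and backing_blocklen == 4096 gives blockcnt == 1024.
 * g_decomp_buf is sized to one chunk because the compress/decompress stubs
 * stage at most chunk_size bytes through it.
 */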
550 
551 static void
552 init_md(void)
553 {
554 	struct spdk_reduce_vol_params params = {};
555 	struct spdk_reduce_vol_params *persistent_params;
556 	struct spdk_reduce_backing_dev backing_dev = {};
557 	struct spdk_uuid uuid;
558 	uint64_t *entry;
559 
560 	params.chunk_size = 16 * 1024;
561 	params.backing_io_unit_size = 512;
562 	params.logical_block_size = 512;
563 
564 	backing_dev_init(&backing_dev, &params, 512);
565 
566 	g_vol = NULL;
567 	g_reduce_errno = -1;
568 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
569 	CU_ASSERT(g_reduce_errno == 0);
570 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
571 	/* Confirm that reduce persisted the params to metadata. */
572 	CU_ASSERT(memcmp(g_persistent_pm_buf, SPDK_REDUCE_SIGNATURE, 8) == 0);
573 	persistent_params = (struct spdk_reduce_vol_params *)(g_persistent_pm_buf + 8);
574 	CU_ASSERT(memcmp(persistent_params, &params, sizeof(params)) == 0);
575 	/* Now confirm that contents of pm_file after the superblock have been initialized
576 	 *  to REDUCE_EMPTY_MAP_ENTRY.
577 	 */
578 	entry = (uint64_t *)(g_persistent_pm_buf + sizeof(struct spdk_reduce_vol_superblock));
579 	while (entry != (uint64_t *)(g_persistent_pm_buf + g_vol->pm_file.size)) {
580 		CU_ASSERT(*entry == REDUCE_EMPTY_MAP_ENTRY);
581 		entry++;
582 	}
583 
584 	/* Check that the pm file path was constructed correctly.  It should be in
585 	 * the form:
586 	 * TEST_MD_PATH + "/" + <uuid string>
587 	 */
588 	CU_ASSERT(strncmp(&g_path[0], TEST_MD_PATH, strlen(TEST_MD_PATH)) == 0);
589 	CU_ASSERT(g_path[strlen(TEST_MD_PATH)] == '/');
590 	CU_ASSERT(spdk_uuid_parse(&uuid, &g_path[strlen(TEST_MD_PATH) + 1]) == 0);
591 	CU_ASSERT(spdk_uuid_compare(&uuid, spdk_reduce_vol_get_uuid(g_vol)) == 0);
592 
593 	g_reduce_errno = -1;
594 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
595 	CU_ASSERT(g_reduce_errno == 0);
596 	CU_ASSERT(g_volatile_pm_buf == NULL);
597 
598 	persistent_pm_buf_destroy();
599 	backing_dev_destroy(&backing_dev);
600 }
601 
602 static void
603 _init_backing_dev(uint32_t backing_blocklen)
604 {
605 	struct spdk_reduce_vol_params params = {};
606 	struct spdk_reduce_vol_params *persistent_params;
607 	struct spdk_reduce_backing_dev backing_dev = {};
608 
609 	params.chunk_size = 16 * 1024;
610 	params.backing_io_unit_size = 512;
611 	params.logical_block_size = 512;
612 	spdk_uuid_generate(&params.uuid);
613 
614 	backing_dev_init(&backing_dev, &params, backing_blocklen);
615 
616 	g_vol = NULL;
617 	memset(g_path, 0, sizeof(g_path));
618 	g_reduce_errno = -1;
619 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
620 	CU_ASSERT(g_reduce_errno == 0);
621 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
622 	CU_ASSERT(strncmp(TEST_MD_PATH, g_path, strlen(TEST_MD_PATH)) == 0);
623 	/* Confirm that libreduce persisted the params to the backing device. */
624 	CU_ASSERT(memcmp(g_backing_dev_buf, SPDK_REDUCE_SIGNATURE, 8) == 0);
625 	persistent_params = (struct spdk_reduce_vol_params *)(g_backing_dev_buf + 8);
626 	CU_ASSERT(memcmp(persistent_params, &params, sizeof(params)) == 0);
627 	/* Confirm that the path to the persistent memory metadata file was persisted to
628 	 *  the backing device.
629 	 */
630 	CU_ASSERT(strncmp(g_path,
631 			  g_backing_dev_buf + REDUCE_BACKING_DEV_PATH_OFFSET,
632 			  REDUCE_PATH_MAX) == 0);
633 
634 	g_reduce_errno = -1;
635 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
636 	CU_ASSERT(g_reduce_errno == 0);
637 
638 	persistent_pm_buf_destroy();
639 	backing_dev_destroy(&backing_dev);
640 }
641 
642 static void
643 init_backing_dev(void)
644 {
645 	_init_backing_dev(512);
646 	_init_backing_dev(4096);
647 }
648 
649 static void
650 _load(uint32_t backing_blocklen)
651 {
652 	struct spdk_reduce_vol_params params = {};
653 	struct spdk_reduce_backing_dev backing_dev = {};
654 	char pmem_file_path[REDUCE_PATH_MAX];
655 
656 	params.chunk_size = 16 * 1024;
657 	params.backing_io_unit_size = 512;
658 	params.logical_block_size = 512;
659 	spdk_uuid_generate(&params.uuid);
660 
661 	backing_dev_init(&backing_dev, &params, backing_blocklen);
662 
663 	g_vol = NULL;
664 	g_reduce_errno = -1;
665 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
666 	CU_ASSERT(g_reduce_errno == 0);
667 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
668 	CU_ASSERT(strncmp(TEST_MD_PATH, g_path, strlen(TEST_MD_PATH)) == 0);
669 	memcpy(pmem_file_path, g_path, sizeof(pmem_file_path));
670 
671 	g_reduce_errno = -1;
672 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
673 	CU_ASSERT(g_reduce_errno == 0);
674 
675 	g_vol = NULL;
676 	memset(g_path, 0, sizeof(g_path));
677 	g_reduce_errno = -1;
678 	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
679 	CU_ASSERT(g_reduce_errno == 0);
680 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
681 	CU_ASSERT(strncmp(g_path, pmem_file_path, sizeof(pmem_file_path)) == 0);
682 	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
683 	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
684 	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);
685 
686 	g_reduce_errno = -1;
687 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
688 	CU_ASSERT(g_reduce_errno == 0);
689 
690 	persistent_pm_buf_destroy();
691 	backing_dev_destroy(&backing_dev);
692 }
693 
694 static void
695 load(void)
696 {
697 	_load(512);
698 	_load(4096);
699 }
700 
701 static uint64_t
702 _vol_get_chunk_map_index(struct spdk_reduce_vol *vol, uint64_t offset)
703 {
704 	uint64_t logical_map_index = offset / vol->logical_blocks_per_chunk;
705 
706 	return vol->pm_logical_map[logical_map_index];
707 }
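
/* Example with the parameters used throughout these tests: chunk_size == 16K
 * and logical_block_size == 512 give logical_blocks_per_chunk == 32, so
 * offset 0 resolves to logical map entry 0 (the first chunk) and offset 37
 * resolves to entry 1 (the second chunk), as exercised in _read_write() below.
 */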
708 
709 static void
710 write_cb(void *arg, int reduce_errno)
711 {
712 	g_reduce_errno = reduce_errno;
713 }
714 
715 static void
716 read_cb(void *arg, int reduce_errno)
717 {
718 	g_reduce_errno = reduce_errno;
719 }
720 
721 static void
722 _write_maps(uint32_t backing_blocklen)
723 {
724 	struct spdk_reduce_vol_params params = {};
725 	struct spdk_reduce_backing_dev backing_dev = {};
726 	struct iovec iov;
727 	const int bufsize = 16 * 1024; /* chunk size */
728 	char buf[bufsize];
729 	uint32_t num_lbas, i;
730 	uint64_t old_chunk0_map_index, new_chunk0_map_index;
731 	struct spdk_reduce_chunk_map *old_chunk0_map, *new_chunk0_map;
732 
733 	params.chunk_size = bufsize;
734 	params.backing_io_unit_size = 4096;
735 	params.logical_block_size = 512;
736 	num_lbas = bufsize / params.logical_block_size;
737 	spdk_uuid_generate(&params.uuid);
738 
739 	backing_dev_init(&backing_dev, &params, backing_blocklen);
740 
741 	g_vol = NULL;
742 	g_reduce_errno = -1;
743 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
744 	CU_ASSERT(g_reduce_errno == 0);
745 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
746 
747 	for (i = 0; i < g_vol->params.vol_size / g_vol->params.chunk_size; i++) {
748 		CU_ASSERT(_vol_get_chunk_map_index(g_vol, i) == REDUCE_EMPTY_MAP_ENTRY);
749 	}
750 
751 	ut_build_data_buffer(buf, bufsize, 0x00, 1);
752 	iov.iov_base = buf;
753 	iov.iov_len = bufsize;
754 	g_reduce_errno = -1;
755 	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, num_lbas, write_cb, NULL);
756 	CU_ASSERT(g_reduce_errno == 0);
757 
758 	old_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
759 	CU_ASSERT(old_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
760 	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == true);
761 
762 	old_chunk0_map = _reduce_vol_get_chunk_map(g_vol, old_chunk0_map_index);
763 	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
764 		CU_ASSERT(old_chunk0_map->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY);
765 		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
766 					     old_chunk0_map->io_unit_index[i]) == true);
767 	}
768 
769 	g_reduce_errno = -1;
770 	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, num_lbas, write_cb, NULL);
771 	CU_ASSERT(g_reduce_errno == 0);
772 
773 	new_chunk0_map_index = _vol_get_chunk_map_index(g_vol, 0);
774 	CU_ASSERT(new_chunk0_map_index != REDUCE_EMPTY_MAP_ENTRY);
775 	CU_ASSERT(new_chunk0_map_index != old_chunk0_map_index);
776 	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, new_chunk0_map_index) == true);
777 	CU_ASSERT(spdk_bit_array_get(g_vol->allocated_chunk_maps, old_chunk0_map_index) == false);
778 
779 	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
780 		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
781 					     old_chunk0_map->io_unit_index[i]) == false);
782 	}
783 
784 	new_chunk0_map = _reduce_vol_get_chunk_map(g_vol, new_chunk0_map_index);
785 	for (i = 0; i < g_vol->backing_io_units_per_chunk; i++) {
786 		CU_ASSERT(new_chunk0_map->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY);
787 		CU_ASSERT(spdk_bit_array_get(g_vol->allocated_backing_io_units,
788 					     new_chunk0_map->io_unit_index[i]) == true);
789 	}
790 
791 	g_reduce_errno = -1;
792 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
793 	CU_ASSERT(g_reduce_errno == 0);
794 
795 	g_vol = NULL;
796 	g_reduce_errno = -1;
797 	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
798 	CU_ASSERT(g_reduce_errno == 0);
799 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
800 	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
801 	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
802 	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);
803 
804 	g_reduce_errno = -1;
805 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
806 	CU_ASSERT(g_reduce_errno == 0);
807 
808 	persistent_pm_buf_destroy();
809 	backing_dev_destroy(&backing_dev);
810 }
811 
812 static void
813 write_maps(void)
814 {
815 	_write_maps(512);
816 	_write_maps(4096);
817 }
818 
819 static void
820 _read_write(uint32_t backing_blocklen)
821 {
822 	struct spdk_reduce_vol_params params = {};
823 	struct spdk_reduce_backing_dev backing_dev = {};
824 	struct iovec iov;
825 	char buf[16 * 1024]; /* chunk size */
826 	char compare_buf[16 * 1024];
827 	uint32_t i;
828 
829 	params.chunk_size = 16 * 1024;
830 	params.backing_io_unit_size = 4096;
831 	params.logical_block_size = 512;
832 	spdk_uuid_generate(&params.uuid);
833 
834 	backing_dev_init(&backing_dev, &params, backing_blocklen);
835 
836 	g_vol = NULL;
837 	g_reduce_errno = -1;
838 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
839 	CU_ASSERT(g_reduce_errno == 0);
840 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
841 
842 	/* Write 0xAA to 2 512-byte logical blocks, starting at LBA 2. */
843 	memset(buf, 0xAA, 2 * params.logical_block_size);
844 	iov.iov_base = buf;
845 	iov.iov_len = 2 * params.logical_block_size;
846 	g_reduce_errno = -1;
847 	spdk_reduce_vol_writev(g_vol, &iov, 1, 2, 2, write_cb, NULL);
848 	CU_ASSERT(g_reduce_errno == 0);
849 
850 	memset(compare_buf, 0xAA, sizeof(compare_buf));
851 	for (i = 0; i < params.chunk_size / params.logical_block_size; i++) {
852 		memset(buf, 0xFF, params.logical_block_size);
853 		iov.iov_base = buf;
854 		iov.iov_len = params.logical_block_size;
855 		g_reduce_errno = -1;
856 		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
857 		CU_ASSERT(g_reduce_errno == 0);
858 
859 		switch (i) {
860 		case 2:
861 		case 3:
862 			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
863 			break;
864 		default:
865 			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
866 			break;
867 		}
868 	}
869 
870 	g_reduce_errno = -1;
871 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
872 	CU_ASSERT(g_reduce_errno == 0);
873 
874 	/* Overwrite what we just wrote with 0xCC */
875 	g_vol = NULL;
876 	g_reduce_errno = -1;
877 	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
878 	CU_ASSERT(g_reduce_errno == 0);
879 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
880 	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
881 	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
882 	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);
883 
884 	memset(buf, 0xCC, 2 * params.logical_block_size);
885 	iov.iov_base = buf;
886 	iov.iov_len = 2 * params.logical_block_size;
887 	g_reduce_errno = -1;
888 	spdk_reduce_vol_writev(g_vol, &iov, 1, 2, 2, write_cb, NULL);
889 	CU_ASSERT(g_reduce_errno == 0);
890 
891 	memset(compare_buf, 0xCC, sizeof(compare_buf));
892 	for (i = 0; i < params.chunk_size / params.logical_block_size; i++) {
893 		memset(buf, 0xFF, params.logical_block_size);
894 		iov.iov_base = buf;
895 		iov.iov_len = params.logical_block_size;
896 		g_reduce_errno = -1;
897 		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
898 		CU_ASSERT(g_reduce_errno == 0);
899 
900 		switch (i) {
901 		case 2:
902 		case 3:
903 			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
904 			break;
905 		default:
906 			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
907 			break;
908 		}
909 	}
910 
911 	g_reduce_errno = -1;
912 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
913 	CU_ASSERT(g_reduce_errno == 0);
914 
915 	g_vol = NULL;
916 	g_reduce_errno = -1;
917 	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
918 	CU_ASSERT(g_reduce_errno == 0);
919 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
920 	CU_ASSERT(g_vol->params.vol_size == params.vol_size);
921 	CU_ASSERT(g_vol->params.chunk_size == params.chunk_size);
922 	CU_ASSERT(g_vol->params.backing_io_unit_size == params.backing_io_unit_size);
923 
924 	g_reduce_errno = -1;
925 
926 	/* Write 0xBB to 2 512-byte logical blocks, starting at LBA 37.
927 	 * This is writing into the second chunk of the volume.  This also
928 	 * enables implicitly checking that we reloaded the bit arrays
929 	 * correctly - making sure we don't use the first chunk map again
930 	 * for this new write - the first chunk map was already used by the
931 	 * write from before we unloaded and reloaded.
932 	 */
933 	memset(buf, 0xBB, 2 * params.logical_block_size);
934 	iov.iov_base = buf;
935 	iov.iov_len = 2 * params.logical_block_size;
936 	g_reduce_errno = -1;
937 	spdk_reduce_vol_writev(g_vol, &iov, 1, 37, 2, write_cb, NULL);
938 	CU_ASSERT(g_reduce_errno == 0);
939 
940 	for (i = 0; i < 2 * params.chunk_size / params.logical_block_size; i++) {
941 		memset(buf, 0xFF, params.logical_block_size);
942 		iov.iov_base = buf;
943 		iov.iov_len = params.logical_block_size;
944 		g_reduce_errno = -1;
945 		spdk_reduce_vol_readv(g_vol, &iov, 1, i, 1, read_cb, NULL);
946 		CU_ASSERT(g_reduce_errno == 0);
947 
948 		switch (i) {
949 		case 2:
950 		case 3:
951 			memset(compare_buf, 0xCC, sizeof(compare_buf));
952 			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
953 			break;
954 		case 37:
955 		case 38:
956 			memset(compare_buf, 0xBB, sizeof(compare_buf));
957 			CU_ASSERT(memcmp(buf, compare_buf, params.logical_block_size) == 0);
958 			break;
959 		default:
960 			CU_ASSERT(spdk_mem_all_zero(buf, params.logical_block_size));
961 			break;
962 		}
963 	}
964 
965 	g_reduce_errno = -1;
966 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
967 	CU_ASSERT(g_reduce_errno == 0);
968 
969 	persistent_pm_buf_destroy();
970 	backing_dev_destroy(&backing_dev);
971 }
972 
973 static void
974 read_write(void)
975 {
976 	_read_write(512);
977 	_read_write(4096);
978 }
979 
980 static void
981 _readv_writev(uint32_t backing_blocklen)
982 {
983 	struct spdk_reduce_vol_params params = {};
984 	struct spdk_reduce_backing_dev backing_dev = {};
985 	struct iovec iov[REDUCE_MAX_IOVECS + 1];
986 
987 	params.chunk_size = 16 * 1024;
988 	params.backing_io_unit_size = 4096;
989 	params.logical_block_size = 512;
990 	spdk_uuid_generate(&params.uuid);
991 
992 	backing_dev_init(&backing_dev, &params, backing_blocklen);
993 
994 	g_vol = NULL;
995 	g_reduce_errno = -1;
996 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
997 	CU_ASSERT(g_reduce_errno == 0);
998 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
999 
1000 	g_reduce_errno = -1;
1001 	spdk_reduce_vol_writev(g_vol, iov, REDUCE_MAX_IOVECS + 1, 2, REDUCE_MAX_IOVECS + 1, write_cb, NULL);
1002 	CU_ASSERT(g_reduce_errno == -EINVAL);
1003 
1004 	g_reduce_errno = -1;
1005 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
1006 	CU_ASSERT(g_reduce_errno == 0);
1007 
1008 	persistent_pm_buf_destroy();
1009 	backing_dev_destroy(&backing_dev);
1010 }
1011 
1012 static void
1013 readv_writev(void)
1014 {
1015 	_readv_writev(512);
1016 	_readv_writev(4096);
1017 }
1018 
1019 static void
1020 destroy_cb(void *ctx, int reduce_errno)
1021 {
1022 	g_reduce_errno = reduce_errno;
1023 }
1024 
1025 static void
1026 destroy(void)
1027 {
1028 	struct spdk_reduce_vol_params params = {};
1029 	struct spdk_reduce_backing_dev backing_dev = {};
1030 
1031 	params.chunk_size = 16 * 1024;
1032 	params.backing_io_unit_size = 512;
1033 	params.logical_block_size = 512;
1034 	spdk_uuid_generate(&params.uuid);
1035 
1036 	backing_dev_init(&backing_dev, &params, 512);
1037 
1038 	g_vol = NULL;
1039 	g_reduce_errno = -1;
1040 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
1041 	CU_ASSERT(g_reduce_errno == 0);
1042 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
1043 
1044 	g_reduce_errno = -1;
1045 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
1046 	CU_ASSERT(g_reduce_errno == 0);
1047 
1048 	g_vol = NULL;
1049 	g_reduce_errno = -1;
1050 	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
1051 	CU_ASSERT(g_reduce_errno == 0);
1052 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
1053 
1054 	g_reduce_errno = -1;
1055 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
1056 	CU_ASSERT(g_reduce_errno == 0);
1057 
1058 	g_reduce_errno = -1;
1059 	MOCK_CLEAR(spdk_malloc);
1060 	MOCK_CLEAR(spdk_zmalloc);
1061 	spdk_reduce_vol_destroy(&backing_dev, destroy_cb, NULL);
1062 	CU_ASSERT(g_reduce_errno == 0);
1063 
1064 	g_reduce_errno = 0;
1065 	spdk_reduce_vol_load(&backing_dev, load_cb, NULL);
1066 	CU_ASSERT(g_reduce_errno == -EILSEQ);
1067 
1068 	backing_dev_destroy(&backing_dev);
1069 }
1070 
1071 /* This test primarily checks that the reduce unit test infrastructure for asynchronous
1072  * backing device I/O operations is working correctly.
1073  */
1074 static void
1075 defer_bdev_io(void)
1076 {
1077 	struct spdk_reduce_vol_params params = {};
1078 	struct spdk_reduce_backing_dev backing_dev = {};
1079 	const uint32_t logical_block_size = 512;
1080 	struct iovec iov;
1081 	char buf[logical_block_size];
1082 	char compare_buf[logical_block_size];
1083 
1084 	params.chunk_size = 16 * 1024;
1085 	params.backing_io_unit_size = 4096;
1086 	params.logical_block_size = logical_block_size;
1087 	spdk_uuid_generate(&params.uuid);
1088 
1089 	backing_dev_init(&backing_dev, &params, 512);
1090 
1091 	g_vol = NULL;
1092 	g_reduce_errno = -1;
1093 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
1094 	CU_ASSERT(g_reduce_errno == 0);
1095 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
1096 
1097 	/* Write 0xAA to 1 512-byte logical block. */
1098 	memset(buf, 0xAA, params.logical_block_size);
1099 	iov.iov_base = buf;
1100 	iov.iov_len = params.logical_block_size;
1101 	g_reduce_errno = -100;
1102 	g_defer_bdev_io = true;
1103 	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
1104 	/* Callback should not have executed, so this should still equal -100. */
1105 	CU_ASSERT(g_reduce_errno == -100);
1106 	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
1107 	/* We wrote to just 512 bytes of one chunk which was previously unallocated.  This
1108 	 * should result in 1 pending I/O since the rest of this chunk will be zeroes and
1109 	 * very compressible.
1110 	 */
1111 	CU_ASSERT(g_pending_bdev_io_count == 1);
1112 
1113 	backing_dev_io_execute(0);
1114 	CU_ASSERT(TAILQ_EMPTY(&g_pending_bdev_io));
1115 	CU_ASSERT(g_reduce_errno == 0);
1116 
1117 	g_defer_bdev_io = false;
1118 	memset(compare_buf, 0xAA, sizeof(compare_buf));
1119 	memset(buf, 0xFF, sizeof(buf));
1120 	iov.iov_base = buf;
1121 	iov.iov_len = params.logical_block_size;
1122 	g_reduce_errno = -100;
1123 	spdk_reduce_vol_readv(g_vol, &iov, 1, 0, 1, read_cb, NULL);
1124 	CU_ASSERT(g_reduce_errno == 0);
1125 	CU_ASSERT(memcmp(buf, compare_buf, sizeof(buf)) == 0);
1126 
1127 	g_reduce_errno = -1;
1128 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
1129 	CU_ASSERT(g_reduce_errno == 0);
1130 
1131 	persistent_pm_buf_destroy();
1132 	backing_dev_destroy(&backing_dev);
1133 }
1134 
1135 static void
1136 overlapped(void)
1137 {
1138 	struct spdk_reduce_vol_params params = {};
1139 	struct spdk_reduce_backing_dev backing_dev = {};
1140 	const uint32_t logical_block_size = 512;
1141 	struct iovec iov;
1142 	char buf[2 * logical_block_size];
1143 	char compare_buf[2 * logical_block_size];
1144 
1145 	params.chunk_size = 16 * 1024;
1146 	params.backing_io_unit_size = 4096;
1147 	params.logical_block_size = logical_block_size;
1148 	spdk_uuid_generate(&params.uuid);
1149 
1150 	backing_dev_init(&backing_dev, &params, 512);
1151 
1152 	g_vol = NULL;
1153 	g_reduce_errno = -1;
1154 	spdk_reduce_vol_init(&params, &backing_dev, TEST_MD_PATH, init_cb, NULL);
1155 	CU_ASSERT(g_reduce_errno == 0);
1156 	SPDK_CU_ASSERT_FATAL(g_vol != NULL);
1157 
1158 	/* Write 0xAA to 1 512-byte logical block. */
1159 	memset(buf, 0xAA, logical_block_size);
1160 	iov.iov_base = buf;
1161 	iov.iov_len = logical_block_size;
1162 	g_reduce_errno = -100;
1163 	g_defer_bdev_io = true;
1164 	spdk_reduce_vol_writev(g_vol, &iov, 1, 0, 1, write_cb, NULL);
1165 	/* Callback should not have executed, so this should still equal -100. */
1166 	CU_ASSERT(g_reduce_errno == -100);
1167 	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
1168 	/* We wrote to just 512 bytes of one chunk which was previously unallocated.  This
1169 	 * should result in 1 pending I/O since the rest of this chunk will be zeroes and
1170 	 * very compressible.
1171 	 */
1172 	CU_ASSERT(g_pending_bdev_io_count == 1);
1173 
1174 	/* Now do an overlapped I/O to the same chunk. */
1175 	spdk_reduce_vol_writev(g_vol, &iov, 1, 1, 1, write_cb, NULL);
1176 	/* Callback should not have executed, so this should still equal -100. */
1177 	CU_ASSERT(g_reduce_errno == -100);
1178 	CU_ASSERT(!TAILQ_EMPTY(&g_pending_bdev_io));
1179 	/* The second I/O overlaps with the first one.  So we should only see pending bdev_io
1180 	 * related to the first I/O here - the second one won't start until the first one is completed.
1181 	 */
1182 	CU_ASSERT(g_pending_bdev_io_count == 1);
1183 
1184 	backing_dev_io_execute(0);
1185 	CU_ASSERT(g_reduce_errno == 0);
1186 
1187 	g_defer_bdev_io = false;
1188 	memset(compare_buf, 0xAA, sizeof(compare_buf));
1189 	memset(buf, 0xFF, sizeof(buf));
1190 	iov.iov_base = buf;
1191 	iov.iov_len = 2 * logical_block_size;
1192 	g_reduce_errno = -100;
1193 	spdk_reduce_vol_readv(g_vol, &iov, 1, 0, 2, read_cb, NULL);
1194 	CU_ASSERT(g_reduce_errno == 0);
1195 	CU_ASSERT(memcmp(buf, compare_buf, 2 * logical_block_size) == 0);
1196 
1197 	g_reduce_errno = -1;
1198 	spdk_reduce_vol_unload(g_vol, unload_cb, NULL);
1199 	CU_ASSERT(g_reduce_errno == 0);
1200 
1201 	persistent_pm_buf_destroy();
1202 	backing_dev_destroy(&backing_dev);
1203 }
1204 
1205 #define BUFSIZE 4096
1206 
1207 static void
1208 compress_algorithm(void)
1209 {
1210 	uint8_t original_data[BUFSIZE];
1211 	uint8_t compressed_data[BUFSIZE];
1212 	uint8_t decompressed_data[BUFSIZE];
1213 	uint32_t compressed_len, decompressed_len;
1214 	int rc;
1215 
1216 	ut_build_data_buffer(original_data, BUFSIZE, 0xAA, BUFSIZE);
1217 	compressed_len = sizeof(compressed_data);
1218 	rc = ut_compress(compressed_data, &compressed_len, original_data, UINT8_MAX);
1219 	CU_ASSERT(rc == 0);
1220 	CU_ASSERT(compressed_len == 2);
1221 	CU_ASSERT(compressed_data[0] == UINT8_MAX);
1222 	CU_ASSERT(compressed_data[1] == 0xAA);
1223 
1224 	decompressed_len = sizeof(decompressed_data);
1225 	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
1226 	CU_ASSERT(rc == 0);
1227 	CU_ASSERT(decompressed_len == UINT8_MAX);
1228 	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);
1229 
1230 	compressed_len = sizeof(compressed_data);
1231 	rc = ut_compress(compressed_data, &compressed_len, original_data, UINT8_MAX + 1);
1232 	CU_ASSERT(rc == 0);
1233 	CU_ASSERT(compressed_len == 4);
1234 	CU_ASSERT(compressed_data[0] == UINT8_MAX);
1235 	CU_ASSERT(compressed_data[1] == 0xAA);
1236 	CU_ASSERT(compressed_data[2] == 1);
1237 	CU_ASSERT(compressed_data[3] == 0xAA);
1238 
1239 	decompressed_len = sizeof(decompressed_data);
1240 	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
1241 	CU_ASSERT(rc == 0);
1242 	CU_ASSERT(decompressed_len == UINT8_MAX + 1);
1243 	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);
1244 
1245 	ut_build_data_buffer(original_data, BUFSIZE, 0x00, 1);
1246 	compressed_len = sizeof(compressed_data);
1247 	rc = ut_compress(compressed_data, &compressed_len, original_data, 2048);
1248 	CU_ASSERT(rc == 0);
1249 	CU_ASSERT(compressed_len == 4096);
1250 	CU_ASSERT(compressed_data[0] == 1);
1251 	CU_ASSERT(compressed_data[1] == 0);
1252 	CU_ASSERT(compressed_data[4094] == 1);
1253 	CU_ASSERT(compressed_data[4095] == 0xFF);
1254 
1255 	decompressed_len = sizeof(decompressed_data);
1256 	rc = ut_decompress(decompressed_data, &decompressed_len, compressed_data, compressed_len);
1257 	CU_ASSERT(rc == 0);
1258 	CU_ASSERT(decompressed_len == 2048);
1259 	CU_ASSERT(memcmp(original_data, decompressed_data, decompressed_len) == 0);
1260 
1261 	compressed_len = sizeof(compressed_data);
1262 	rc = ut_compress(compressed_data, &compressed_len, original_data, 2049);
1263 	CU_ASSERT(rc == -ENOSPC);
1264 }
1265 
1266 static void
1267 test_prepare_compress_chunk(void)
1268 {
1269 	struct spdk_reduce_vol vol = {};
1270 	struct spdk_reduce_backing_dev backing_dev = {};
1271 	struct spdk_reduce_vol_request req = {};
1272 	void *buf;
1273 	char *buffer_end, *aligned_user_buffer, *unaligned_user_buffer;
1274 	char decomp_buffer[16 * 1024] = {};
1275 	char comp_buffer[16 * 1024] = {};
1276 	struct iovec user_iov[2] = {};
1277 	size_t user_buffer_iov_len = 8192;
1278 	size_t remainder_bytes;
1279 	size_t offset_bytes;
1280 	size_t memcmp_offset;
1281 	uint32_t i;
1282 
1283 	vol.params.chunk_size = 16 * 1024;
1284 	vol.params.backing_io_unit_size = 4096;
1285 	vol.params.logical_block_size = 512;
1286 	backing_dev_init(&backing_dev, &vol.params, 512);
1287 	vol.backing_dev = &backing_dev;
1288 	vol.logical_blocks_per_chunk = vol.params.chunk_size / vol.params.logical_block_size;
1289 
1290 	/* Allocate 1 extra byte to test the case where a buffer crosses a huge page boundary */
1291 	SPDK_CU_ASSERT_FATAL(posix_memalign(&buf, VALUE_2MB, VALUE_2MB + 1) == 0);
1292 	buffer_end = (char *)buf + VALUE_2MB + 1;
1293 	aligned_user_buffer = (char *)buf;
1294 	memset(aligned_user_buffer, 0xc, vol.params.chunk_size);
1295 	unaligned_user_buffer = buffer_end - vol.params.chunk_size;
1296 	memset(unaligned_user_buffer, 0xc, vol.params.chunk_size);
1297 
1298 	req.vol = &vol;
1299 	req.decomp_buf = decomp_buffer;
1300 	req.comp_buf = comp_buffer;
1301 	req.iov = user_iov;
1302 	req.iovcnt = 2;
1303 	req.offset = 0;
1304 
1305 	/* Part 1 - backing dev supports sgl_in */
1306 	/* Test 1 - total length of user's buffers equals chunk_size */
1307 	for (i = 0; i < 2; i++) {
1308 		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
1309 		req.iov[i].iov_len = user_buffer_iov_len;
1310 	}
1311 
1312 	_prepare_compress_chunk(&req, false);
1313 	CU_ASSERT(req.decomp_iovcnt == 2);
1314 	for (i = 0; i < 2; i++) {
1315 		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
1316 		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
1317 	}
1318 
1319 	_prepare_compress_chunk(&req, true);
1320 	CU_ASSERT(req.decomp_iovcnt == 2);
1321 	for (i = 0; i < 2; i++) {
1322 		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
1323 		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
1324 	}
1325 
1326 	/* Test 2 - user's buffers total less than chunk_size, zero offset */
1327 	user_buffer_iov_len = 4096;
1328 	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
1329 	for (i = 0; i < 2; i++) {
1330 		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
1331 		req.iov[i].iov_len = user_buffer_iov_len;
1332 	}
1333 
1334 	_prepare_compress_chunk(&req, false);
1335 	CU_ASSERT(req.decomp_iovcnt == 3);
1336 	for (i = 0; i < 2; i++) {
1337 		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
1338 		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
1339 	}
1340 	CU_ASSERT(req.decomp_iov[i].iov_base == req.decomp_buf + user_buffer_iov_len * 2);
1341 	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);
1342 
1343 	_prepare_compress_chunk(&req, true);
1344 	CU_ASSERT(req.decomp_iovcnt == 3);
1345 	for (i = 0; i < 2; i++) {
1346 		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
1347 		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
1348 	}
1349 	CU_ASSERT(req.decomp_iov[i].iov_base == g_zero_buf + user_buffer_iov_len * 2);
1350 	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);
1351 
1352 	/* Test 3 - user's buffers total less than chunk_size, non-zero offset */
1353 	user_buffer_iov_len = 4096;
1354 	req.offset = 3;
1355 	offset_bytes = req.offset * vol.params.logical_block_size;
1356 	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;
1357 
1358 	_prepare_compress_chunk(&req, false);
1359 	CU_ASSERT(req.decomp_iovcnt == 4);
1360 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1361 	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
1362 	for (i = 0; i < 2; i++) {
1363 		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
1364 		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
1365 	}
1366 	CU_ASSERT(req.decomp_iov[3].iov_base == req.decomp_buf + offset_bytes + user_buffer_iov_len * 2);
1367 	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);
1368 
1369 	_prepare_compress_chunk(&req, true);
1370 	CU_ASSERT(req.decomp_iovcnt == 4);
1371 	CU_ASSERT(req.decomp_iov[0].iov_base == g_zero_buf);
1372 	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
1373 	for (i = 0; i < 2; i++) {
1374 		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
1375 		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
1376 	}
1377 	CU_ASSERT(req.decomp_iov[3].iov_base == g_zero_buf + offset_bytes + user_buffer_iov_len * 2);
1378 	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);
1379 
1380 	/* Part 2 - backing dev doesn't support sgl_in */
1381 	/* Test 1 - total length of user's buffers equals chunk_size;
1382 	 * user's buffers are copied */
1383 	vol.backing_dev->sgl_in = false;
1384 	req.offset = 0;
1385 	user_buffer_iov_len = 8192;
1386 	for (i = 0; i < 2; i++) {
1387 		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
1388 		req.iov[i].iov_len = user_buffer_iov_len;
1389 		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
1390 	}
1391 
1392 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1393 
1394 	_prepare_compress_chunk(&req, false);
1395 	CU_ASSERT(req.decomp_iovcnt == 1);
1396 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1397 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1398 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base, req.iov[0].iov_len) == 0);
1399 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + req.iov[0].iov_len, req.iov[1].iov_base,
1400 			 req.iov[1].iov_len) == 0);
1401 
1402 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1403 
1404 	_prepare_compress_chunk(&req, true);
1405 	CU_ASSERT(req.decomp_iovcnt == 1);
1406 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1407 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1408 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base, req.iov[0].iov_len) == 0);
1409 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + req.iov[0].iov_len, req.iov[1].iov_base,
1410 			 req.iov[1].iov_len) == 0);
1411 
1412 	/* Test 2 - single user buffer whose length equals chunk_size but is not aligned;
1413 	 * user's buffer is copied */
1414 	req.iov[0].iov_base = unaligned_user_buffer;
1415 	req.iov[0].iov_len = vol.params.chunk_size;
1416 	req.iovcnt = 1;
1417 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1418 
1419 	_prepare_compress_chunk(&req, false);
1420 	CU_ASSERT(req.decomp_iovcnt == 1);
1421 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1422 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1423 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base,
1424 			 req.iov[0].iov_len) == 0);
1425 
1426 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1427 
1428 	_prepare_compress_chunk(&req, true);
1429 	CU_ASSERT(req.decomp_iovcnt == 1);
1430 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1431 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1432 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base, req.iov[0].iov_base,
1433 			 req.iov[0].iov_len) == 0);
1434 
1435 	/* Test 3 - single aligned user buffer whose length equals chunk_size;
1436 	 * user's buffer is not copied */
1437 	req.iov[0].iov_base = aligned_user_buffer;
1438 	req.iov[0].iov_len = vol.params.chunk_size;
1439 	req.iovcnt = 1;
1440 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1441 
1442 	_prepare_compress_chunk(&req, false);
1443 	CU_ASSERT(req.decomp_iovcnt == 1);
1444 	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
1445 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1446 
1447 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1448 
1449 	_prepare_compress_chunk(&req, true);
1450 	CU_ASSERT(req.decomp_iovcnt == 1);
1451 	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
1452 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1453 
1454 	/* Test 4 - user's buffers total less than chunk_size, zero offset;
1455 	 * user's buffers are copied */
1456 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1457 	user_buffer_iov_len = 4096;
1458 	req.iovcnt = 2;
1459 	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
1460 	for (i = 0; i < 2; i++) {
1461 		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
1462 		req.iov[i].iov_len = user_buffer_iov_len;
1463 	}
1464 
1465 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1466 
1467 	_prepare_compress_chunk(&req, false);
1468 	CU_ASSERT(req.decomp_iovcnt == 1);
1469 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1470 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1471 	memcmp_offset = 0;
1472 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
1473 			 req.iov[0].iov_len) == 0);
1474 	memcmp_offset += req.iov[0].iov_len;
1475 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
1476 			 req.iov[1].iov_len) == 0);
1477 	memcmp_offset += req.iov[1].iov_len;
1478 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf + memcmp_offset,
1479 			 remainder_bytes) == 0);
1480 
1481 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1482 
1483 	_prepare_compress_chunk(&req, true);
1484 	CU_ASSERT(req.decomp_iovcnt == 1);
1485 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1486 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1487 	memcmp_offset = 0;
1488 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
1489 			 req.iov[0].iov_len) == 0);
1490 	memcmp_offset += req.iov[0].iov_len;
1491 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
1492 			 req.iov[1].iov_len) == 0);
1493 	memcmp_offset += req.iov[1].iov_len;
1494 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf + memcmp_offset,
1495 			 remainder_bytes) == 0);
1496 
1497 	/* Test 5 - user's buffers total less than chunk_size, non-zero offset;
1498 	 * user's buffers are copied */
1499 	req.offset = 3;
1500 	offset_bytes = req.offset * vol.params.logical_block_size;
1501 	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;
1502 
1503 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1504 
1505 	_prepare_compress_chunk(&req, false);
1506 	CU_ASSERT(req.decomp_iovcnt == 1);
1507 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1508 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1509 	memcmp_offset = 0;
1510 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf, offset_bytes) == 0);
1511 	memcmp_offset += offset_bytes;
1512 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
1513 			 req.iov[0].iov_len) == 0);
1514 	memcmp_offset += req.iov[0].iov_len;
1515 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
1516 			 req.iov[1].iov_len) == 0);
1517 	memcmp_offset += req.iov[1].iov_len;
1518 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.decomp_buf + memcmp_offset,
1519 			 remainder_bytes) == 0);
1520 
1521 	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
1522 
1523 	_prepare_compress_chunk(&req, true);
1524 	CU_ASSERT(req.decomp_iovcnt == 1);
1525 	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
1526 	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
1527 	memcmp_offset = 0;
1528 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf, offset_bytes) == 0);
1529 	memcmp_offset += offset_bytes;
1530 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[0].iov_base,
1531 			 req.iov[0].iov_len) == 0);
1532 	memcmp_offset += req.iov[0].iov_len;
1533 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, req.iov[1].iov_base,
1534 			 req.iov[1].iov_len) == 0);
1535 	memcmp_offset += req.iov[1].iov_len;
1536 	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + memcmp_offset, g_zero_buf + memcmp_offset,
1537 			 remainder_bytes) == 0);
1538 	backing_dev_destroy(&backing_dev);
1539 	free(buf);
1540 }
1541 
1542 static void
1543 _reduce_vol_op_complete(void *ctx, int reduce_errno)
1544 {
1545 	g_reduce_errno = reduce_errno;
1546 }
1547 
1548 static void
1549 dummy_backing_dev_decompress(struct spdk_reduce_backing_dev *backing_dev,
1550 			     struct iovec *src_iov, int src_iovcnt,
1551 			     struct iovec *dst_iov, int dst_iovcnt,
1552 			     struct spdk_reduce_vol_cb_args *args)
1553 {
1554 	args->output_size = g_decompressed_len;
1555 	args->cb_fn(args->cb_arg, 0);
1556 }
1557 static void
1558 test_reduce_decompress_chunk(void)
1559 {
1560 	struct spdk_reduce_vol vol = {};
1561 	struct spdk_reduce_backing_dev backing_dev = {};
1562 	struct spdk_reduce_vol_request req = {};
1563 	void *buf;
1564 	char *buffer_end, *aligned_user_buffer, *unaligned_user_buffer;
1565 	char decomp_buffer[16 * 1024] = {};
1566 	char comp_buffer[16 * 1024] = {};
1567 	struct iovec user_iov[2] = {};
1568 	struct iovec comp_buf_iov = {};
1569 	struct spdk_reduce_chunk_map chunk = {};
1570 	size_t user_buffer_iov_len = 8192;
1571 	size_t remainder_bytes;
1572 	size_t offset_bytes;
1573 	uint32_t i;
1574 
1575 	vol.params.chunk_size = 16 * 1024;
1576 	vol.params.backing_io_unit_size = 4096;
1577 	vol.params.logical_block_size = 512;
1578 	backing_dev_init(&backing_dev, &vol.params, 512);
1579 	backing_dev.decompress = dummy_backing_dev_decompress;
1580 	vol.backing_dev = &backing_dev;
1581 	vol.logical_blocks_per_chunk = vol.params.chunk_size / vol.params.logical_block_size;
1582 	TAILQ_INIT(&vol.executing_requests);
1583 	TAILQ_INIT(&vol.queued_requests);
1584 	TAILQ_INIT(&vol.free_requests);
1585 
1586 	/* Allocate 1 extra byte to test the case where a buffer crosses a huge page boundary */
1587 	SPDK_CU_ASSERT_FATAL(posix_memalign(&buf, VALUE_2MB, VALUE_2MB + 1) == 0);
1588 	buffer_end = (char *)buf + VALUE_2MB + 1;
1589 	aligned_user_buffer = (char *)buf;
1590 	unaligned_user_buffer = buffer_end - vol.params.chunk_size;
1591 
1592 	chunk.compressed_size = user_buffer_iov_len / 2;
1593 	req.chunk = &chunk;
1594 	req.vol = &vol;
1595 	req.decomp_buf = decomp_buffer;
1596 	req.comp_buf = comp_buffer;
1597 	req.comp_buf_iov = &comp_buf_iov;
1598 	req.iov = user_iov;
1599 	req.iovcnt = 2;
1600 	req.offset = 0;
1601 	req.cb_fn = _reduce_vol_op_complete;
1602 
1603 	/* Part 1 - backing dev supports sgl_out */
1604 	/* Test 1 - total length of user's buffers equals chunk_size */
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0, req.iov[i].iov_len);
	}
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;
	g_decompressed_len = vol.params.chunk_size;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 2);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 2 - the user's buffers cover less than chunk_size, no offset */
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;
	user_buffer_iov_len = 4096;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0, req.iov[i].iov_len);
	}
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 3);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[i].iov_base == req.decomp_buf + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[i].iov_len == remainder_bytes);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 3 - the user's buffers cover less than chunk_size, non-zero offset */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 4);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == offset_bytes);
	for (i = 0; i < 2; i++) {
		CU_ASSERT(req.decomp_iov[i + 1].iov_base == req.iov[i].iov_base);
		CU_ASSERT(req.decomp_iov[i + 1].iov_len == req.iov[i].iov_len);
	}
	CU_ASSERT(req.decomp_iov[3].iov_base == req.decomp_buf + offset_bytes + user_buffer_iov_len * 2);
	CU_ASSERT(req.decomp_iov[3].iov_len == remainder_bytes);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Part 2 - backing dev doesn't support sgl_out */
	/* Test 1 - the total length of the user's buffers equals chunk_size,
	 * so the user's buffers are copied */
	vol.backing_dev->sgl_out = false;
	req.offset = 0;
	user_buffer_iov_len = 8192;

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base, req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.iov[1].iov_base, req.decomp_iov[0].iov_base + req.iov[0].iov_len,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 2 - a single user buffer whose length equals chunk_size but is not
	 * aligned (it crosses a 2MB page boundary); the user's buffer is copied */
	memset(unaligned_user_buffer, 0xc, vol.params.chunk_size);
	req.iov[0].iov_base = unaligned_user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base,
			 req.iov[0].iov_len) == 0);

	/* Test 3 - a single aligned user buffer whose length equals chunk_size;
	 * the user's buffer is not copied */
	req.iov[0].iov_base = aligned_user_buffer;
	req.iov[0].iov_len = vol.params.chunk_size;
	req.iovcnt = 1;
	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == false);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.iov[0].iov_base);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);

	/* Test 4 - the user's buffers cover less than chunk_size, no offset;
	 * the user's buffers are copied */
	user_buffer_iov_len = 4096;
	req.iovcnt = 2;
	remainder_bytes = vol.params.chunk_size - user_buffer_iov_len * 2;
	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.iov[0].iov_base, req.decomp_iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.iov[1].iov_base, req.decomp_iov[0].iov_base + req.iov[0].iov_len,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	/* Test 5 - the user's buffers cover less than chunk_size, non-zero offset;
	 * the user's buffers are copied */
	req.offset = 3;
	offset_bytes = req.offset * vol.params.logical_block_size;
	remainder_bytes = vol.params.chunk_size - offset_bytes - user_buffer_iov_len * 2;

	for (i = 0; i < 2; i++) {
		req.iov[i].iov_base = aligned_user_buffer + i * user_buffer_iov_len;
		req.iov[i].iov_len = user_buffer_iov_len;
		memset(req.iov[i].iov_base, 0xb + i, req.iov[i].iov_len);
	}

	memset(req.decomp_buf, 0xa, vol.params.chunk_size);
	TAILQ_INSERT_HEAD(&vol.executing_requests, &req, tailq);
	g_reduce_errno = -1;

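	/* The stub decompress writes no data, so copy the user's data into
	 * decomp_buf first via _prepare_compress_chunk(); this is what the
	 * copy-after-decompress path copies back out for the memcmp checks below.
	 */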
	_prepare_compress_chunk(&req, false);
	_reduce_vol_decompress_chunk(&req, _read_decompress_done);
	CU_ASSERT(g_reduce_errno == 0);
	CU_ASSERT(req.copy_after_decompress == true);
	CU_ASSERT(req.decomp_iovcnt == 1);
	CU_ASSERT(req.decomp_iov[0].iov_base == req.decomp_buf);
	CU_ASSERT(req.decomp_iov[0].iov_len == vol.params.chunk_size);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + offset_bytes, req.iov[0].iov_base,
			 req.iov[0].iov_len) == 0);
	CU_ASSERT(memcmp(req.decomp_iov[0].iov_base + offset_bytes + req.iov[0].iov_len,
			 req.iov[1].iov_base,
			 req.iov[1].iov_len) == 0);
	CU_ASSERT(TAILQ_EMPTY(&vol.executing_requests));
	CU_ASSERT(TAILQ_FIRST(&vol.free_requests) == &req);

	backing_dev_destroy(&backing_dev);
	free(buf);
}

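/* Verify that volume parameter validation and request allocation succeed for
 * both power-of-2 and non-power-of-2 chunk and io unit sizes.
 */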
static void
test_allocate_vol_requests(void)
{
	struct spdk_reduce_vol *vol;
	/* include chunk_sizes which are not powers of 2 */
	uint32_t chunk_sizes[] = {8192, 8320, 16384, 16416, 32768};
	uint32_t io_unit_sizes[] = {512, 520, 4096, 4104, 4096};
	uint32_t i;

	for (i = 0; i < SPDK_COUNTOF(chunk_sizes); i++) {
		vol = calloc(1, sizeof(*vol));
		SPDK_CU_ASSERT_FATAL(vol);

		vol->params.chunk_size = chunk_sizes[i];
		vol->params.logical_block_size = io_unit_sizes[i];
		vol->params.backing_io_unit_size = io_unit_sizes[i];
		vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
		vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;

		CU_ASSERT(_validate_vol_params(&vol->params) == 0);
		CU_ASSERT(_allocate_vol_requests(vol) == 0);
		_init_load_cleanup(vol, NULL);
	}
}

int
main(int argc, char **argv)
{
	CU_pSuite	suite = NULL;
	unsigned int	num_failures;

	CU_initialize_registry();

	suite = CU_add_suite("reduce", NULL, NULL);

	CU_ADD_TEST(suite, get_pm_file_size);
	CU_ADD_TEST(suite, get_vol_size);
	CU_ADD_TEST(suite, init_failure);
	CU_ADD_TEST(suite, init_md);
	CU_ADD_TEST(suite, init_backing_dev);
	CU_ADD_TEST(suite, load);
	CU_ADD_TEST(suite, write_maps);
	CU_ADD_TEST(suite, read_write);
	CU_ADD_TEST(suite, readv_writev);
	CU_ADD_TEST(suite, destroy);
	CU_ADD_TEST(suite, defer_bdev_io);
	CU_ADD_TEST(suite, overlapped);
	CU_ADD_TEST(suite, compress_algorithm);
	CU_ADD_TEST(suite, test_prepare_compress_chunk);
	CU_ADD_TEST(suite, test_reduce_decompress_chunk);
	CU_ADD_TEST(suite, test_allocate_vol_requests);

	g_unlink_path = g_path;
	g_unlink_callback = unlink_cb;

	num_failures = spdk_ut_run_tests(argc, argv, NULL);
	CU_cleanup_registry();
	return num_failures;
}