xref: /spdk/lib/reduce/reduce.c (revision 13a58c41ad5c01eefaf68f049b6c7da2b9d19426)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/env.h"
38 #include "spdk/string.h"
39 #include "spdk/bit_array.h"
40 #include "spdk/util.h"
41 #include "spdk_internal/log.h"
42 
43 #include "libpmem.h"
44 
/* Always round up the size of the PM region to the nearest cacheline. */
#define REDUCE_PM_SIZE_ALIGNMENT	64

/* Offset into the backing device where the persistent memory file's path is stored. */
#define REDUCE_BACKING_DEV_PATH_OFFSET	4096

/*
 * Sentinel (all 64 bits set) marking an unused entry in the logical map and
 *  in the chunk maps.  The expansion is parenthesized so that it always
 *  behaves as a single operand wherever the macro is used.
 */
#define REDUCE_EMPTY_MAP_ENTRY	(-1ULL)

/* Number of request structures (and chunk-sized buffers) allocated per volume. */
#define REDUCE_NUM_VOL_REQUESTS	256
54 
55 /* Structure written to offset 0 of both the pm file and the backing device. */
56 struct spdk_reduce_vol_superblock {
57 	uint8_t				signature[8];
58 	struct spdk_reduce_vol_params	params;
59 	uint8_t				reserved[4048];
60 };
61 SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
62 
63 #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
64 /* null terminator counts one */
65 SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
66 		   sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect");
67 
68 #define REDUCE_PATH_MAX 4096
69 
70 /**
71  * Describes a persistent memory file used to hold metadata associated with a
72  *  compressed volume.
73  */
74 struct spdk_reduce_pm_file {
75 	char			path[REDUCE_PATH_MAX];
76 	void			*pm_buf;
77 	int			pm_is_pmem;
78 	uint64_t		size;
79 };
80 
81 struct spdk_reduce_vol_request {
82 	/**
83 	 *  Scratch buffer used for read/modify/write operations on
84 	 *  I/Os less than a full chunk size, and as the intermediate
85 	 *  buffer for compress/decompress operations.
86 	 */
87 	uint8_t					*buf;
88 	struct iovec				*buf_iov;
89 	struct iovec				*iov;
90 	struct spdk_reduce_vol			*vol;
91 	int					reduce_errno;
92 	int					iovcnt;
93 	int					num_backing_ops;
94 	uint64_t				offset;
95 	uint64_t				length;
96 	uint64_t				chunk_map_index;
97 	uint64_t				*chunk;
98 	spdk_reduce_vol_op_complete		cb_fn;
99 	void					*cb_arg;
100 	TAILQ_ENTRY(spdk_reduce_vol_request)	tailq;
101 	struct spdk_reduce_vol_cb_args		backing_cb_args;
102 };
103 
104 struct spdk_reduce_vol {
105 	struct spdk_reduce_vol_params		params;
106 	uint32_t				backing_io_units_per_chunk;
107 	uint32_t				backing_lba_per_io_unit;
108 	uint32_t				logical_blocks_per_chunk;
109 	struct spdk_reduce_pm_file		pm_file;
110 	struct spdk_reduce_backing_dev		*backing_dev;
111 	struct spdk_reduce_vol_superblock	*backing_super;
112 	struct spdk_reduce_vol_superblock	*pm_super;
113 	uint64_t				*pm_logical_map;
114 	uint64_t				*pm_chunk_maps;
115 
116 	struct spdk_bit_array			*allocated_chunk_maps;
117 	struct spdk_bit_array			*allocated_backing_io_units;
118 
119 	struct spdk_reduce_vol_request		*request_mem;
120 	TAILQ_HEAD(, spdk_reduce_vol_request)	free_requests;
121 
122 	/* Single contiguous buffer used for all request buffers for this volume. */
123 	uint8_t					*reqbufspace;
124 	struct iovec				*buf_iov_mem;
125 };
126 
/*
 * Reduce never writes data in place, so an in-flight write needs a chunk map
 *  and backing io units of its own even when the logical map is already
 *  completely allocated and nothing compresses.  This many extra chunks
 *  (metadata plus the corresponding backing io units) are set aside to
 *  cover outstanding I/O in that worst case.
 */
#define REDUCE_NUM_EXTRA_CHUNKS 128
134 
135 static void
136 _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
137 {
138 	if (vol->pm_file.pm_is_pmem) {
139 		pmem_persist(addr, len);
140 	} else {
141 		pmem_msync(addr, len);
142 	}
143 }
144 
145 static uint64_t
146 _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
147 {
148 	uint64_t chunks_in_logical_map, logical_map_size;
149 
150 	chunks_in_logical_map = vol_size / chunk_size;
151 	logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
152 
153 	/* Round up to next cacheline. */
154 	return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
155 	       REDUCE_PM_SIZE_ALIGNMENT;
156 }
157 
158 static uint64_t
159 _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
160 {
161 	uint64_t num_chunks;
162 
163 	num_chunks = vol_size / chunk_size;
164 	num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
165 
166 	return num_chunks;
167 }
168 
169 static uint64_t
170 _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
171 {
172 	uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
173 
174 	num_chunks = _get_total_chunks(vol_size, chunk_size);
175 	io_units_per_chunk = chunk_size / backing_io_unit_size;
176 	total_chunks_size = num_chunks * io_units_per_chunk * sizeof(uint64_t);
177 
178 	return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
179 	       REDUCE_PM_SIZE_ALIGNMENT;
180 }
181 
182 static uint64_t *
183 _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
184 {
185 	assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
186 
187 	return vol->pm_chunk_maps + (chunk_map_index * vol->backing_io_units_per_chunk);
188 }
189 
190 static int
191 _validate_vol_params(struct spdk_reduce_vol_params *params)
192 {
193 	if (params->vol_size > 0) {
194 		/**
195 		 * User does not pass in the vol size - it gets calculated by libreduce from
196 		 *  values in this structure plus the size of the backing device.
197 		 */
198 		return -EINVAL;
199 	}
200 
201 	if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
202 	    params->logical_block_size == 0) {
203 		return -EINVAL;
204 	}
205 
206 	/* Chunk size must be an even multiple of the backing io unit size. */
207 	if ((params->chunk_size % params->backing_io_unit_size) != 0) {
208 		return -EINVAL;
209 	}
210 
211 	/* Chunk size must be an even multiple of the logical block size. */
212 	if ((params->chunk_size % params->logical_block_size) != 0) {
213 		return -1;
214 	}
215 
216 	return 0;
217 }
218 
219 static uint64_t
220 _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
221 {
222 	uint64_t num_chunks;
223 
224 	num_chunks = backing_dev_size / chunk_size;
225 	if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
226 		return 0;
227 	}
228 
229 	num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
230 	return num_chunks * chunk_size;
231 }
232 
233 static uint64_t
234 _get_pm_file_size(struct spdk_reduce_vol_params *params)
235 {
236 	uint64_t total_pm_size;
237 
238 	total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
239 	total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
240 	total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
241 			 params->backing_io_unit_size);
242 	return total_pm_size;
243 }
244 
245 const struct spdk_uuid *
246 spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
247 {
248 	return &vol->params.uuid;
249 }
250 
251 static void
252 _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
253 {
254 	/* Superblock is at the beginning of the pm file. */
255 	vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
256 
257 	/* Logical map immediately follows the super block. */
258 	vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
259 
260 	/* Chunks maps follow the logical map. */
261 	vol->pm_chunk_maps = vol->pm_logical_map + (vol->params.vol_size / vol->params.chunk_size);
262 }
263 
264 /* We need 2 iovs during load - one for the superblock, another for the path */
265 #define LOAD_IOV_COUNT	2
266 
267 struct reduce_init_load_ctx {
268 	struct spdk_reduce_vol			*vol;
269 	struct spdk_reduce_vol_cb_args		backing_cb_args;
270 	spdk_reduce_vol_op_with_handle_complete	cb_fn;
271 	void					*cb_arg;
272 	struct iovec				iov[LOAD_IOV_COUNT];
273 	void					*path;
274 };
275 
276 static int
277 _allocate_vol_requests(struct spdk_reduce_vol *vol)
278 {
279 	struct spdk_reduce_vol_request *req;
280 	int i;
281 
282 	vol->reqbufspace = spdk_dma_malloc(REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, 64, NULL);
283 	if (vol->reqbufspace == NULL) {
284 		return -ENOMEM;
285 	}
286 
287 	vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
288 	if (vol->request_mem == NULL) {
289 		spdk_dma_free(vol->reqbufspace);
290 		vol->reqbufspace = NULL;
291 		return -ENOMEM;
292 	}
293 
294 	vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
295 				  sizeof(struct iovec) * vol->backing_io_units_per_chunk);
296 	if (vol->buf_iov_mem == NULL) {
297 		free(vol->request_mem);
298 		spdk_dma_free(vol->reqbufspace);
299 		vol->request_mem = NULL;
300 		vol->reqbufspace = NULL;
301 		return -ENOMEM;
302 	}
303 
304 	for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
305 		req = &vol->request_mem[i];
306 		TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
307 		req->buf_iov = &vol->buf_iov_mem[i * vol->backing_io_units_per_chunk];
308 		req->buf = vol->reqbufspace + i * vol->params.chunk_size;
309 	}
310 
311 	return 0;
312 }
313 
314 static void
315 _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
316 {
317 	if (ctx != NULL) {
318 		spdk_dma_free(ctx->path);
319 		free(ctx);
320 	}
321 
322 	if (vol != NULL) {
323 		pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
324 		spdk_dma_free(vol->backing_super);
325 		spdk_bit_array_free(&vol->allocated_chunk_maps);
326 		spdk_bit_array_free(&vol->allocated_backing_io_units);
327 		free(vol->request_mem);
328 		free(vol->buf_iov_mem);
329 		spdk_dma_free(vol->reqbufspace);
330 		free(vol);
331 	}
332 }
333 
334 static void
335 _init_write_super_cpl(void *cb_arg, int reduce_errno)
336 {
337 	struct reduce_init_load_ctx *init_ctx = cb_arg;
338 	int rc;
339 
340 	rc = _allocate_vol_requests(init_ctx->vol);
341 	if (rc != 0) {
342 		init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
343 		_init_load_cleanup(init_ctx->vol, init_ctx);
344 		return;
345 	}
346 
347 	init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno);
348 	/* Only clean up the ctx - the vol has been passed to the application
349 	 *  for use now that initialization was successful.
350 	 */
351 	_init_load_cleanup(NULL, init_ctx);
352 }
353 
354 static void
355 _init_write_path_cpl(void *cb_arg, int reduce_errno)
356 {
357 	struct reduce_init_load_ctx *init_ctx = cb_arg;
358 	struct spdk_reduce_vol *vol = init_ctx->vol;
359 
360 	init_ctx->iov[0].iov_base = vol->backing_super;
361 	init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
362 	init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
363 	init_ctx->backing_cb_args.cb_arg = init_ctx;
364 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
365 				 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen,
366 				 &init_ctx->backing_cb_args);
367 }
368 
369 static int
370 _allocate_bit_arrays(struct spdk_reduce_vol *vol)
371 {
372 	uint64_t total_chunks, total_backing_io_units;
373 
374 	total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
375 	vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
376 	total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
377 	vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
378 
379 	if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
380 		return -ENOMEM;
381 	}
382 
383 	/* Set backing io unit bits associated with metadata. */
384 	spdk_bit_array_set(vol->allocated_backing_io_units, 0);
385 	spdk_bit_array_set(vol->allocated_backing_io_units, 1);
386 
387 	return 0;
388 }
389 
390 void
391 spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
392 		     struct spdk_reduce_backing_dev *backing_dev,
393 		     const char *pm_file_dir,
394 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
395 {
396 	struct spdk_reduce_vol *vol;
397 	struct reduce_init_load_ctx *init_ctx;
398 	uint64_t backing_dev_size;
399 	size_t mapped_len;
400 	int dir_len, max_dir_len, rc;
401 
402 	/* We need to append a path separator and the UUID to the supplied
403 	 * path.
404 	 */
405 	max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
406 	dir_len = strnlen(pm_file_dir, max_dir_len);
407 	/* Strip trailing slash if the user provided one - we will add it back
408 	 * later when appending the filename.
409 	 */
410 	if (pm_file_dir[dir_len - 1] == '/') {
411 		dir_len--;
412 	}
413 	if (dir_len == max_dir_len) {
414 		SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
415 		cb_fn(cb_arg, NULL, -EINVAL);
416 		return;
417 	}
418 
419 	rc = _validate_vol_params(params);
420 	if (rc != 0) {
421 		SPDK_ERRLOG("invalid vol params\n");
422 		cb_fn(cb_arg, NULL, rc);
423 		return;
424 	}
425 
426 	backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
427 	params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
428 	if (params->vol_size == 0) {
429 		SPDK_ERRLOG("backing device is too small\n");
430 		cb_fn(cb_arg, NULL, -EINVAL);
431 		return;
432 	}
433 
434 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
435 	    backing_dev->unmap == NULL) {
436 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
437 		cb_fn(cb_arg, NULL, -EINVAL);
438 		return;
439 	}
440 
441 	vol = calloc(1, sizeof(*vol));
442 	if (vol == NULL) {
443 		cb_fn(cb_arg, NULL, -ENOMEM);
444 		return;
445 	}
446 
447 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 0, NULL);
448 	if (vol->backing_super == NULL) {
449 		cb_fn(cb_arg, NULL, -ENOMEM);
450 		_init_load_cleanup(vol, NULL);
451 		return;
452 	}
453 
454 	init_ctx = calloc(1, sizeof(*init_ctx));
455 	if (init_ctx == NULL) {
456 		cb_fn(cb_arg, NULL, -ENOMEM);
457 		_init_load_cleanup(vol, NULL);
458 		return;
459 	}
460 
461 	init_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 0, NULL);
462 	if (init_ctx->path == NULL) {
463 		cb_fn(cb_arg, NULL, -ENOMEM);
464 		_init_load_cleanup(vol, init_ctx);
465 		return;
466 	}
467 
468 	if (spdk_mem_all_zero(&params->uuid, sizeof(params->uuid))) {
469 		spdk_uuid_generate(&params->uuid);
470 	}
471 
472 	memcpy(vol->pm_file.path, pm_file_dir, dir_len);
473 	vol->pm_file.path[dir_len] = '/';
474 	spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
475 			    &params->uuid);
476 	vol->pm_file.size = _get_pm_file_size(params);
477 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
478 					    PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
479 					    &mapped_len, &vol->pm_file.pm_is_pmem);
480 	if (vol->pm_file.pm_buf == NULL) {
481 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
482 			    vol->pm_file.path, strerror(errno));
483 		cb_fn(cb_arg, NULL, -errno);
484 		_init_load_cleanup(vol, init_ctx);
485 		return;
486 	}
487 
488 	if (vol->pm_file.size != mapped_len) {
489 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
490 			    vol->pm_file.size, mapped_len);
491 		cb_fn(cb_arg, NULL, -ENOMEM);
492 		_init_load_cleanup(vol, init_ctx);
493 		return;
494 	}
495 
496 	vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
497 	vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
498 	vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
499 	memcpy(&vol->params, params, sizeof(*params));
500 
501 	rc = _allocate_bit_arrays(vol);
502 	if (rc != 0) {
503 		cb_fn(cb_arg, NULL, rc);
504 		_init_load_cleanup(vol, init_ctx);
505 		return;
506 	}
507 
508 	vol->backing_dev = backing_dev;
509 
510 	memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
511 	       sizeof(vol->backing_super->signature));
512 	memcpy(&vol->backing_super->params, params, sizeof(*params));
513 
514 	_initialize_vol_pm_pointers(vol);
515 
516 	memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
517 	/* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY.
518 	 * Note that this writes 0xFF to not just the logical map but the chunk maps as well.
519 	 */
520 	memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
521 	_reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
522 
523 	init_ctx->vol = vol;
524 	init_ctx->cb_fn = cb_fn;
525 	init_ctx->cb_arg = cb_arg;
526 
527 	memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
528 	init_ctx->iov[0].iov_base = init_ctx->path;
529 	init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
530 	init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
531 	init_ctx->backing_cb_args.cb_arg = init_ctx;
532 	/* Write path to offset 4K on backing device - just after where the super
533 	 *  block will be written.  We wait until this is committed before writing the
534 	 *  super block to guarantee we don't get the super block written without the
535 	 *  the path if the system crashed in the middle of a write operation.
536 	 */
537 	vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1,
538 				 REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen,
539 				 REDUCE_PATH_MAX / vol->backing_dev->blocklen,
540 				 &init_ctx->backing_cb_args);
541 }
542 
543 static void
544 _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
545 {
546 	struct reduce_init_load_ctx *load_ctx = cb_arg;
547 	struct spdk_reduce_vol *vol = load_ctx->vol;
548 	uint64_t backing_dev_size;
549 	uint64_t i, num_chunks;
550 	uint64_t *chunk;
551 	size_t mapped_len;
552 	uint32_t j;
553 	int rc;
554 
555 	if (memcmp(vol->backing_super->signature,
556 		   SPDK_REDUCE_SIGNATURE,
557 		   sizeof(vol->backing_super->signature)) != 0) {
558 		/* This backing device isn't a libreduce backing device. */
559 		rc = -EILSEQ;
560 		goto error;
561 	}
562 
563 	memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
564 	vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
565 	vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
566 	vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
567 
568 	rc = _allocate_bit_arrays(vol);
569 	if (rc != 0) {
570 		goto error;
571 	}
572 
573 	backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
574 	if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
575 		SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n",
576 			    backing_dev_size);
577 		rc = -EILSEQ;
578 		goto error;
579 	}
580 
581 	memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
582 	vol->pm_file.size = _get_pm_file_size(&vol->params);
583 	vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
584 					    &vol->pm_file.pm_is_pmem);
585 	if (vol->pm_file.pm_buf == NULL) {
586 		SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
587 		rc = -errno;
588 		goto error;
589 	}
590 
591 	if (vol->pm_file.size != mapped_len) {
592 		SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
593 			    vol->pm_file.size, mapped_len);
594 		rc = -ENOMEM;
595 		goto error;
596 	}
597 
598 	rc = _allocate_vol_requests(vol);
599 	if (rc != 0) {
600 		goto error;
601 	}
602 
603 	_initialize_vol_pm_pointers(vol);
604 
605 	num_chunks = vol->params.vol_size / vol->params.chunk_size;
606 	for (i = 0; i < num_chunks; i++) {
607 		if (vol->pm_logical_map[i] == REDUCE_EMPTY_MAP_ENTRY) {
608 			continue;
609 		}
610 		spdk_bit_array_set(vol->allocated_chunk_maps, i);
611 		chunk = _reduce_vol_get_chunk_map(vol, i);
612 		for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
613 			if (chunk[j] != REDUCE_EMPTY_MAP_ENTRY) {
614 				spdk_bit_array_set(vol->allocated_backing_io_units, chunk[j]);
615 			}
616 		}
617 	}
618 
619 	load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
620 	/* Only clean up the ctx - the vol has been passed to the application
621 	 *  for use now that volume load was successful.
622 	 */
623 	_init_load_cleanup(NULL, load_ctx);
624 	return;
625 
626 error:
627 	load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
628 	_init_load_cleanup(vol, load_ctx);
629 }
630 
631 void
632 spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
633 		     spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
634 {
635 	struct spdk_reduce_vol *vol;
636 	struct reduce_init_load_ctx *load_ctx;
637 
638 	if (backing_dev->readv == NULL || backing_dev->writev == NULL ||
639 	    backing_dev->unmap == NULL) {
640 		SPDK_ERRLOG("backing_dev function pointer not specified\n");
641 		cb_fn(cb_arg, NULL, -EINVAL);
642 		return;
643 	}
644 
645 	vol = calloc(1, sizeof(*vol));
646 	if (vol == NULL) {
647 		cb_fn(cb_arg, NULL, -ENOMEM);
648 		return;
649 	}
650 
651 	vol->backing_super = spdk_dma_zmalloc(sizeof(*vol->backing_super), 64, NULL);
652 	if (vol->backing_super == NULL) {
653 		_init_load_cleanup(vol, NULL);
654 		cb_fn(cb_arg, NULL, -ENOMEM);
655 		return;
656 	}
657 
658 	vol->backing_dev = backing_dev;
659 
660 	load_ctx = calloc(1, sizeof(*load_ctx));
661 	if (load_ctx == NULL) {
662 		_init_load_cleanup(vol, NULL);
663 		cb_fn(cb_arg, NULL, -ENOMEM);
664 		return;
665 	}
666 
667 	load_ctx->path = spdk_dma_zmalloc(REDUCE_PATH_MAX, 64, NULL);
668 	if (load_ctx->path == NULL) {
669 		_init_load_cleanup(vol, load_ctx);
670 		cb_fn(cb_arg, NULL, -ENOMEM);
671 		return;
672 	}
673 
674 	load_ctx->vol = vol;
675 	load_ctx->cb_fn = cb_fn;
676 	load_ctx->cb_arg = cb_arg;
677 
678 	load_ctx->iov[0].iov_base = vol->backing_super;
679 	load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
680 	load_ctx->iov[1].iov_base = load_ctx->path;
681 	load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
682 	load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
683 	load_ctx->backing_cb_args.cb_arg = load_ctx;
684 	vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0,
685 				(sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
686 				vol->backing_dev->blocklen,
687 				&load_ctx->backing_cb_args);
688 }
689 
690 void
691 spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
692 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
693 {
694 	if (vol == NULL) {
695 		/* This indicates a programming error. */
696 		assert(false);
697 		cb_fn(cb_arg, -EINVAL);
698 		return;
699 	}
700 
701 	_init_load_cleanup(vol, NULL);
702 	cb_fn(cb_arg, 0);
703 }
704 
705 struct reduce_destroy_ctx {
706 	spdk_reduce_vol_op_complete		cb_fn;
707 	void					*cb_arg;
708 	struct spdk_reduce_vol			*vol;
709 	struct spdk_reduce_vol_superblock	*super;
710 	struct iovec				iov;
711 	struct spdk_reduce_vol_cb_args		backing_cb_args;
712 	int					reduce_errno;
713 	char					pm_path[REDUCE_PATH_MAX];
714 };
715 
716 static void
717 destroy_unload_cpl(void *cb_arg, int reduce_errno)
718 {
719 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
720 
721 	if (destroy_ctx->reduce_errno == 0) {
722 		if (unlink(destroy_ctx->pm_path)) {
723 			SPDK_ERRLOG("%s could not be unlinked: %s\n",
724 				    destroy_ctx->pm_path, strerror(errno));
725 		}
726 	}
727 
728 	/* Even if the unload somehow failed, we still pass the destroy_ctx
729 	 * reduce_errno since that indicates whether or not the volume was
730 	 * actually destroyed.
731 	 */
732 	destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
733 	spdk_dma_free(destroy_ctx->super);
734 	free(destroy_ctx);
735 }
736 
737 static void
738 _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
739 {
740 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
741 	struct spdk_reduce_vol *vol = destroy_ctx->vol;
742 
743 	destroy_ctx->reduce_errno = reduce_errno;
744 	spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
745 }
746 
747 static void
748 destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
749 {
750 	struct reduce_destroy_ctx *destroy_ctx = cb_arg;
751 
752 	if (reduce_errno != 0) {
753 		destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
754 		spdk_dma_free(destroy_ctx->super);
755 		free(destroy_ctx);
756 		return;
757 	}
758 
759 	destroy_ctx->vol = vol;
760 	memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
761 	destroy_ctx->iov.iov_base = destroy_ctx->super;
762 	destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
763 	destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
764 	destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
765 	vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0,
766 				 sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen,
767 				 &destroy_ctx->backing_cb_args);
768 }
769 
770 void
771 spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
772 			spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
773 {
774 	struct reduce_destroy_ctx *destroy_ctx;
775 
776 	destroy_ctx = calloc(1, sizeof(*destroy_ctx));
777 	if (destroy_ctx == NULL) {
778 		cb_fn(cb_arg, -ENOMEM);
779 		return;
780 	}
781 
782 	destroy_ctx->super = spdk_dma_zmalloc(sizeof(*destroy_ctx->super), 64, NULL);
783 	if (destroy_ctx->super == NULL) {
784 		free(destroy_ctx);
785 		cb_fn(cb_arg, -ENOMEM);
786 		return;
787 	}
788 	destroy_ctx->cb_fn = cb_fn;
789 	destroy_ctx->cb_arg = cb_arg;
790 	spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
791 }
792 
793 static bool
794 _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
795 {
796 	uint64_t start_chunk, end_chunk;
797 
798 	start_chunk = offset / vol->logical_blocks_per_chunk;
799 	end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
800 
801 	return (start_chunk != end_chunk);
802 }
803 
/*
 * Signature of the step functions that run when a backing operation of a
 *  request completes.  The first argument is the request, passed as void *
 *  so the function can be used as a backing device callback; the second is
 *  the status of the completed operation.
 */
typedef void (*reduce_request_fn)(void *req_arg, int status);
805 
806 static void
807 _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
808 {
809 	req->cb_fn(req->cb_arg, reduce_errno);
810 	TAILQ_INSERT_HEAD(&req->vol->free_requests, req, tailq);
811 }
812 
813 static void
814 _write_complete_req(void *_req, int reduce_errno)
815 {
816 	struct spdk_reduce_vol_request *req = _req;
817 	struct spdk_reduce_vol *vol = req->vol;
818 	uint64_t logical_map_index, old_chunk_map_index;
819 	uint64_t *old_chunk;
820 	uint32_t i;
821 
822 	if (reduce_errno != 0) {
823 		req->reduce_errno = reduce_errno;
824 	}
825 
826 	assert(req->num_backing_ops > 0);
827 	if (--req->num_backing_ops > 0) {
828 		return;
829 	}
830 
831 	if (req->reduce_errno != 0) {
832 		_reduce_vol_complete_req(req, req->reduce_errno);
833 		return;
834 	}
835 
836 	logical_map_index = req->offset / vol->logical_blocks_per_chunk;
837 
838 	old_chunk_map_index = vol->pm_logical_map[logical_map_index];
839 	if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
840 		old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index);
841 		for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
842 			if (old_chunk[i] == REDUCE_EMPTY_MAP_ENTRY) {
843 				break;
844 			}
845 			assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk[i]) == true);
846 			spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk[i]);
847 			old_chunk[i] = REDUCE_EMPTY_MAP_ENTRY;
848 		}
849 		spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index);
850 	}
851 
852 	/*
853 	 * We don't need to persist the clearing of the old chunk map here.  The old chunk map
854 	 * becomes invalid after we update the logical map, since the old chunk map will no
855 	 * longer have a reference to it in the logical map.
856 	 */
857 
858 	/* Persist the new chunk map.  This must be persisted before we update the logical map. */
859 	_reduce_persist(vol, req->chunk, sizeof(uint64_t) * vol->backing_io_units_per_chunk);
860 
861 	vol->pm_logical_map[logical_map_index] = req->chunk_map_index;
862 
863 	_reduce_persist(vol, &vol->pm_logical_map[logical_map_index], sizeof(uint64_t));
864 
865 	_reduce_vol_complete_req(req, 0);
866 }
867 
868 static void
869 _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
870 		   reduce_request_fn next_fn, bool is_write)
871 {
872 	uint32_t i;
873 
874 	req->num_backing_ops = vol->backing_io_units_per_chunk;
875 	req->backing_cb_args.cb_fn = next_fn;
876 	req->backing_cb_args.cb_arg = req;
877 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
878 		req->buf_iov[i].iov_base = req->buf + i * vol->params.backing_io_unit_size;
879 		req->buf_iov[i].iov_len = vol->params.backing_io_unit_size;
880 		if (is_write) {
881 			vol->backing_dev->writev(vol->backing_dev, &req->buf_iov[i], 1,
882 						 req->chunk[i] * vol->backing_lba_per_io_unit,
883 						 vol->backing_lba_per_io_unit, &req->backing_cb_args);
884 		} else {
885 			vol->backing_dev->readv(vol->backing_dev, &req->buf_iov[i], 1,
886 						req->chunk[i] * vol->backing_lba_per_io_unit,
887 						vol->backing_lba_per_io_unit, &req->backing_cb_args);
888 		}
889 	}
890 }
891 
892 static void
893 _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
894 {
895 	struct spdk_reduce_vol *vol = req->vol;
896 	uint32_t i;
897 
898 	req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0);
899 
900 	/* TODO: fail if no chunk map found - but really this should not happen if we
901 	 * size the number of requests similarly to number of extra chunk maps
902 	 */
903 	assert(req->chunk_map_index != UINT32_MAX);
904 	spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
905 
906 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
907 
908 	for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
909 		req->chunk[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0);
910 		/* TODO: fail if no backing block found - but really this should also not
911 		 * happen (see comment above).
912 		 */
913 		assert(req->chunk[i] != UINT32_MAX);
914 		spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk[i]);
915 	}
916 
917 	_issue_backing_ops(req, vol, next_fn, true /* write */);
918 }
919 
920 static void
921 _write_read_done(void *_req, int reduce_errno)
922 {
923 	struct spdk_reduce_vol_request *req = _req;
924 	uint64_t chunk_offset;
925 	uint8_t *buf;
926 	int i;
927 
928 	if (reduce_errno != 0) {
929 		req->reduce_errno = reduce_errno;
930 	}
931 
932 	assert(req->num_backing_ops > 0);
933 	if (--req->num_backing_ops > 0) {
934 		return;
935 	}
936 
937 	if (req->reduce_errno != 0) {
938 		_reduce_vol_complete_req(req, req->reduce_errno);
939 		return;
940 	}
941 
942 	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
943 	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
944 	for (i = 0; i < req->iovcnt; i++) {
945 		memcpy(buf, req->iov[i].iov_base, req->iov[i].iov_len);
946 		buf += req->iov[i].iov_len;
947 	}
948 
949 	_reduce_vol_write_chunk(req, _write_complete_req);
950 }
951 
952 static void
953 _read_read_done(void *_req, int reduce_errno)
954 {
955 	struct spdk_reduce_vol_request *req = _req;
956 	uint64_t chunk_offset;
957 	uint8_t *buf;
958 	int i;
959 
960 	if (reduce_errno != 0) {
961 		req->reduce_errno = reduce_errno;
962 	}
963 
964 	assert(req->num_backing_ops > 0);
965 	if (--req->num_backing_ops > 0) {
966 		return;
967 	}
968 
969 	if (req->reduce_errno != 0) {
970 		_reduce_vol_complete_req(req, req->reduce_errno);
971 		return;
972 	}
973 
974 	chunk_offset = req->offset % req->vol->logical_blocks_per_chunk;
975 	buf = req->buf + chunk_offset * req->vol->params.logical_block_size;
976 	for (i = 0; i < req->iovcnt; i++) {
977 		memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len);
978 		buf += req->iov[i].iov_len;
979 	}
980 	_reduce_vol_complete_req(req, 0);
981 }
982 
983 static void
984 _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
985 {
986 	struct spdk_reduce_vol *vol = req->vol;
987 	uint64_t chunk;
988 
989 	chunk = req->offset / vol->logical_blocks_per_chunk;
990 	req->chunk_map_index = vol->pm_logical_map[chunk];
991 	assert(req->chunk_map_index != UINT32_MAX);
992 
993 	req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
994 	_issue_backing_ops(req, vol, next_fn, false /* read */);
995 }
996 
997 static bool
998 _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
999 		    uint64_t length)
1000 {
1001 	uint64_t size = 0;
1002 	int i;
1003 
1004 	for (i = 0; i < iovcnt; i++) {
1005 		size += iov[i].iov_len;
1006 	}
1007 
1008 	return size == (length * vol->params.logical_block_size);
1009 }
1010 
1011 void
1012 spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
1013 		      struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1014 		      spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1015 {
1016 	struct spdk_reduce_vol_request *req;
1017 	uint64_t chunk;
1018 	int i;
1019 
1020 	if (length == 0) {
1021 		cb_fn(cb_arg, 0);
1022 		return;
1023 	}
1024 
1025 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1026 		cb_fn(cb_arg, -EINVAL);
1027 		return;
1028 	}
1029 
1030 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1031 		cb_fn(cb_arg, -EINVAL);
1032 		return;
1033 	}
1034 
1035 	chunk = offset / vol->logical_blocks_per_chunk;
1036 	if (vol->pm_logical_map[chunk] == REDUCE_EMPTY_MAP_ENTRY) {
1037 		/*
1038 		 * This chunk hasn't been allocated.  So treat the data as all
1039 		 * zeroes for this chunk - do the memset and immediately complete
1040 		 * the operation.
1041 		 */
1042 		for (i = 0; i < iovcnt; i++) {
1043 			memset(iov[i].iov_base, 0, iov[i].iov_len);
1044 		}
1045 		cb_fn(cb_arg, 0);
1046 		return;
1047 	}
1048 
1049 	req = TAILQ_FIRST(&vol->free_requests);
1050 	if (req == NULL) {
1051 		cb_fn(cb_arg, -ENOMEM);
1052 		return;
1053 	}
1054 
1055 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1056 	req->vol = vol;
1057 	req->iov = iov;
1058 	req->iovcnt = iovcnt;
1059 	req->offset = offset;
1060 	req->length = length;
1061 	req->cb_fn = cb_fn;
1062 	req->cb_arg = cb_arg;
1063 
1064 	_reduce_vol_read_chunk(req, _read_read_done);
1065 }
1066 
1067 void
1068 spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
1069 		       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
1070 		       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
1071 {
1072 	struct spdk_reduce_vol_request *req;
1073 	uint64_t chunk, chunk_offset;
1074 	uint32_t lbsize, lb_per_chunk;
1075 	int i;
1076 	uint8_t *buf;
1077 
1078 	if (length == 0) {
1079 		cb_fn(cb_arg, 0);
1080 		return;
1081 	}
1082 
1083 	if (_request_spans_chunk_boundary(vol, offset, length)) {
1084 		cb_fn(cb_arg, -EINVAL);
1085 		return;
1086 	}
1087 
1088 	if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
1089 		cb_fn(cb_arg, -EINVAL);
1090 		return;
1091 	}
1092 
1093 	req = TAILQ_FIRST(&vol->free_requests);
1094 	if (req == NULL) {
1095 		cb_fn(cb_arg, -ENOMEM);
1096 		return;
1097 	}
1098 
1099 	TAILQ_REMOVE(&vol->free_requests, req, tailq);
1100 	req->vol = vol;
1101 	req->iov = iov;
1102 	req->iovcnt = iovcnt;
1103 	req->offset = offset;
1104 	req->length = length;
1105 	req->cb_fn = cb_fn;
1106 	req->cb_arg = cb_arg;
1107 
1108 	chunk = offset / vol->logical_blocks_per_chunk;
1109 	if (vol->pm_logical_map[chunk] != REDUCE_EMPTY_MAP_ENTRY) {
1110 		/* Read old chunk, then overwrite with data from this write operation.
1111 		 * TODO: bypass reading old chunk if this write operation overwrites
1112 		 * the entire chunk.
1113 		 */
1114 		_reduce_vol_read_chunk(req, _write_read_done);
1115 		return;
1116 	}
1117 
1118 	buf = req->buf;
1119 	lbsize = vol->params.logical_block_size;
1120 	lb_per_chunk = vol->logical_blocks_per_chunk;
1121 	/* Note: we must zero out parts of req->buf not specified by this write operation. */
1122 	chunk_offset = offset % lb_per_chunk;
1123 	if (chunk_offset != 0) {
1124 		memset(buf, 0, chunk_offset * lbsize);
1125 		buf += chunk_offset * lbsize;
1126 	}
1127 	for (i = 0; i < iovcnt; i++) {
1128 		memcpy(buf, iov[i].iov_base, iov[i].iov_len);
1129 		buf += iov[i].iov_len;
1130 	}
1131 	chunk_offset += length;
1132 	if (chunk_offset != lb_per_chunk) {
1133 		memset(buf, 0, (lb_per_chunk - chunk_offset) * lbsize);
1134 	}
1135 	_reduce_vol_write_chunk(req, _write_complete_req);
1136 }
1137 
/* Log component registration for this file: name "reduce", flag SPDK_LOG_REDUCE.
 * NOTE(review): the macro is defined outside this file; the description follows its name.
 */
SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE)
1139