xref: /spdk/module/bdev/compress/vbdev_compress.c (revision 95d6c9fac17572b107042103439aafd696d60b0e)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "vbdev_compress.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/stdinc.h"
11 #include "spdk/rpc.h"
12 #include "spdk/env.h"
13 #include "spdk/endian.h"
14 #include "spdk/string.h"
15 #include "spdk/thread.h"
16 #include "spdk/util.h"
17 #include "spdk/bdev_module.h"
18 #include "spdk/likely.h"
19 #include "spdk/log.h"
20 #include "spdk/accel.h"
21 
22 #include "spdk/accel_module.h"
23 
24 #define CHUNK_SIZE (1024 * 16)
25 #define COMP_BDEV_NAME "compress"
26 #define BACKING_IO_SZ (4 * 1024)
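/* A note on the constants above (derived from the values in this file): each reduce chunk
 * holds CHUNK_SIZE (16 KiB) of logical data and is persisted in BACKING_IO_SZ (4 KiB)
 * backing io units, so a fully incompressible chunk occupies at most four io units on the
 * base bdev.
 */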
27 
28 /* This namespace UUID was generated with uuid_generate(). */
29 #define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"
30 
31 struct vbdev_comp_delete_ctx {
32 	spdk_delete_compress_complete	cb_fn;
33 	void				*cb_arg;
34 	int				cb_rc;
35 	struct spdk_thread		*orig_thread;
36 };
37 
38 /* List of virtual bdevs and associated info for each. */
39 struct vbdev_compress {
40 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
41 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
42 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
43 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
44 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
45 	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
46 	struct spdk_thread		*reduce_thread;
47 	pthread_mutex_t			reduce_lock;
48 	uint32_t			ch_count;
49 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
50 	struct spdk_poller		*poller;	/* completion poller */
51 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
52 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
53 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
54 	struct vbdev_comp_delete_ctx	*delete_ctx;
55 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
56 	int				reduce_errno;
57 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
58 	TAILQ_ENTRY(vbdev_compress)	link;
59 	struct spdk_thread		*thread;	/* thread where base device is opened */
60 	enum spdk_accel_comp_algo       comp_algo;      /* compression algorithm for compress bdev */
61 	uint32_t                        comp_level;     /* compression algorithm level */
62 };
63 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
64 
65 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
66  */
67 struct comp_io_channel {
68 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
69 };
70 
71 /* Per I/O context for the compression vbdev. */
72 struct comp_bdev_io {
73 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
74 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
75 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
76 	struct spdk_bdev_io		*orig_io;		/* the original IO */
77 	int				status;			/* save for completion on orig thread */
78 };
79 
80 static void vbdev_compress_examine(struct spdk_bdev *bdev);
81 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
82 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size,
83 		uint8_t comp_algo, uint32_t comp_level);
84 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
85 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
86 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
87 static void _comp_reduce_resubmit_backing_io(void *_backing_io);
88 
89 /* For completing rw requests on the original IO thread. */
90 static void
91 _reduce_rw_blocks_cb(void *arg)
92 {
93 	struct comp_bdev_io *io_ctx = arg;
94 
95 	if (spdk_likely(io_ctx->status == 0)) {
96 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
97 	} else if (io_ctx->status == -ENOMEM) {
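		/* Report ENOMEM as SPDK_BDEV_IO_STATUS_NOMEM so the bdev layer will queue
		 * and retry this IO rather than failing it outright.
		 */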
98 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_NOMEM);
99 	} else {
100 		SPDK_ERRLOG("Failed to execute reduce API: %s\n", spdk_strerror(-io_ctx->status));
101 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
102 	}
103 }
104 
105 /* Completion callback for reads/writes that were issued via the reduce library. */
106 static void
107 reduce_rw_blocks_cb(void *arg, int reduce_errno)
108 {
109 	struct spdk_bdev_io *bdev_io = arg;
110 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
111 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
112 	struct spdk_thread *orig_thread;
113 
114 	/* TODO: need to decide which error codes are bdev_io success vs failure;
115 	 * for example, examine calls reading metadata */
116 
117 	io_ctx->status = reduce_errno;
118 
119 	/* Send this request to the orig IO thread. */
120 	orig_thread = spdk_io_channel_get_thread(ch);
121 
122 	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
123 }
124 
125 static int
126 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
127 		    int src_iovcnt, struct iovec *dst_iovs,
128 		    int dst_iovcnt, bool compress, void *cb_arg)
129 {
130 	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
131 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
132 					   backing_dev);
133 	int rc;
134 
135 	if (compress) {
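		/* The compress path expects a single destination buffer here, hence the
		 * single-iov assertion below.
		 */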
136 		assert(dst_iovcnt == 1);
137 		rc = spdk_accel_submit_compress_ext(comp_bdev->accel_channel, dst_iovs[0].iov_base,
138 						    dst_iovs[0].iov_len, src_iovs, src_iovcnt,
139 						    comp_bdev->comp_algo, comp_bdev->comp_level,
140 						    &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
141 						    reduce_cb_arg->cb_arg);
142 	} else {
143 		rc = spdk_accel_submit_decompress_ext(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
144 						      src_iovs, src_iovcnt, comp_bdev->comp_algo,
145 						      &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
146 						      reduce_cb_arg->cb_arg);
147 	}
148 
149 	return rc;
150 }
151 
152 /* Entry point for reduce lib to issue a compress operation. */
153 static void
154 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
155 		      struct iovec *src_iovs, int src_iovcnt,
156 		      struct iovec *dst_iovs, int dst_iovcnt,
157 		      struct spdk_reduce_vol_cb_args *cb_arg)
158 {
159 	int rc;
160 
161 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
162 	if (rc) {
163 		SPDK_ERRLOG("Failed to submit compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
164 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
165 	}
166 }
167 
168 /* Entry point for reduce lib to issue a decompress operation. */
169 static void
170 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
171 			struct iovec *src_iovs, int src_iovcnt,
172 			struct iovec *dst_iovs, int dst_iovcnt,
173 			struct spdk_reduce_vol_cb_args *cb_arg)
174 {
175 	int rc;
176 
177 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
178 	if (rc) {
179 		SPDK_ERRLOG("Failed to submit decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
180 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
181 	}
182 }
183 
184 static void
185 _comp_submit_write(void *ctx)
186 {
187 	struct spdk_bdev_io *bdev_io = ctx;
188 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
189 					   comp_bdev);
190 
191 	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
192 			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
193 			       reduce_rw_blocks_cb, bdev_io);
194 }
195 
196 static void
197 _comp_submit_read(void *ctx)
198 {
199 	struct spdk_bdev_io *bdev_io = ctx;
200 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
201 					   comp_bdev);
202 
203 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
204 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
205 			      reduce_rw_blocks_cb, bdev_io);
206 }
207 
208 
209 /* Callback for getting a buf from the bdev pool in the event that the caller passed
210  * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
211  * beneath us before we're done with it.
212  */
213 static void
214 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
215 {
216 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
217 					   comp_bdev);
218 
219 	if (spdk_unlikely(!success)) {
220 		SPDK_ERRLOG("Failed to get data buffer\n");
221 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
222 		return;
223 	}
224 
225 	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
226 }
227 
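/* Describes a partially covered chunk within an unmap range: the chunk index plus the
 * offset and length (in logical blocks) of the covered portion within that chunk.
 */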
228 struct partial_chunk_info {
229 	uint64_t chunk_idx;
230 	uint64_t block_offset;
231 	uint64_t block_length;
232 };
233 
234 /*
235  * Structure used to hold the information needed during the execution of an unmap operation.
236  */
237 struct compress_unmap_split_ctx {
238 	struct spdk_bdev_io *bdev_io;
239 	int32_t status;
240 	uint32_t logical_blocks_per_chunk;
241 	/* The first chunk that can be fully covered by the unmap bdevio interval */
242 	uint64_t full_chunk_idx_b;
243 	/* The last chunk that can be fully covered by the unmap bdevio interval */
244 	uint64_t full_chunk_idx_e;
245 	uint64_t num_full_chunks;
246 	uint64_t num_full_chunks_consumed;
247 	uint32_t num_partial_chunks;
248 	uint32_t num_partial_chunks_consumed;
249 	/* Used to hold the partial chunk information. There will be at most two,
250 	because chunks that cannot be fully covered can only appear at the beginning, the end, or both. */
251 	struct partial_chunk_info partial_chunk_info[2];
252 };
253 
254 static void _comp_unmap_subcmd_done_cb(void *ctx, int error);
255 
256 /*
257  * This function processes the unmap operation for both full and partial chunks in a
258  * compressed block device. It iteratively submits unmap requests until all the chunks
259  * have been unmapped or an error occurs.
260  */
261 static void
262 _comp_submit_unmap_split(void *ctx)
263 {
264 	struct compress_unmap_split_ctx *split_ctx = ctx;
265 	struct spdk_bdev_io *bdev_io = split_ctx->bdev_io;
266 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
267 					   comp_bdev);
268 	struct partial_chunk_info *partial_chunk = NULL;
269 	uint64_t chunk_idx = 0;
270 	uint64_t block_offset = 0;
271 	uint64_t block_length = 0;
272 
273 	if (split_ctx->status != 0 ||
274 	    (split_ctx->num_full_chunks_consumed == split_ctx->num_full_chunks &&
275 	     split_ctx->num_partial_chunks_consumed == split_ctx->num_partial_chunks)) {
276 		reduce_rw_blocks_cb(bdev_io, split_ctx->status);
277 		free(split_ctx);
278 		return;
279 	}
280 
281 	if (split_ctx->num_full_chunks_consumed < split_ctx->num_full_chunks) {
282 		chunk_idx = split_ctx->full_chunk_idx_b + split_ctx->num_full_chunks_consumed;
283 		block_offset = chunk_idx * split_ctx->logical_blocks_per_chunk;
284 		block_length = split_ctx->logical_blocks_per_chunk;
285 
286 		split_ctx->num_full_chunks_consumed++;
287 		spdk_reduce_vol_unmap(comp_bdev->vol,
288 				      block_offset, block_length,
289 				      _comp_unmap_subcmd_done_cb, split_ctx);
290 	} else if (split_ctx->num_partial_chunks_consumed < split_ctx->num_partial_chunks) {
291 		partial_chunk = &split_ctx->partial_chunk_info[split_ctx->num_partial_chunks_consumed];
292 		block_offset = partial_chunk->chunk_idx * split_ctx->logical_blocks_per_chunk +
293 			       partial_chunk->block_offset;
294 		block_length = partial_chunk->block_length;
295 
296 		split_ctx->num_partial_chunks_consumed++;
297 		spdk_reduce_vol_unmap(comp_bdev->vol,
298 				      block_offset, block_length,
299 				      _comp_unmap_subcmd_done_cb, split_ctx);
300 	} else {
301 		assert(false);
302 	}
303 }
304 
305 /*
306  * Operations such as mkfs or fstrim may generate large unmap requests.
307  * A large request is split into multiple subcommands and processed recursively.
308  * Running too many subcommands recursively may cause a stack overflow or monopolize
309  * the thread, delaying other tasks. To avoid this, the next subcommand is processed
310  * asynchronously via 'spdk_thread_send_msg'.
311  */
312 static void
313 _comp_unmap_subcmd_done_cb(void *ctx, int error)
314 {
315 	struct compress_unmap_split_ctx *split_ctx = ctx;
316 
317 	split_ctx->status = error;
318 	spdk_thread_send_msg(spdk_get_thread(), _comp_submit_unmap_split, split_ctx);
319 }
320 
321 /*
322  * This function splits the unmap operation into full and partial chunks based on the
323  * block range specified in the 'spdk_bdev_io' structure. It calculates the start and end
324  * chunks, as well as any partial chunks at the beginning or end of the range, and prepares
325  * a context (compress_unmap_split_ctx) to handle these chunks. The unmap operation is
326  * then submitted for processing through '_comp_submit_unmap_split'.
327  * Some cases to handle:
328  * 1. start and end chunks are different
329  * 1.1 start and end chunks are full
330  * 1.2 start and end chunks are partial
331  * 1.3 start or end chunk is full and the other is partial
332  * 2. start and end chunks are the same
333  * 2.1 full
334  * 2.2 partial
335  */
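/* Worked example (assuming a 16 KiB chunk and a 512-byte logical block, i.e. 32 blocks
 * per chunk): an unmap with offset_blocks = 40 and num_blocks = 80 touches blocks
 * 40..119, so start_chunk = 1 and end_chunk = 3. Chunk 1 is partial (offset 8,
 * length 24), chunk 3 is partial (offset 0, length 24), and chunk 2 is the single
 * full chunk.
 */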
336 static void
337 _comp_submit_unmap(void *ctx)
338 {
339 	struct spdk_bdev_io *bdev_io = ctx;
340 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
341 					   comp_bdev);
342 	const struct spdk_reduce_vol_params *vol_params = spdk_reduce_vol_get_params(comp_bdev->vol);
343 	struct compress_unmap_split_ctx *split_ctx;
344 	struct partial_chunk_info *partial_chunk;
345 	uint32_t logical_blocks_per_chunk;
346 	uint64_t start_chunk, end_chunk, start_offset, end_tail;
347 
348 	logical_blocks_per_chunk = vol_params->chunk_size / vol_params->logical_block_size;
349 	start_chunk = bdev_io->u.bdev.offset_blocks / logical_blocks_per_chunk;
350 	end_chunk = (bdev_io->u.bdev.offset_blocks + bdev_io->u.bdev.num_blocks - 1) /
351 		    logical_blocks_per_chunk;
352 	start_offset = bdev_io->u.bdev.offset_blocks % logical_blocks_per_chunk;
353 	end_tail = (bdev_io->u.bdev.offset_blocks + bdev_io->u.bdev.num_blocks) %
354 		   logical_blocks_per_chunk;
355 
356 	split_ctx = calloc(1, sizeof(struct compress_unmap_split_ctx));
357 	if (split_ctx == NULL) {
358 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
359 		return;
360 	}
361 	partial_chunk = split_ctx->partial_chunk_info;
362 	split_ctx->bdev_io = bdev_io;
363 	split_ctx->logical_blocks_per_chunk = logical_blocks_per_chunk;
364 
365 	if (start_chunk < end_chunk) {
366 		if (start_offset != 0) {
367 			partial_chunk[split_ctx->num_partial_chunks].chunk_idx = start_chunk;
368 			partial_chunk[split_ctx->num_partial_chunks].block_offset = start_offset;
369 			partial_chunk[split_ctx->num_partial_chunks].block_length = logical_blocks_per_chunk
370 					- start_offset;
371 			split_ctx->num_partial_chunks++;
372 			split_ctx->full_chunk_idx_b = start_chunk + 1;
373 		} else {
374 			split_ctx->full_chunk_idx_b = start_chunk;
375 		}
376 
377 		if (end_tail != 0) {
378 			partial_chunk[split_ctx->num_partial_chunks].chunk_idx = end_chunk;
379 			partial_chunk[split_ctx->num_partial_chunks].block_offset = 0;
380 			partial_chunk[split_ctx->num_partial_chunks].block_length = end_tail;
381 			split_ctx->num_partial_chunks++;
382 			split_ctx->full_chunk_idx_e = end_chunk - 1;
383 		} else {
384 			split_ctx->full_chunk_idx_e = end_chunk;
385 		}
386 
387 		split_ctx->num_full_chunks = end_chunk - start_chunk + 1 - split_ctx->num_partial_chunks;
388 
389 		if (split_ctx->num_full_chunks) {
390 			assert(split_ctx->full_chunk_idx_b != UINT64_MAX && split_ctx->full_chunk_idx_e != UINT64_MAX);
391 			assert(split_ctx->full_chunk_idx_e - split_ctx->full_chunk_idx_b + 1 == split_ctx->num_full_chunks);
392 		} else {
393 			assert(split_ctx->full_chunk_idx_b - split_ctx->full_chunk_idx_e == 1);
394 		}
395 	} else if (start_offset != 0 || end_tail != 0) {
396 		partial_chunk[0].chunk_idx = start_chunk;
397 		partial_chunk[0].block_offset = start_offset;
398 		partial_chunk[0].block_length =
399 			bdev_io->u.bdev.num_blocks;
400 		split_ctx->num_partial_chunks = 1;
401 	} else {
402 		split_ctx->full_chunk_idx_b = start_chunk;
403 		split_ctx->full_chunk_idx_e = end_chunk;
404 		split_ctx->num_full_chunks = 1;
405 	}
406 	assert(split_ctx->num_partial_chunks <= SPDK_COUNTOF(split_ctx->partial_chunk_info));
407 
408 	_comp_submit_unmap_split(split_ctx);
409 }
410 
411 /* Called when someone above submits IO to this vbdev. */
412 static void
413 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
414 {
415 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
416 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
417 					   comp_bdev);
418 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
419 
420 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
421 	io_ctx->comp_bdev = comp_bdev;
422 	io_ctx->comp_ch = comp_ch;
423 	io_ctx->orig_io = bdev_io;
424 
425 	switch (bdev_io->type) {
426 	case SPDK_BDEV_IO_TYPE_READ:
427 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
428 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
429 		return;
430 	case SPDK_BDEV_IO_TYPE_WRITE:
431 		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
432 		return;
433 	case SPDK_BDEV_IO_TYPE_UNMAP:
434 		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_unmap, bdev_io);
435 		return;
436 	/* TODO: support RESET in a future patch in the series */
437 	case SPDK_BDEV_IO_TYPE_RESET:
438 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
439 	case SPDK_BDEV_IO_TYPE_FLUSH:
440 	default:
441 		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
442 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
443 		break;
444 	}
445 }
446 
447 static bool
448 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
449 {
450 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
451 
452 	switch (io_type) {
453 	case SPDK_BDEV_IO_TYPE_READ:
454 	case SPDK_BDEV_IO_TYPE_WRITE:
455 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
456 	case SPDK_BDEV_IO_TYPE_UNMAP:
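		/* Unmap is handled by the reduce library at the logical-volume level,
		 * so it does not depend on base bdev unmap support.
		 */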
457 		return true;
458 	case SPDK_BDEV_IO_TYPE_RESET:
459 	case SPDK_BDEV_IO_TYPE_FLUSH:
460 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
461 	default:
462 		return false;
463 	}
464 }
465 
466 /* Callback for unregistering the IO device. */
467 static void
468 _device_unregister_cb(void *io_device)
469 {
470 	struct vbdev_compress *comp_bdev = io_device;
471 
472 	/* Done with this comp_bdev. */
473 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
474 	free(comp_bdev->comp_bdev.name);
475 	free(comp_bdev);
476 }
477 
478 static void
479 _vbdev_compress_destruct_cb(void *ctx)
480 {
481 	struct vbdev_compress *comp_bdev = ctx;
482 
483 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
484 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
485 	/* Close the underlying bdev on its same opened thread. */
486 	spdk_bdev_close(comp_bdev->base_desc);
487 	comp_bdev->vol = NULL;
488 	if (comp_bdev->orphaned == false) {
489 		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
490 	} else {
491 		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
492 		_device_unregister_cb(comp_bdev);
493 	}
494 }
495 
496 static void
497 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
498 {
499 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
500 
501 	if (reduce_errno) {
502 		SPDK_ERRLOG("Failed to unload vol, error %d\n", reduce_errno);
503 	} else {
504 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
505 			spdk_thread_send_msg(comp_bdev->thread,
506 					     _vbdev_compress_destruct_cb, comp_bdev);
507 		} else {
508 			_vbdev_compress_destruct_cb(comp_bdev);
509 		}
510 	}
511 }
512 
513 static void
514 _reduce_destroy_cb(void *ctx, int reduce_errno)
515 {
516 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
517 
518 	if (reduce_errno) {
519 		SPDK_ERRLOG("Failed to destroy vol, error %d\n", reduce_errno);
520 	}
521 
522 	comp_bdev->vol = NULL;
523 	spdk_put_io_channel(comp_bdev->base_ch);
524 	if (comp_bdev->orphaned == false) {
525 		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
526 				     comp_bdev->delete_ctx);
527 	} else {
528 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
529 	}
530 
531 }
532 
533 static void
534 _delete_vol_unload_cb(void *ctx)
535 {
536 	struct vbdev_compress *comp_bdev = ctx;
537 
538 	/* FIXME: Assert if these conditions are not satisfied for now. */
539 	assert(!comp_bdev->reduce_thread ||
540 	       comp_bdev->reduce_thread == spdk_get_thread());
541 
542 	/* The reduce library needs a channel to communicate with the backing device */
543 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
544 
545 	/* Clean the device before we free our resources. */
546 	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
547 }
548 
549 /* Called by reduceLib after performing unload vol actions */
550 static void
551 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
552 {
553 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
554 
555 	if (reduce_errno) {
556 		SPDK_ERRLOG("Failed to unload vol, error %s\n", spdk_strerror(-reduce_errno));
557 		vbdev_compress_delete_done(comp_bdev->delete_ctx, reduce_errno);
558 		return;
559 	}
560 
561 	pthread_mutex_lock(&comp_bdev->reduce_lock);
562 	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
563 		spdk_thread_send_msg(comp_bdev->reduce_thread,
564 				     _delete_vol_unload_cb, comp_bdev);
565 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
566 	} else {
567 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
568 
569 		_delete_vol_unload_cb(comp_bdev);
570 	}
571 }
572 
573 const char *
574 compress_get_name(const struct vbdev_compress *comp_bdev)
575 {
576 	return comp_bdev->comp_bdev.name;
577 }
578 
579 struct vbdev_compress *
580 compress_bdev_first(void)
581 {
582 	struct vbdev_compress *comp_bdev;
583 
584 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
585 
586 	return comp_bdev;
587 }
588 
589 struct vbdev_compress *
590 compress_bdev_next(struct vbdev_compress *prev)
591 {
592 	struct vbdev_compress *comp_bdev;
593 
594 	comp_bdev = TAILQ_NEXT(prev, link);
595 
596 	return comp_bdev;
597 }
598 
599 bool
600 compress_has_orphan(const char *name)
601 {
602 	struct vbdev_compress *comp_bdev;
603 
604 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
605 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
606 			return true;
607 		}
608 	}
609 	return false;
610 }
611 
612 /* Called after we've unregistered following a hot remove callback.
613  * Our finish entry point will be called next.
614  */
615 static int
616 vbdev_compress_destruct(void *ctx)
617 {
618 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
619 
620 	if (comp_bdev->vol != NULL) {
621 		/* Tell reducelib that we're done with this volume. */
622 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
623 	} else {
624 		vbdev_compress_destruct_cb(comp_bdev, 0);
625 	}
626 
627 	return 0;
628 }
629 
630 /* We supplied this as an entry point for upper layers that want to communicate with this
631  * bdev.  This is how they get a channel.
632  */
633 static struct spdk_io_channel *
634 vbdev_compress_get_io_channel(void *ctx)
635 {
636 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
637 
638 	/* The IO channel code will allocate a channel for us which consists of
639 	 * the SPDK channel structure plus the size of our comp_io_channel struct
640 	 * that we passed in when we registered our IO device. It will then call
641 	 * our channel create callback to populate any elements that we need to
642 	 * update.
643 	 */
644 	return spdk_get_io_channel(comp_bdev);
645 }
646 
647 /* This is the output for bdev_get_bdevs() for this vbdev */
648 static int
649 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
650 {
651 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
652 	char *comp_algo = NULL;
653 
654 	if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_LZ4) {
655 		comp_algo = "lz4";
656 	} else if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_DEFLATE) {
657 		comp_algo = "deflate";
658 	} else {
659 		assert(false);
660 	}
661 
662 	spdk_json_write_name(w, "compress");
663 	spdk_json_write_object_begin(w);
664 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
665 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
666 	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
667 	spdk_json_write_named_string(w, "comp_algo", comp_algo);
668 	spdk_json_write_named_uint32(w, "comp_level", comp_bdev->params.comp_level);
669 	spdk_json_write_named_uint32(w, "chunk_size", comp_bdev->params.chunk_size);
670 	spdk_json_write_named_uint32(w, "backing_io_unit_size", comp_bdev->params.backing_io_unit_size);
671 	spdk_json_write_object_end(w);
672 
673 	return 0;
674 }
675 
676 static int
677 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
678 {
679 	/* Nothing to dump as compress bdev configuration is saved on physical device. */
680 	return 0;
681 }
682 
683 struct vbdev_init_reduce_ctx {
684 	struct vbdev_compress   *comp_bdev;
685 	int                     status;
686 	bdev_compress_create_cb cb_fn;
687 	void                    *cb_ctx;
688 };
689 
690 static void
691 _vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
692 {
693 }
694 
695 static void
696 _vbdev_reduce_init_cb(void *ctx)
697 {
698 	struct vbdev_init_reduce_ctx *init_ctx = ctx;
699 	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
700 	int rc;
701 
702 	assert(comp_bdev->base_desc != NULL);
703 
704 	/* We're done with metadata operations */
705 	spdk_put_io_channel(comp_bdev->base_ch);
706 
707 	if (comp_bdev->vol) {
708 		rc = vbdev_compress_claim(comp_bdev);
709 		if (rc == 0) {
710 			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
711 			free(init_ctx);
712 			return;
713 		} else {
714 			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
715 		}
716 		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
717 	}
718 
719 	/* Close the underlying bdev on its same opened thread. */
720 	spdk_bdev_close(comp_bdev->base_desc);
721 	free(comp_bdev);
722 	free(init_ctx);
723 }
724 
725 /* Callback from reduce for when init is complete. We'll pass the vbdev_compress struct
726  * used for the initial metadata operations on to the claim step, where it will be
727  * further filled out and added to the global list.
728  */
729 static void
730 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
731 {
732 	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
733 	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
734 
735 	if (reduce_errno == 0) {
736 		comp_bdev->vol = vol;
737 	} else {
738 		SPDK_ERRLOG("Failed to init vol for bdev %s, error %s\n",
739 			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
740 		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
741 	}
742 
743 	init_ctx->status = reduce_errno;
744 
745 	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
746 		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
747 	} else {
748 		_vbdev_reduce_init_cb(init_ctx);
749 	}
750 }
751 
752 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
753  * call the callback that the reduce library provided when it called the read/write/unmap
754  * function, and free the bdev_io.
755  */
756 static void
757 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
758 {
759 	struct spdk_reduce_vol_cb_args *cb_args = arg;
760 	int reduce_errno;
761 
762 	if (success) {
763 		reduce_errno = 0;
764 	} else {
765 		reduce_errno = -EIO;
766 	}
767 	spdk_bdev_free_io(bdev_io);
768 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
769 }
770 
771 static void
772 _comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
773 				 struct spdk_reduce_backing_io *backing_io)
774 {
775 	struct spdk_bdev_io_wait_entry *waitq_entry;
776 	int rc;
777 
778 	waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx;
779 	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
780 	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
781 	waitq_entry->cb_arg = backing_io;
782 
783 	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
784 	if (rc) {
785 		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
786 		assert(false);
787 		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
788 	}
789 }
790 
791 static void
792 _comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
793 {
794 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
795 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
796 					   backing_dev);
797 	int rc;
798 
799 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
800 				    backing_io->iov, backing_io->iovcnt,
801 				    backing_io->lba, backing_io->lba_count,
802 				    comp_reduce_io_cb,
803 				    backing_cb_args);
804 
805 	if (rc) {
806 		if (rc == -ENOMEM) {
807 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
808 			return;
809 		} else {
810 			SPDK_ERRLOG("error submitting readv request, rc=%d\n", rc);
811 		}
812 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
813 	}
814 }
815 
816 static void
817 _comp_backing_bdev_write(struct spdk_reduce_backing_io *backing_io)
818 {
819 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
820 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
821 					   backing_dev);
822 	int rc;
823 
824 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
825 				     backing_io->iov, backing_io->iovcnt,
826 				     backing_io->lba, backing_io->lba_count,
827 				     comp_reduce_io_cb,
828 				     backing_cb_args);
829 
830 	if (rc) {
831 		if (rc == -ENOMEM) {
832 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
833 			return;
834 		} else {
835 			SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
836 		}
837 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
838 	}
839 }
840 
841 static void
842 _comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
843 {
844 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
845 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
846 					   backing_dev);
847 	int rc;
848 
849 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
850 				    backing_io->lba, backing_io->lba_count,
851 				    comp_reduce_io_cb,
852 				    backing_cb_args);
853 
854 	if (rc) {
855 		if (rc == -ENOMEM) {
856 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
857 			return;
858 		} else {
859 			SPDK_ERRLOG("error submitting unmap request, rc=%d\n", rc);
860 		}
861 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
862 	}
863 }
864 
865 /* This is the function provided to the reduceLib for sending reads/writes/unmaps
866  * directly to the backing device.
867  */
868 static void
869 _comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
870 {
871 	switch (backing_io->backing_io_type) {
872 	case SPDK_REDUCE_BACKING_IO_WRITE:
873 		_comp_backing_bdev_write(backing_io);
874 		break;
875 	case SPDK_REDUCE_BACKING_IO_READ:
876 		_comp_backing_bdev_read(backing_io);
877 		break;
878 	case SPDK_REDUCE_BACKING_IO_UNMAP:
879 		_comp_backing_bdev_unmap(backing_io);
880 		break;
881 	default:
882 		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
883 		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
884 		break;
885 	}
886 }
887 
888 static void
889 _comp_reduce_resubmit_backing_io(void *_backing_io)
890 {
891 	struct spdk_reduce_backing_io *backing_io = _backing_io;
892 
893 	_comp_reduce_submit_backing_io(backing_io);
894 }
895 
896 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
897 static void
898 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
899 {
900 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
901 
902 	if (reduce_errno) {
903 		SPDK_ERRLOG("Failed to unload vol, error %d\n", reduce_errno);
904 	}
905 
906 	comp_bdev->vol = NULL;
907 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
908 }
909 
910 static void
911 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
912 {
913 	struct vbdev_compress *comp_bdev, *tmp;
914 
915 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
916 		if (bdev_find == comp_bdev->base_bdev) {
917 			/* Tell reduceLib that we're done with this volume. */
918 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
919 		}
920 	}
921 }
922 
923 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
924 static void
925 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
926 				  void *event_ctx)
927 {
928 	switch (type) {
929 	case SPDK_BDEV_EVENT_REMOVE:
930 		vbdev_compress_base_bdev_hotremove_cb(bdev);
931 		break;
932 	default:
933 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
934 		break;
935 	}
936 }
937 
938 /* TODO: determine which params we want user configurable; hardcoded for now:
939  * params.vol_size
940  * params.chunk_size
941  * compression PMD, algorithm, window size, comp level, etc.
942  * DEV_MD_PATH
943  */
944 
945 /* Common function for init and load to allocate and populate the minimal
946  * information for reducelib to init or load.
947  */
948 struct vbdev_compress *
949 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size, uint8_t comp_algo,
950 		       uint32_t comp_level)
951 {
952 	struct vbdev_compress *comp_bdev;
953 	struct spdk_bdev *bdev;
954 
955 	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
956 	if (comp_bdev == NULL) {
957 		SPDK_ERRLOG("failed to alloc comp_bdev\n");
958 		return NULL;
959 	}
960 
961 	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
962 	comp_bdev->backing_dev.compress = _comp_reduce_compress;
963 	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;
964 
965 	comp_bdev->base_desc = bdev_desc;
966 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
967 	comp_bdev->base_bdev = bdev;
968 
969 	comp_bdev->backing_dev.blocklen = bdev->blocklen;
970 	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;
971 
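	/* Reserve room in each backing_io for a bdev_io_wait_entry so that ENOMEM from
	 * the base bdev can be retried via the io_wait queue (see
	 * _comp_backing_bdev_queue_io_wait()).
	 */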
972 	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);
973 
974 	comp_bdev->comp_algo = comp_algo;
975 	comp_bdev->comp_level = comp_level;
976 	comp_bdev->params.comp_algo = comp_algo;
977 	comp_bdev->params.comp_level = comp_level;
978 	comp_bdev->params.chunk_size = CHUNK_SIZE;
979 	if (lb_size == 0) {
980 		comp_bdev->params.logical_block_size = bdev->blocklen;
981 	} else {
982 		comp_bdev->params.logical_block_size = lb_size;
983 	}
984 
985 	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
986 	return comp_bdev;
987 }
988 
989 /* Call reducelib to initialize a new volume */
990 static int
991 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, uint8_t comp_algo,
992 		  uint32_t comp_level, bdev_compress_create_cb cb_fn, void *cb_arg)
993 {
994 	struct spdk_bdev_desc *bdev_desc = NULL;
995 	struct vbdev_init_reduce_ctx *init_ctx;
996 	struct vbdev_compress *comp_bdev;
997 	int rc;
998 
999 	init_ctx = calloc(1, sizeof(*init_ctx));
1000 	if (init_ctx == NULL) {
1001 		SPDK_ERRLOG("failed to alloc init context\n");
1002 		return -ENOMEM;
1003 	}
1004 
1005 	init_ctx->cb_fn = cb_fn;
1006 	init_ctx->cb_ctx = cb_arg;
1007 
1008 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
1009 				NULL, &bdev_desc);
1010 	if (rc) {
1011 		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
1012 		free(init_ctx);
1013 		return rc;
1014 	}
1015 
1016 	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size, comp_algo, comp_level);
1017 	if (comp_bdev == NULL) {
1018 		free(init_ctx);
1019 		spdk_bdev_close(bdev_desc);
1020 		return -EINVAL;
1021 	}
1022 
1023 	init_ctx->comp_bdev = comp_bdev;
1024 
1025 	/* Save the thread where the base device is opened */
1026 	comp_bdev->thread = spdk_get_thread();
1027 
1028 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1029 
1030 	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
1031 			     pm_path,
1032 			     vbdev_reduce_init_cb,
1033 			     init_ctx);
1034 	return 0;
1035 }
1036 
1037 /* We provide this callback for the SPDK channel code to create a channel using
1038  * the channel struct we provided in our module get_io_channel() entry point. Here
1039  * we get and save off an underlying base channel of the device below us so that
1040  * we can communicate with the base bdev on a per channel basis.  If we needed
1041  * our own poller for this vbdev, we'd register it here.
1042  */
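/* Note: the first channel created pins comp_bdev->reduce_thread; all reduce volume
 * operations are then funneled to that thread via spdk_thread_exec_msg().
 */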
1043 static int
1044 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1045 {
1046 	struct vbdev_compress *comp_bdev = io_device;
1047 
1048 	/* Now set the reduce channel if it's not already set. */
1049 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1050 	if (comp_bdev->ch_count == 0) {
1051 		/* We use this queue to track outstanding IO in our layer. */
1052 		TAILQ_INIT(&comp_bdev->pending_comp_ios);
1053 
1054 		/* We use this to queue up compression operations as needed. */
1055 		TAILQ_INIT(&comp_bdev->queued_comp_ops);
1056 
1057 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1058 		comp_bdev->reduce_thread = spdk_get_thread();
1059 		comp_bdev->accel_channel = spdk_accel_get_io_channel();
1060 	}
1061 	comp_bdev->ch_count++;
1062 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1063 
1064 	return 0;
1065 }
1066 
1067 static void
1068 _channel_cleanup(struct vbdev_compress *comp_bdev)
1069 {
1070 	spdk_put_io_channel(comp_bdev->base_ch);
1071 	spdk_put_io_channel(comp_bdev->accel_channel);
1072 	comp_bdev->reduce_thread = NULL;
1073 }
1074 
1075 /* Used to reroute destroy_ch to the correct thread */
1076 static void
1077 _comp_bdev_ch_destroy_cb(void *arg)
1078 {
1079 	struct vbdev_compress *comp_bdev = arg;
1080 
1081 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1082 	_channel_cleanup(comp_bdev);
1083 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1084 }
1085 
1086 /* We provide this callback for the SPDK channel code to destroy a channel
1087  * created with our create callback. We just need to undo anything we did
1088  * when we created. If this bdev used its own poller, we'd unregister it here.
1089  */
1090 static void
1091 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1092 {
1093 	struct vbdev_compress *comp_bdev = io_device;
1094 
1095 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1096 	comp_bdev->ch_count--;
1097 	if (comp_bdev->ch_count == 0) {
1098 		/* Send this request to the thread where the channel was created. */
1099 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1100 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1101 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1102 		} else {
1103 			_channel_cleanup(comp_bdev);
1104 		}
1105 	}
1106 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1107 }
1108 
1109 static int
1110 _check_compress_bdev_comp_algo(enum spdk_accel_comp_algo algo, uint32_t comp_level)
1111 {
1112 	uint32_t min_level, max_level;
1113 	int rc;
1114 
1115 	rc = spdk_accel_get_compress_level_range(algo, &min_level, &max_level);
1116 	if (rc != 0) {
1117 		return rc;
1118 	}
1119 
1120 	/* If both min_level and max_level are 0, the compression level can be ignored.
1121 	 * The back-end implementation hardcodes the compression level.
1122 	 */
1123 	if (min_level == 0 && max_level == 0) {
1124 		return 0;
1125 	}
1126 
1127 	if (comp_level > max_level || comp_level < min_level) {
1128 		return -EINVAL;
1129 	}
1130 
1131 	return 0;
1132 }
1133 
1134 /* RPC entry point for compression vbdev creation. */
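/* Reached via the bdev_compress_create RPC; an illustrative invocation (see the RPC
 * documentation for the authoritative parameter list):
 *   rpc.py bdev_compress_create -b base0 -p /path/to/pmem_dir
 */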
1135 int
1136 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
1137 		     uint8_t comp_algo, uint32_t comp_level,
1138 		     bdev_compress_create_cb cb_fn, void *cb_arg)
1139 {
1140 	struct vbdev_compress *comp_bdev = NULL;
1141 	struct stat info;
1142 	int rc;
1143 
1144 	if (stat(pm_path, &info) != 0) {
1145 		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
1146 		return -EINVAL;
1147 	} else if (!S_ISDIR(info.st_mode)) {
1148 		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
1149 		return -EINVAL;
1150 	}
1151 
1152 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
1153 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
1154 		return -EINVAL;
1155 	}
1156 
1157 	rc = _check_compress_bdev_comp_algo(comp_algo, comp_level);
1158 	if (rc != 0) {
1159 		SPDK_ERRLOG("Compress bdev doesn't support compression algo(%u) or level(%u)\n",
1160 			    comp_algo, comp_level);
1161 		return rc;
1162 	}
1163 
1164 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1165 		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
1166 			SPDK_ERRLOG("Base bdev %s already being used for a compress bdev\n", bdev_name);
1167 			return -EBUSY;
1168 		}
1169 	}
1170 	return vbdev_init_reduce(bdev_name, pm_path, lb_size, comp_algo, comp_level, cb_fn, cb_arg);
1171 }
1172 
1173 static int
1174 vbdev_compress_init(void)
1175 {
1176 	return 0;
1177 }
1178 
1179 /* Called when the entire module is being torn down. */
1180 static void
1181 vbdev_compress_finish(void)
1182 {
1183 	/* TODO: unload vol in a future patch */
1184 }
1185 
1186 /* During init we'll be asked how much memory we'd like passed to us
1187  * in bdev_io structures as context. Here's where we specify how
1188  * much context we want per IO.
1189  */
1190 static int
1191 vbdev_compress_get_ctx_size(void)
1192 {
1193 	return sizeof(struct comp_bdev_io);
1194 }
1195 
1196 /* When we register our bdev this is how we specify our entry points. */
1197 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1198 	.destruct		= vbdev_compress_destruct,
1199 	.submit_request		= vbdev_compress_submit_request,
1200 	.io_type_supported	= vbdev_compress_io_type_supported,
1201 	.get_io_channel		= vbdev_compress_get_io_channel,
1202 	.dump_info_json		= vbdev_compress_dump_info_json,
1203 	.write_config_json	= NULL,
1204 };
1205 
1206 static struct spdk_bdev_module compress_if = {
1207 	.name = "compress",
1208 	.module_init = vbdev_compress_init,
1209 	.get_ctx_size = vbdev_compress_get_ctx_size,
1210 	.examine_disk = vbdev_compress_examine,
1211 	.module_fini = vbdev_compress_finish,
1212 	.config_json = vbdev_compress_config_json
1213 };
1214 
1215 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1216 
1217 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1218 {
1219 	struct spdk_bdev_alias *aliases;
1220 
1221 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1222 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1223 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
1224 		if (!comp_bdev->comp_bdev.name) {
1225 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1226 			return -ENOMEM;
1227 		}
1228 	} else {
1229 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1230 		if (!comp_bdev->comp_bdev.name) {
1231 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1232 			return -ENOMEM;
1233 		}
1234 	}
1235 	return 0;
1236 }
1237 
1238 static int
1239 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1240 {
1241 	struct spdk_uuid ns_uuid;
1242 	int rc;
1243 
1244 	if (_set_compbdev_name(comp_bdev)) {
1245 		return -EINVAL;
1246 	}
1247 
1248 	/* Note: some of the fields below will change in the future - for example,
1249 	 * blockcnt specifically will not match (the compressed volume size will
1250 	 * be slightly less than the base bdev size)
1251 	 */
1252 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1253 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1254 
1255 	comp_bdev->comp_bdev.optimal_io_boundary =
1256 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
1257 
1258 	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
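	/* Example: with 16 KiB chunks and a 4 KiB logical block size the boundary is
	 * 4 blocks, so the bdev layer splits any IO that crosses a chunk boundary and
	 * each child IO stays within a single reduce chunk.
	 */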
1259 
1260 	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
1261 	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
1262 	assert(comp_bdev->comp_bdev.blockcnt > 0);
1263 
1264 	/* This is the context that is passed to us when the bdev
1265 	 * layer calls in so we'll save our comp_bdev node here.
1266 	 */
1267 	comp_bdev->comp_bdev.ctxt = comp_bdev;
1268 	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
1269 	comp_bdev->comp_bdev.module = &compress_if;
1270 
1271 	/* Generate UUID based on namespace UUID + base bdev UUID. */
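	/* The SHA1-based (version 5 style) generation is deterministic, so the compress
	 * bdev keeps the same UUID across application restarts.
	 */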
1272 	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
1273 	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
1274 				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
1275 	if (rc) {
1276 		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
1277 		return -EINVAL;
1278 	}
1279 
1280 	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1281 
1282 	/* Save the thread where the base device is opened */
1283 	comp_bdev->thread = spdk_get_thread();
1284 
1285 	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
1286 				sizeof(struct comp_io_channel),
1287 				comp_bdev->comp_bdev.name);
1288 
1289 	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1290 					 comp_bdev->comp_bdev.module);
1291 	if (rc) {
1292 		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
1293 			    spdk_strerror(-rc));
1294 		goto error_claim;
1295 	}
1296 
1297 	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
1298 	if (rc < 0) {
1299 		SPDK_ERRLOG("trying to register bdev, error %s\n", spdk_strerror(-rc));
1300 		goto error_bdev_register;
1301 	}
1302 
1303 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1304 
1305 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1306 
1307 	return 0;
1308 
1309 	/* Error cleanup paths. */
1310 error_bdev_register:
1311 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1312 error_claim:
1313 	spdk_io_device_unregister(comp_bdev, NULL);
1314 	free(comp_bdev->comp_bdev.name);
1315 	return rc;
1316 }
1317 
1318 static void
1319 _vbdev_compress_delete_done(void *_ctx)
1320 {
1321 	struct vbdev_comp_delete_ctx *ctx = _ctx;
1322 
1323 	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
1324 
1325 	free(ctx);
1326 }
1327 
1328 static void
1329 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1330 {
1331 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1332 
1333 	ctx->cb_rc = bdeverrno;
1334 
1335 	if (ctx->orig_thread != spdk_get_thread()) {
1336 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1337 	} else {
1338 		_vbdev_compress_delete_done(ctx);
1339 	}
1340 }
1341 
1342 void
1343 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1344 {
1345 	struct vbdev_compress *comp_bdev = NULL;
1346 	struct vbdev_comp_delete_ctx *ctx;
1347 
1348 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1349 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1350 			break;
1351 		}
1352 	}
1353 
1354 	if (comp_bdev == NULL) {
1355 		cb_fn(cb_arg, -ENODEV);
1356 		return;
1357 	}
1358 
1359 	ctx = calloc(1, sizeof(*ctx));
1360 	if (ctx == NULL) {
1361 		SPDK_ERRLOG("Failed to allocate delete context\n");
1362 		cb_fn(cb_arg, -ENOMEM);
1363 		return;
1364 	}
1365 
1366 	/* Save these for after the vol is destroyed. */
1367 	ctx->cb_fn = cb_fn;
1368 	ctx->cb_arg = cb_arg;
1369 	ctx->orig_thread = spdk_get_thread();
1370 
1371 	comp_bdev->delete_ctx = ctx;
1372 
1373 	/* Tell reducelib that we're done with this volume. */
1374 	if (comp_bdev->orphaned == false) {
1375 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1376 	} else {
1377 		delete_vol_unload_cb(comp_bdev, 0);
1378 	}
1379 }
1380 
1381 static void
1382 _vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
1383 {
1384 }
1385 
1386 static void
1387 _vbdev_reduce_load_cb(void *ctx)
1388 {
1389 	struct vbdev_compress *comp_bdev = ctx;
1390 	int rc;
1391 
1392 	assert(comp_bdev->base_desc != NULL);
1393 
1394 	/* Done with metadata operations */
1395 	spdk_put_io_channel(comp_bdev->base_ch);
1396 
1397 	if (comp_bdev->reduce_errno == 0) {
1398 		rc = vbdev_compress_claim(comp_bdev);
1399 		if (rc != 0) {
1400 			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
1401 			goto err;
1402 		}
1403 	} else if (comp_bdev->reduce_errno == -ENOENT) {
1404 		if (_set_compbdev_name(comp_bdev)) {
1405 			goto err;
1406 		}
1407 
1408 		/* Save the thread where the base device is opened */
1409 		comp_bdev->thread = spdk_get_thread();
1410 
1411 		comp_bdev->comp_bdev.module = &compress_if;
1412 		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1413 		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1414 						 comp_bdev->comp_bdev.module);
1415 		if (rc) {
1416 			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
1417 				    spdk_strerror(-rc));
1418 			free(comp_bdev->comp_bdev.name);
1419 			goto err;
1420 		}
1421 
1422 		comp_bdev->orphaned = true;
1423 		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1424 	} else {
1425 		if (comp_bdev->reduce_errno != -EILSEQ) {
1426 			SPDK_ERRLOG("Failed to load vol on bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
1427 				    spdk_strerror(-comp_bdev->reduce_errno));
1428 		}
1429 		goto err;
1430 	}
1431 
1432 	spdk_bdev_module_examine_done(&compress_if);
1433 	return;
1434 
1435 err:
1436 	/* Close the underlying bdev on its same opened thread. */
1437 	spdk_bdev_close(comp_bdev->base_desc);
1438 	free(comp_bdev);
1439 	spdk_bdev_module_examine_done(&compress_if);
1440 }
1441 
1442 /* Callback from reduce for when load is complete. We'll pass the vbdev_compress struct
1443  * used for the initial metadata operations on to the claim step, where it will be
1444  * further filled out and added to the global list.
1445  */
1446 static void
1447 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1448 {
1449 	struct vbdev_compress *comp_bdev = cb_arg;
1450 
1451 	if (reduce_errno == 0) {
1452 		/* Update information following volume load. */
1453 		comp_bdev->vol = vol;
1454 		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
1455 		       sizeof(struct spdk_reduce_vol_params));
1456 		comp_bdev->comp_algo = comp_bdev->params.comp_algo;
1457 		comp_bdev->comp_level = comp_bdev->params.comp_level;
1458 	}
1459 
1460 	comp_bdev->reduce_errno = reduce_errno;
1461 
1462 	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
1463 		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
1464 	} else {
1465 		_vbdev_reduce_load_cb(comp_bdev);
1466 	}
1467 
1468 }
1469 
1470 /* Examine_disk entry point: will do a metadata load to see if this is ours,
1471  * and if so will go ahead and claim it.
1472  */
1473 static void
1474 vbdev_compress_examine(struct spdk_bdev *bdev)
1475 {
1476 	struct spdk_bdev_desc *bdev_desc = NULL;
1477 	struct vbdev_compress *comp_bdev;
1478 	int rc;
1479 
1480 	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
1481 		spdk_bdev_module_examine_done(&compress_if);
1482 		return;
1483 	}
1484 
1485 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
1486 				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
1487 	if (rc) {
1488 		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
1489 			    spdk_strerror(-rc));
1490 		spdk_bdev_module_examine_done(&compress_if);
1491 		return;
1492 	}
1493 
1494 	comp_bdev = _prepare_for_load_init(bdev_desc, 0, SPDK_ACCEL_COMP_ALGO_DEFLATE, 1);
1495 	if (comp_bdev == NULL) {
1496 		spdk_bdev_close(bdev_desc);
1497 		spdk_bdev_module_examine_done(&compress_if);
1498 		return;
1499 	}
1500 
1501 	/* Save the thread where the base device is opened */
1502 	comp_bdev->thread = spdk_get_thread();
1503 
1504 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1505 	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
1506 }
1507 
1508 SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1509