/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/accel.h"

#include "spdk/accel_module.h"

#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

/* This namespace UUID was generated using uuid_generate() method. */
#define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"
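
/*
 * Each compress bdev derives its UUID deterministically from this namespace
 * UUID plus the base bdev's UUID via spdk_uuid_generate_sha1() (see
 * vbdev_compress_claim()). A minimal sketch of the derivation:
 *
 *	struct spdk_uuid ns_uuid, out;
 *	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
 *	spdk_uuid_generate_sha1(&out, &ns_uuid,
 *				(const char *)&base_bdev->uuid,
 *				sizeof(struct spdk_uuid));
 */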

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
	enum spdk_accel_comp_algo	comp_algo;	/* compression algorithm for compress bdev */
	uint32_t			comp_level;	/* compression algorithm level */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	int				status;			/* save for completion on orig thread */
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size,
		uint8_t comp_algo, uint32_t comp_level);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
static void _comp_reduce_resubmit_backing_io(void *_backing_io);

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		SPDK_ERRLOG("Failed to execute reduce api. %s\n", spdk_strerror(-io_ctx->status));
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}
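
/*
 * Note: spdk_thread_exec_msg() runs the callback inline when the calling thread
 * already is the target thread and otherwise falls back to spdk_thread_send_msg().
 * A minimal sketch of the thread-hop idiom used throughout this file:
 *
 *	orig_thread = spdk_io_channel_get_thread(ch);
 *	spdk_thread_exec_msg(orig_thread, completion_fn, io_ctx);
 */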

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	if (compress) {
		assert(dst_iovcnt == 1);
		rc = spdk_accel_submit_compress_ext(comp_bdev->accel_channel, dst_iovs[0].iov_base,
						    dst_iovs[0].iov_len, src_iovs, src_iovcnt,
						    comp_bdev->comp_algo, comp_bdev->comp_level,
						    &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
						    reduce_cb_arg->cb_arg);
	} else {
		rc = spdk_accel_submit_decompress_ext(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
						      src_iovs, src_iovcnt, comp_bdev->comp_algo,
						      &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
						      reduce_cb_arg->cb_arg);
	}

	return rc;
}
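
/*
 * Compression targets a single flat destination buffer (hence the
 * dst_iovcnt == 1 assert above) because spdk_accel_submit_compress_ext()
 * takes one dst pointer/length, while decompression can scatter into a
 * destination iovec array. Illustrative call shape (variable names assumed):
 *
 *	rc = spdk_accel_submit_compress_ext(ch, dst_buf, dst_len,
 *					    src_iovs, src_iovcnt,
 *					    SPDK_ACCEL_COMP_ALGO_DEFLATE, 1,
 *					    &output_size, cb_fn, cb_arg);
 */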

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error submitting compress operation, rc %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error submitting decompress operation, rc %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}

struct partial_chunk_info {
	uint64_t chunk_idx;
	uint64_t block_offset;
	uint64_t block_length;
};

/*
 * Holds the information needed while executing an unmap operation.
 */
struct compress_unmap_split_ctx {
	struct spdk_bdev_io *bdev_io;
	int32_t status;
	uint32_t logical_blocks_per_chunk;
	/* The first chunk that can be fully covered by the unmap bdevio interval */
	uint64_t full_chunk_idx_b;
	/* The last chunk that can be fully covered by the unmap bdevio interval */
	uint64_t full_chunk_idx_e;
	uint64_t num_full_chunks;
	uint64_t num_full_chunks_consumed;
	uint32_t num_partial_chunks;
	uint32_t num_partial_chunks_consumed;
	/* Holds the partial chunk information. There can be at most two entries, because
	 * partially covered chunks can only occur at the beginning and/or end of the range. */
	struct partial_chunk_info partial_chunk_info[2];
};

static void _comp_unmap_subcmd_done_cb(void *ctx, int error);

/*
 * This function processes the unmap operation for both full and partial chunks in a
 * compressed block device. It iteratively submits unmap requests until all the chunks
 * have been unmapped or an error occurs.
 */
static void
_comp_submit_unmap_split(void *ctx)
{
	struct compress_unmap_split_ctx *split_ctx = ctx;
	struct spdk_bdev_io *bdev_io = split_ctx->bdev_io;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct partial_chunk_info *partial_chunk = NULL;
	uint64_t chunk_idx = 0;
	uint64_t block_offset = 0;
	uint64_t block_length = 0;

	if (split_ctx->status != 0 ||
	    (split_ctx->num_full_chunks_consumed == split_ctx->num_full_chunks &&
	     split_ctx->num_partial_chunks_consumed == split_ctx->num_partial_chunks)) {
		reduce_rw_blocks_cb(bdev_io, split_ctx->status);
		free(split_ctx);
		return;
	}

	if (split_ctx->num_full_chunks_consumed < split_ctx->num_full_chunks) {
		chunk_idx = split_ctx->full_chunk_idx_b + split_ctx->num_full_chunks_consumed;
		block_offset = chunk_idx * split_ctx->logical_blocks_per_chunk;
		block_length = split_ctx->logical_blocks_per_chunk;

		split_ctx->num_full_chunks_consumed++;
		spdk_reduce_vol_unmap(comp_bdev->vol,
				      block_offset, block_length,
				      _comp_unmap_subcmd_done_cb, split_ctx);
	} else if (split_ctx->num_partial_chunks_consumed < split_ctx->num_partial_chunks) {
		partial_chunk = &split_ctx->partial_chunk_info[split_ctx->num_partial_chunks_consumed];
		block_offset = partial_chunk->chunk_idx * split_ctx->logical_blocks_per_chunk +
			       partial_chunk->block_offset;
		block_length = partial_chunk->block_length;

		split_ctx->num_partial_chunks_consumed++;
		spdk_reduce_vol_unmap(comp_bdev->vol,
				      block_offset, block_length,
				      _comp_unmap_subcmd_done_cb, split_ctx);
	} else {
		assert(false);
	}
}

/*
 * Operations such as mkfs or fstrim can generate large unmap requests, which are
 * split into multiple subcommands and processed recursively. Running too many
 * subcommands recursively could overflow the stack or monopolize the thread and
 * delay other tasks. To avoid this, the next subcommand is dispatched
 * asynchronously via 'spdk_thread_send_msg'.
 */
static void
_comp_unmap_subcmd_done_cb(void *ctx, int error)
{
	struct compress_unmap_split_ctx *split_ctx = ctx;

	split_ctx->status = error;
	spdk_thread_send_msg(spdk_get_thread(), _comp_submit_unmap_split, split_ctx);
}

/*
 * This function splits the unmap operation into full and partial chunks based on the
 * block range specified in the 'spdk_bdev_io' structure. It calculates the start and end
 * chunks, as well as any partial chunks at the beginning or end of the range, and prepares
 * a context (compress_unmap_split_ctx) to handle these chunks. The unmap operation is
 * then submitted for processing through '_comp_submit_unmap_split'.
 * Cases to handle:
 * 1. start and end chunks are different
 * 1.1 start and end chunks are full
 * 1.2 start and end chunks are partial
 * 1.3 start or end chunk is full and the other is partial
 * 2. start and end chunks are the same
 * 2.1 full
 * 2.2 partial
 */
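/*
 * Worked example (numbers assumed for illustration): with a 16 KiB chunk and
 * 512-byte logical blocks, logical_blocks_per_chunk = 32. An unmap with
 * offset_blocks = 40 and num_blocks = 80 covers blocks 40..119, giving
 * start_chunk = 1, end_chunk = 3, start_offset = 8 and end_tail = 24. That
 * yields a leading partial chunk {idx 1, offset 8, len 24}, one full chunk
 * (full_chunk_idx_b == full_chunk_idx_e == 2) and a trailing partial chunk
 * {idx 3, offset 0, len 24}.
 */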
static void
_comp_submit_unmap(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	const struct spdk_reduce_vol_params *vol_params = spdk_reduce_vol_get_params(comp_bdev->vol);
	struct compress_unmap_split_ctx *split_ctx;
	struct partial_chunk_info *partial_chunk;
	uint32_t logical_blocks_per_chunk;
	uint64_t start_chunk, end_chunk, start_offset, end_tail;

	logical_blocks_per_chunk = vol_params->chunk_size / vol_params->logical_block_size;
	start_chunk = bdev_io->u.bdev.offset_blocks / logical_blocks_per_chunk;
	end_chunk = (bdev_io->u.bdev.offset_blocks + bdev_io->u.bdev.num_blocks - 1) /
		    logical_blocks_per_chunk;
	start_offset = bdev_io->u.bdev.offset_blocks % logical_blocks_per_chunk;
	end_tail = (bdev_io->u.bdev.offset_blocks + bdev_io->u.bdev.num_blocks) %
		   logical_blocks_per_chunk;

	split_ctx = calloc(1, sizeof(struct compress_unmap_split_ctx));
	if (split_ctx == NULL) {
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}
	partial_chunk = split_ctx->partial_chunk_info;
	split_ctx->bdev_io = bdev_io;
	split_ctx->logical_blocks_per_chunk = logical_blocks_per_chunk;

	if (start_chunk < end_chunk) {
		if (start_offset != 0) {
			partial_chunk[split_ctx->num_partial_chunks].chunk_idx = start_chunk;
			partial_chunk[split_ctx->num_partial_chunks].block_offset = start_offset;
			partial_chunk[split_ctx->num_partial_chunks].block_length = logical_blocks_per_chunk
					- start_offset;
			split_ctx->num_partial_chunks++;
			split_ctx->full_chunk_idx_b = start_chunk + 1;
		} else {
			split_ctx->full_chunk_idx_b = start_chunk;
		}

		if (end_tail != 0) {
			partial_chunk[split_ctx->num_partial_chunks].chunk_idx = end_chunk;
			partial_chunk[split_ctx->num_partial_chunks].block_offset = 0;
			partial_chunk[split_ctx->num_partial_chunks].block_length = end_tail;
			split_ctx->num_partial_chunks++;
			split_ctx->full_chunk_idx_e = end_chunk - 1;
		} else {
			split_ctx->full_chunk_idx_e = end_chunk;
		}

		split_ctx->num_full_chunks = end_chunk - start_chunk + 1 - split_ctx->num_partial_chunks;

		if (split_ctx->num_full_chunks) {
			assert(split_ctx->full_chunk_idx_b != UINT64_MAX && split_ctx->full_chunk_idx_e != UINT64_MAX);
			assert(split_ctx->full_chunk_idx_e - split_ctx->full_chunk_idx_b + 1 == split_ctx->num_full_chunks);
		} else {
			assert(split_ctx->full_chunk_idx_b - split_ctx->full_chunk_idx_e == 1);
		}
	} else if (start_offset != 0 || end_tail != 0) {
		partial_chunk[0].chunk_idx = start_chunk;
		partial_chunk[0].block_offset = start_offset;
		partial_chunk[0].block_length = bdev_io->u.bdev.num_blocks;
		split_ctx->num_partial_chunks = 1;
	} else {
		split_ctx->full_chunk_idx_b = start_chunk;
		split_ctx->full_chunk_idx_e = end_chunk;
		split_ctx->num_full_chunks = 1;
	}
	assert(split_ctx->num_partial_chunks <= SPDK_COUNTOF(split_ctx->partial_chunk_info));

	_comp_submit_unmap_split(split_ctx);
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_unmap, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
		return true;
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("failed to unload vol, error %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("failed to destroy vol, error %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("Failed to unload vol, error %s\n", spdk_strerror(-reduce_errno));
		vbdev_compress_delete_done(comp_bdev->delete_ctx, reduce_errno);
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers that want to communicate with this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
	const struct spdk_reduce_vol_info *vol_info;
	const char *comp_algo = NULL;

	if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_LZ4) {
		comp_algo = "lz4";
	} else if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_DEFLATE) {
		comp_algo = "deflate";
	} else {
		assert(false);
	}

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
	spdk_json_write_named_string(w, "comp_algo", comp_algo);
	spdk_json_write_named_uint32(w, "comp_level", comp_bdev->params.comp_level);
	spdk_json_write_named_uint32(w, "chunk_size", comp_bdev->params.chunk_size);
	spdk_json_write_named_uint32(w, "backing_io_unit_size", comp_bdev->params.backing_io_unit_size);
	vol_info = spdk_reduce_vol_get_info(comp_bdev->vol);
	spdk_json_write_named_uint64(w, "allocated_io_units", vol_info->allocated_io_units);
	spdk_json_write_object_end(w);

	return 0;
}

static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as compress bdev configuration is saved on physical device. */
	return 0;
}

struct vbdev_init_reduce_ctx {
	struct vbdev_compress   *comp_bdev;
	int                     status;
	bdev_compress_create_cb cb_fn;
	void                    *cb_ctx;
};

static void
_vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
{
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_init_reduce_ctx *init_ctx = ctx;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->vol) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc == 0) {
			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
			free(init_ctx);
			return;
		} else {
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
		}
		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	free(init_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_compress struct,
 * so far used only for initial metadata operations, on to claim, where it will be further
 * filled out and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;

	if (reduce_errno == 0) {
		comp_bdev->vol = vol;
	} else {
		SPDK_ERRLOG("failed to init vol for bdev %s, error %s\n",
			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
	}

	init_ctx->status = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
	} else {
		_vbdev_reduce_init_cb(init_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

static void
_comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
				 struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_bdev_io_wait_entry *waitq_entry;
	int rc;

	waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx;
	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
	waitq_entry->cb_arg = backing_io;

	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
		assert(false);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
	}
}
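
/*
 * Note on the retry path above: reduce reserves user_ctx_size bytes of per-IO
 * scratch space in each spdk_reduce_backing_io (sized to
 * sizeof(struct spdk_bdev_io_wait_entry) in _prepare_for_load_init()), so the
 * wait-queue entry lives in backing_io->user_ctx and needs no extra allocation.
 * Once the base bdev frees an IO, the registered callback resubmits the request:
 *
 *	_comp_reduce_resubmit_backing_io(backing_io);
 */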

static void
_comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->iov, backing_io->iovcnt,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting readv request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_write(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     backing_io->iov, backing_io->iovcnt,
				     backing_io->lba, backing_io->lba_count,
				     comp_reduce_io_cb,
				     backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting unmap request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending reads/writes/unmaps
 * directly to the backing device.
 */
static void
_comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
{
	switch (backing_io->backing_io_type) {
	case SPDK_REDUCE_BACKING_IO_WRITE:
		_comp_backing_bdev_write(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_READ:
		_comp_backing_bdev_read(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_UNMAP:
		_comp_backing_bdev_unmap(backing_io);
		break;
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
		break;
	}
}

static void
_comp_reduce_resubmit_backing_io(void *_backing_io)
{
	struct spdk_reduce_backing_io *backing_io = _backing_io;

	_comp_reduce_submit_backing_io(backing_io);
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("failed to unload vol, error %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size, uint8_t comp_algo,
		       uint32_t comp_level)
{
	struct vbdev_compress *comp_bdev;
	struct spdk_bdev *bdev;

	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
	if (comp_bdev == NULL) {
		SPDK_ERRLOG("failed to alloc comp_bdev\n");
		return NULL;
	}

	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
	comp_bdev->backing_dev.compress = _comp_reduce_compress;
	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;

	comp_bdev->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	comp_bdev->base_bdev = bdev;

	comp_bdev->backing_dev.blocklen = bdev->blocklen;
	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;

	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);

	comp_bdev->comp_algo = comp_algo;
	comp_bdev->comp_level = comp_level;
	comp_bdev->params.comp_algo = comp_algo;
	comp_bdev->params.comp_level = comp_level;
	comp_bdev->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		comp_bdev->params.logical_block_size = bdev->blocklen;
	} else {
		comp_bdev->params.logical_block_size = lb_size;
	}

	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
	return comp_bdev;
}
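
/*
 * With the defaults above, each 16 KiB chunk (CHUNK_SIZE) maps onto four 4 KiB
 * backing IO units (BACKING_IO_SZ): 16384 / 4096 = 4. A chunk that compresses
 * into fewer io units consumes proportionally less space on the backing device.
 */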

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, uint8_t comp_algo,
		  uint32_t comp_level, bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_init_reduce_ctx *init_ctx;
	struct vbdev_compress *comp_bdev;
	int rc;

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init context\n");
		return -ENOMEM;
	}

	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_ctx = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
		free(init_ctx);
		return rc;
	}

	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size, comp_algo, comp_level);
	if (comp_bdev == NULL) {
		free(init_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	init_ctx->comp_bdev = comp_bdev;

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     init_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

static int
_check_compress_bdev_comp_algo(enum spdk_accel_comp_algo algo, uint32_t comp_level)
{
	uint32_t min_level, max_level;
	int rc;

	rc = spdk_accel_get_compress_level_range(algo, &min_level, &max_level);
	if (rc != 0) {
		return rc;
	}

	/* If both min_level and max_level are 0, the compression level can be ignored.
	 * The back-end implementation hardcodes the compression level.
	 */
	if (min_level == 0 && max_level == 0) {
		return 0;
	}

	if (comp_level > max_level || comp_level < min_level) {
		return -EINVAL;
	}

	return 0;
}
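
/*
 * Illustrative values (back-end specific, not guaranteed): a software deflate
 * implementation might report a level range such as 1..9, while an lz4 or
 * hardware back-end may report 0..0, meaning the level is fixed by the
 * implementation and any requested level is accepted.
 */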

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
		     uint8_t comp_algo, uint32_t comp_level,
		     bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct stat info;
	int rc;

	if (stat(pm_path, &info) != 0) {
		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
		return -EINVAL;
	} else if (!S_ISDIR(info.st_mode)) {
		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
		return -EINVAL;
	}

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	rc = _check_compress_bdev_comp_algo(comp_algo, comp_level);
	if (rc != 0) {
		SPDK_ERRLOG("Compress bdev doesn't support compression algo(%u) or level(%u)\n",
			    comp_algo, comp_level);
		return rc;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s already being used for a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size, comp_algo, comp_level, cb_fn, cb_arg);
}
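
/*
 * Example RPC invocation (a sketch; option names can vary across SPDK versions,
 * see scripts/rpc.py bdev_compress_create --help for the exact flags):
 *
 *	scripts/rpc.py bdev_compress_create -b base_bdev0 -p /mnt/pmem -l 4096
 */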

static int
vbdev_compress_init(void)
{
	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	struct spdk_uuid ns_uuid;
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
	if (rc) {
		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
		return -EINVAL;
	}

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
			    spdk_strerror(-rc));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev, error %s\n", spdk_strerror(-rc));
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}
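
/*
 * Example RPC invocation (a sketch; see scripts/rpc.py bdev_compress_delete --help
 * for the exact options in a given SPDK version). Compress bdevs are named
 * COMP_<base bdev name> by _set_compbdev_name():
 *
 *	scripts/rpc.py bdev_compress_delete COMP_base_bdev0
 */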

static void
_vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
{
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->reduce_errno == 0) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc != 0) {
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
			goto err;
		}
	} else if (comp_bdev->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(comp_bdev)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		comp_bdev->thread = spdk_get_thread();

		comp_bdev->comp_bdev.module = &compress_if;
		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
						 comp_bdev->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-rc));
			free(comp_bdev->comp_bdev.name);
			goto err;
		}

		comp_bdev->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
	} else {
		if (comp_bdev->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("failed to load vol for bdev %s, error %s\n",
				    spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-comp_bdev->reduce_errno));
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when the load is complete. We'll pass the vbdev_compress struct
 * used for initial metadata operations to claim, where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		comp_bdev->vol = vol;
		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
		comp_bdev->comp_algo = comp_bdev->params.comp_algo;
		comp_bdev->comp_level = comp_bdev->params.comp_level;
	}

	comp_bdev->reduce_errno = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
	} else {
		_vbdev_reduce_load_cb(comp_bdev);
	}
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *comp_bdev;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
			    spdk_strerror(-rc));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

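	/* The deflate/level-1 arguments below are placeholders for the metadata
	 * load; on a successful load they are overwritten with the values stored
	 * in the volume (see vbdev_reduce_load_cb()).
	 */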
	comp_bdev = _prepare_for_load_init(bdev_desc, 0, SPDK_ACCEL_COMP_ALGO_DEFLATE, 1);
	if (comp_bdev == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)