xref: /spdk/module/bdev/compress/vbdev_compress.c (revision e0d7428b482257aa6999b8b4cc44159dcc292df9)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "vbdev_compress.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/stdinc.h"
11 #include "spdk/rpc.h"
12 #include "spdk/env.h"
13 #include "spdk/endian.h"
14 #include "spdk/string.h"
15 #include "spdk/thread.h"
16 #include "spdk/util.h"
17 #include "spdk/bdev_module.h"
18 #include "spdk/likely.h"
19 #include "spdk/log.h"
20 #include "spdk/accel.h"
21 
22 #include "spdk/accel_module.h"
23 
24 #define CHUNK_SIZE (1024 * 16)
25 #define COMP_BDEV_NAME "compress"
26 #define BACKING_IO_SZ (4 * 1024)
27 
28 /* This namespace UUID was generated using the uuid_generate() method. */
29 #define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"
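/*
 * Illustrative sketch (not called by the driver; the helper name is hypothetical):
 * vbdev_compress_claim() derives the compress bdev UUID deterministically by hashing
 * the base bdev's UUID under the namespace UUID above, so the same base bdev yields
 * the same compress bdev UUID across restarts.
 */
static inline int
example_derive_comp_bdev_uuid(struct spdk_uuid *out, const struct spdk_uuid *base_bdev_uuid)
{
	struct spdk_uuid ns_uuid;

	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	return spdk_uuid_generate_sha1(out, &ns_uuid, (const char *)base_bdev_uuid,
				       sizeof(struct spdk_uuid));
}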
30 
31 struct vbdev_comp_delete_ctx {
32 	spdk_delete_compress_complete	cb_fn;
33 	void				*cb_arg;
34 	int				cb_rc;
35 	struct spdk_thread		*orig_thread;
36 };
37 
38 /* List of virtual bdevs and associated info for each. */
39 struct vbdev_compress {
40 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
41 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
42 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
43 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
44 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
45 	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
46 	struct spdk_thread		*reduce_thread;
47 	pthread_mutex_t			reduce_lock;
48 	uint32_t			ch_count;
49 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
50 	struct spdk_poller		*poller;	/* completion poller */
51 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
52 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
53 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
54 	struct vbdev_comp_delete_ctx	*delete_ctx;
55 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
56 	int				reduce_errno;
57 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
58 	TAILQ_ENTRY(vbdev_compress)	link;
59 	struct spdk_thread		*thread;	/* thread where base device is opened */
60 	enum spdk_accel_comp_algo       comp_algo;      /* compression algorithm for compress bdev */
61 	uint32_t                        comp_level;     /* compression algorithm level */
62 	bool				init_failed;	/* compress bdev initialization failed */
63 };
64 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
65 
66 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
67  */
68 struct comp_io_channel {
69 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
70 };
71 
72 /* Per I/O context for the compression vbdev. */
73 struct comp_bdev_io {
74 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
75 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
76 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
77 	struct spdk_bdev_io		*orig_io;		/* the original IO */
78 	int				status;			/* save for completion on orig thread */
79 };
80 
81 static void vbdev_compress_examine(struct spdk_bdev *bdev);
82 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
83 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size,
84 		uint8_t comp_algo, uint32_t comp_level);
85 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
86 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
87 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
88 static void _comp_reduce_resubmit_backing_io(void *_backing_io);
89 
90 /* for completing rw requests on the orig IO thread. */
91 static void
92 _reduce_rw_blocks_cb(void *arg)
93 {
94 	struct comp_bdev_io *io_ctx = arg;
95 
96 	if (spdk_likely(io_ctx->status == 0)) {
97 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
98 	} else if (io_ctx->status == -ENOMEM) {
99 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_NOMEM);
100 	} else {
101 		SPDK_ERRLOG("Failed to execute reduce api. %s\n", spdk_strerror(-io_ctx->status));
102 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
103 	}
104 }
105 
106 /* Completion callback for r/w that were issued via reducelib. */
107 static void
108 reduce_rw_blocks_cb(void *arg, int reduce_errno)
109 {
110 	struct spdk_bdev_io *bdev_io = arg;
111 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
112 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
113 	struct spdk_thread *orig_thread;
114 
115 	/* TODO: need to decide which reduce error codes map to bdev_io success vs failure;
116 	 * for example, examine calls that read metadata. */
117 
118 	io_ctx->status = reduce_errno;
119 
120 	/* Send this request to the orig IO thread. */
121 	orig_thread = spdk_io_channel_get_thread(ch);
122 
123 	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
124 }
125 
126 static int
127 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
128 		    int src_iovcnt, struct iovec *dst_iovs,
129 		    int dst_iovcnt, bool compress, void *cb_arg)
130 {
131 	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
132 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
133 					   backing_dev);
134 	int rc;
135 
136 	if (compress) {
137 		assert(dst_iovcnt == 1);
138 		rc = spdk_accel_submit_compress_ext(comp_bdev->accel_channel, dst_iovs[0].iov_base,
139 						    dst_iovs[0].iov_len, src_iovs, src_iovcnt,
140 						    comp_bdev->comp_algo, comp_bdev->comp_level,
141 						    &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
142 						    reduce_cb_arg->cb_arg);
143 	} else {
144 		rc = spdk_accel_submit_decompress_ext(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
145 						      src_iovs, src_iovcnt, comp_bdev->comp_algo,
146 						      &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
147 						      reduce_cb_arg->cb_arg);
148 	}
149 
150 	return rc;
151 }
152 
153 /* Entry point for reduce lib to issue a compress operation. */
154 static void
155 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
156 		      struct iovec *src_iovs, int src_iovcnt,
157 		      struct iovec *dst_iovs, int dst_iovcnt,
158 		      struct spdk_reduce_vol_cb_args *cb_arg)
159 {
160 	int rc;
161 
162 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
163 	if (rc) {
164 		SPDK_ERRLOG("Failed to submit compress operation, error %d (%s)\n", rc, spdk_strerror(-rc));
165 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
166 	}
167 }
168 
169 /* Entry point for reduce lib to issue a decompress operation. */
170 static void
171 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
172 			struct iovec *src_iovs, int src_iovcnt,
173 			struct iovec *dst_iovs, int dst_iovcnt,
174 			struct spdk_reduce_vol_cb_args *cb_arg)
175 {
176 	int rc;
177 
178 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
179 	if (rc) {
180 		SPDK_ERRLOG("Failed to submit decompress operation, error %d (%s)\n", rc, spdk_strerror(-rc));
181 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
182 	}
183 }
184 
185 static void
186 _comp_submit_write(void *ctx)
187 {
188 	struct spdk_bdev_io *bdev_io = ctx;
189 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
190 					   comp_bdev);
191 
192 	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
193 			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
194 			       reduce_rw_blocks_cb, bdev_io);
195 }
196 
197 static void
198 _comp_submit_read(void *ctx)
199 {
200 	struct spdk_bdev_io *bdev_io = ctx;
201 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
202 					   comp_bdev);
203 
204 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
205 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
206 			      reduce_rw_blocks_cb, bdev_io);
207 }
208 
209 
210 /* Callback for getting a buf from the bdev pool in the event that the caller passed
211  * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
212  * beneath us before we're done with it.
213  */
214 static void
215 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
216 {
217 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
218 					   comp_bdev);
219 
220 	if (spdk_unlikely(!success)) {
221 		SPDK_ERRLOG("Failed to get data buffer\n");
222 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
223 		return;
224 	}
225 
226 	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
227 }
228 
229 struct partial_chunk_info {
230 	uint64_t chunk_idx;
231 	uint64_t block_offset;
232 	uint64_t block_length;
233 };
234 
235 /*
236  * It's a structure used to hold information needed during the execution of an unmap operation.
237  */
238 struct compress_unmap_split_ctx {
239 	struct spdk_bdev_io *bdev_io;
240 	int32_t status;
241 	uint32_t logical_blocks_per_chunk;
242 	/* The first chunk that can be fully covered by the unmap bdevio interval */
243 	uint64_t full_chunk_idx_b;
244 	/* The last chunk that can be fully covered by the unmap bdevio interval */
245 	uint64_t full_chunk_idx_e;
246 	uint64_t num_full_chunks;
247 	uint64_t num_full_chunks_consumed;
248 	uint32_t num_partial_chunks;
249 	uint32_t num_partial_chunks_consumed;
250 	/* Used to hold the partial chunk information. There can be at most two of them,
251 	because chunks that cannot be fully covered can only appear at the beginning, the end, or both. */
252 	struct partial_chunk_info partial_chunk_info[2];
253 };
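/*
 * Worked example (illustrative): with a 16 KiB chunk and 512 B logical blocks,
 * logical_blocks_per_chunk = 32. An unmap with offset_blocks = 40 and num_blocks = 100
 * covers blocks 40..139, i.e. chunks 1..4: chunk 1 is partial (block_offset 8, length 24),
 * chunks 2 and 3 are full, and chunk 4 is partial (block_offset 0, length 12), giving
 * num_full_chunks = 2 and num_partial_chunks = 2.
 */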
254 
255 static void _comp_unmap_subcmd_done_cb(void *ctx, int error);
256 
257 /*
258  * This function processes the unmap operation for both full and partial chunks in a
259  * compressed block device. It iteratively submits unmap requests until all the chunks
260  * have been unmapped or an error occurs.
261  */
262 static void
263 _comp_submit_unmap_split(void *ctx)
264 {
265 	struct compress_unmap_split_ctx *split_ctx = ctx;
266 	struct spdk_bdev_io *bdev_io = split_ctx->bdev_io;
267 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
268 					   comp_bdev);
269 	struct partial_chunk_info *partial_chunk = NULL;
270 	uint64_t chunk_idx = 0;
271 	uint64_t block_offset = 0;
272 	uint64_t block_length = 0;
273 
274 	if (split_ctx->status != 0 ||
275 	    (split_ctx->num_full_chunks_consumed == split_ctx->num_full_chunks &&
276 	     split_ctx->num_partial_chunks_consumed == split_ctx->num_partial_chunks)) {
277 		reduce_rw_blocks_cb(bdev_io, split_ctx->status);
278 		free(split_ctx);
279 		return;
280 	}
281 
282 	if (split_ctx->num_full_chunks_consumed < split_ctx->num_full_chunks) {
283 		chunk_idx = split_ctx->full_chunk_idx_b + split_ctx->num_full_chunks_consumed;
284 		block_offset = chunk_idx * split_ctx->logical_blocks_per_chunk;
285 		block_length = split_ctx->logical_blocks_per_chunk;
286 
287 		split_ctx->num_full_chunks_consumed++;
288 		spdk_reduce_vol_unmap(comp_bdev->vol,
289 				      block_offset, block_length,
290 				      _comp_unmap_subcmd_done_cb, split_ctx);
291 	} else if (split_ctx->num_partial_chunks_consumed < split_ctx->num_partial_chunks) {
292 		partial_chunk = &split_ctx->partial_chunk_info[split_ctx->num_partial_chunks_consumed];
293 		block_offset = partial_chunk->chunk_idx * split_ctx->logical_blocks_per_chunk +
294 			       partial_chunk->block_offset;
295 		block_length = partial_chunk->block_length;
296 
297 		split_ctx->num_partial_chunks_consumed++;
298 		spdk_reduce_vol_unmap(comp_bdev->vol,
299 				      block_offset, block_length,
300 				      _comp_unmap_subcmd_done_cb, split_ctx);
301 	} else {
302 		assert(false);
303 	}
304 }
305 
306 /*
307  * Operations such as mkfs or fstrim may generate large unmap requests.
308  * A large request is split into multiple subcmds and processed recursively.
309  * Running too many subcmds recursively may overflow the stack or monopolize the thread,
310  * delaying other tasks. To avoid this, the next subcmd is processed asynchronously
311  * via 'spdk_thread_send_msg'.
312  */
313 static void
314 _comp_unmap_subcmd_done_cb(void *ctx, int error)
315 {
316 	struct compress_unmap_split_ctx *split_ctx = ctx;
317 
318 	split_ctx->status = error;
319 	spdk_thread_send_msg(spdk_get_thread(), _comp_submit_unmap_split, split_ctx);
320 }
321 
322 /*
323  * This function splits the unmap operation into full and partial chunks based on the
324  * block range specified in the 'spdk_bdev_io' structure. It calculates the start and end
325  * chunks, as well as any partial chunks at the beginning or end of the range, and prepares
326  * a context (compress_unmap_split_ctx) to handle these chunks. The unmap operation is
327  * then submitted for processing through '_comp_submit_unmap_split'.
328  * Some cases to handle:
329  * 1. start and end chunks are different
330  * 1.1 start and end chunks are full
331  * 1.2 start and end chunks are partial
332  * 1.3 start or end chunk is full and the other is partial
333  * 2. start and end chunks are the same
334  * 2.1 full
335  * 2.2 partial
336  */
337 static void
338 _comp_submit_unmap(void *ctx)
339 {
340 	struct spdk_bdev_io *bdev_io = ctx;
341 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
342 					   comp_bdev);
343 	const struct spdk_reduce_vol_params *vol_params = spdk_reduce_vol_get_params(comp_bdev->vol);
344 	struct compress_unmap_split_ctx *split_ctx;
345 	struct partial_chunk_info *partial_chunk;
346 	uint32_t logical_blocks_per_chunk;
347 	uint64_t start_chunk, end_chunk, start_offset, end_tail;
348 
349 	logical_blocks_per_chunk = vol_params->chunk_size / vol_params->logical_block_size;
350 	start_chunk = bdev_io->u.bdev.offset_blocks / logical_blocks_per_chunk;
351 	end_chunk = (bdev_io->u.bdev.offset_blocks + bdev_io->u.bdev.num_blocks - 1) /
352 		    logical_blocks_per_chunk;
353 	start_offset = bdev_io->u.bdev.offset_blocks % logical_blocks_per_chunk;
354 	end_tail = (bdev_io->u.bdev.offset_blocks + bdev_io->u.bdev.num_blocks) %
355 		   logical_blocks_per_chunk;
356 
357 	split_ctx = calloc(1, sizeof(struct compress_unmap_split_ctx));
358 	if (split_ctx == NULL) {
359 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
360 		return;
361 	}
362 	partial_chunk = split_ctx->partial_chunk_info;
363 	split_ctx->bdev_io = bdev_io;
364 	split_ctx->logical_blocks_per_chunk = logical_blocks_per_chunk;
365 
366 	if (start_chunk < end_chunk) {
367 		if (start_offset != 0) {
368 			partial_chunk[split_ctx->num_partial_chunks].chunk_idx = start_chunk;
369 			partial_chunk[split_ctx->num_partial_chunks].block_offset = start_offset;
370 			partial_chunk[split_ctx->num_partial_chunks].block_length = logical_blocks_per_chunk
371 					- start_offset;
372 			split_ctx->num_partial_chunks++;
373 			split_ctx->full_chunk_idx_b = start_chunk + 1;
374 		} else {
375 			split_ctx->full_chunk_idx_b = start_chunk;
376 		}
377 
378 		if (end_tail != 0) {
379 			partial_chunk[split_ctx->num_partial_chunks].chunk_idx = end_chunk;
380 			partial_chunk[split_ctx->num_partial_chunks].block_offset = 0;
381 			partial_chunk[split_ctx->num_partial_chunks].block_length = end_tail;
382 			split_ctx->num_partial_chunks++;
383 			split_ctx->full_chunk_idx_e = end_chunk - 1;
384 		} else {
385 			split_ctx->full_chunk_idx_e = end_chunk;
386 		}
387 
388 		split_ctx->num_full_chunks = end_chunk - start_chunk + 1 - split_ctx->num_partial_chunks;
389 
390 		if (split_ctx->num_full_chunks) {
391 			assert(split_ctx->full_chunk_idx_b != UINT64_MAX && split_ctx->full_chunk_idx_e != UINT64_MAX);
392 			assert(split_ctx->full_chunk_idx_e - split_ctx->full_chunk_idx_b + 1 == split_ctx->num_full_chunks);
393 		} else {
394 			assert(split_ctx->full_chunk_idx_b - split_ctx->full_chunk_idx_e == 1);
395 		}
396 	} else if (start_offset != 0 || end_tail != 0) {
397 		partial_chunk[0].chunk_idx = start_chunk;
398 		partial_chunk[0].block_offset = start_offset;
399 		partial_chunk[0].block_length =
400 			bdev_io->u.bdev.num_blocks;
401 		split_ctx->num_partial_chunks = 1;
402 	} else {
403 		split_ctx->full_chunk_idx_b = start_chunk;
404 		split_ctx->full_chunk_idx_e = end_chunk;
405 		split_ctx->num_full_chunks = 1;
406 	}
407 	assert(split_ctx->num_partial_chunks <= SPDK_COUNTOF(split_ctx->partial_chunk_info));
408 
409 	_comp_submit_unmap_split(split_ctx);
410 }
411 
412 /* Called when someone above submits IO to this vbdev. */
413 static void
414 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
415 {
416 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
417 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
418 					   comp_bdev);
419 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
420 
421 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
422 	io_ctx->comp_bdev = comp_bdev;
423 	io_ctx->comp_ch = comp_ch;
424 	io_ctx->orig_io = bdev_io;
425 
426 	switch (bdev_io->type) {
427 	case SPDK_BDEV_IO_TYPE_READ:
428 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
429 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
430 		return;
431 	case SPDK_BDEV_IO_TYPE_WRITE:
432 		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
433 		return;
434 	case SPDK_BDEV_IO_TYPE_UNMAP:
435 		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_unmap, bdev_io);
436 		return;
437 	/* TODO: support RESET in a future patch in the series */
438 	case SPDK_BDEV_IO_TYPE_RESET:
439 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
440 	case SPDK_BDEV_IO_TYPE_FLUSH:
441 	default:
442 		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
443 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
444 		break;
445 	}
446 }
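/*
 * Illustrative sketch (not part of this module; the function names, buffer and offsets are
 * hypothetical): upper layers drive a compress bdev through the generic bdev API like any
 * other bdev, and the routing in vbdev_compress_submit_request() is transparent to them.
 * A minimal write could look like this, assuming desc was opened with
 * spdk_bdev_open_ext("COMP_<base>", true, ...) and ch was obtained with
 * spdk_bdev_get_io_channel(desc) on the submitting thread.
 */
static void
example_comp_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
	SPDK_NOTICELOG("write to compress bdev %s\n", success ? "succeeded" : "failed");
}

static inline int
example_write_to_comp_bdev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   void *buf, uint64_t offset_blocks, uint64_t num_blocks)
{
	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks,
				      example_comp_write_done, NULL);
}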
447 
448 static bool
449 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
450 {
451 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
452 
453 	switch (io_type) {
454 	case SPDK_BDEV_IO_TYPE_READ:
455 	case SPDK_BDEV_IO_TYPE_WRITE:
456 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
457 	case SPDK_BDEV_IO_TYPE_UNMAP:
458 		return true;
459 	case SPDK_BDEV_IO_TYPE_RESET:
460 	case SPDK_BDEV_IO_TYPE_FLUSH:
461 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
462 	default:
463 		return false;
464 	}
465 }
466 
467 /* Callback for unregistering the IO device. */
468 static void
469 _device_unregister_cb(void *io_device)
470 {
471 	struct vbdev_compress *comp_bdev = io_device;
472 
473 	/* Done with this comp_bdev. */
474 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
475 	free(comp_bdev->comp_bdev.name);
476 	free(comp_bdev);
477 }
478 
479 static void
480 _vbdev_compress_destruct_cb(void *ctx)
481 {
482 	struct vbdev_compress *comp_bdev = ctx;
483 
484 	/* Close the underlying bdev on its same opened thread. */
485 	spdk_bdev_close(comp_bdev->base_desc);
486 	comp_bdev->vol = NULL;
487 	if (comp_bdev->init_failed) {
488 		free(comp_bdev);
489 		return;
490 	}
491 
492 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
493 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
494 
495 	if (comp_bdev->orphaned == false) {
496 		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
497 	} else {
498 		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
499 		_device_unregister_cb(comp_bdev);
500 	}
501 }
502 
503 static void
504 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
505 {
506 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
507 
508 	if (reduce_errno) {
509 		SPDK_ERRLOG("Failed to unload vol, error %s\n", spdk_strerror(-reduce_errno));
510 	} else {
511 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
512 			spdk_thread_send_msg(comp_bdev->thread,
513 					     _vbdev_compress_destruct_cb, comp_bdev);
514 		} else {
515 			_vbdev_compress_destruct_cb(comp_bdev);
516 		}
517 	}
518 }
519 
520 static void
521 _reduce_destroy_cb(void *ctx, int reduce_errno)
522 {
523 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
524 
525 	if (reduce_errno) {
526 		SPDK_ERRLOG("Failed to destroy vol, error %s\n", spdk_strerror(-reduce_errno));
527 	}
528 
529 	comp_bdev->vol = NULL;
530 	spdk_put_io_channel(comp_bdev->base_ch);
531 	if (comp_bdev->init_failed || comp_bdev->orphaned) {
532 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
533 	} else {
534 		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
535 				     comp_bdev->delete_ctx);
536 	}
537 
538 }
539 
540 static void
541 _delete_vol_unload_cb(void *ctx)
542 {
543 	struct vbdev_compress *comp_bdev = ctx;
544 
545 	/* FIXME: Assert if these conditions are not satisfied for now. */
546 	assert(!comp_bdev->reduce_thread ||
547 	       comp_bdev->reduce_thread == spdk_get_thread());
548 
549 	/* reducelib needs a channel to comm with the backing device */
550 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
551 
552 	/* Clean the device before we free our resources. */
553 	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
554 }
555 
556 /* Called by reduceLib after performing unload vol actions */
557 static void
558 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
559 {
560 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
561 
562 	if (reduce_errno) {
563 		SPDK_ERRLOG("Failed to unload vol, error %s\n", spdk_strerror(-reduce_errno));
564 		vbdev_compress_delete_done(comp_bdev->delete_ctx, reduce_errno);
565 		return;
566 	}
567 
568 	pthread_mutex_lock(&comp_bdev->reduce_lock);
569 	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
570 		spdk_thread_send_msg(comp_bdev->reduce_thread,
571 				     _delete_vol_unload_cb, comp_bdev);
572 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
573 	} else {
574 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
575 
576 		_delete_vol_unload_cb(comp_bdev);
577 	}
578 }
579 
580 const char *
581 compress_get_name(const struct vbdev_compress *comp_bdev)
582 {
583 	return comp_bdev->comp_bdev.name;
584 }
585 
586 struct vbdev_compress *
587 compress_bdev_first(void)
588 {
589 	struct vbdev_compress *comp_bdev;
590 
591 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
592 
593 	return comp_bdev;
594 }
595 
596 struct vbdev_compress *
597 compress_bdev_next(struct vbdev_compress *prev)
598 {
599 	struct vbdev_compress *comp_bdev;
600 
601 	comp_bdev = TAILQ_NEXT(prev, link);
602 
603 	return comp_bdev;
604 }
605 
606 bool
607 compress_has_orphan(const char *name)
608 {
609 	struct vbdev_compress *comp_bdev;
610 
611 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
612 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
613 			return true;
614 		}
615 	}
616 	return false;
617 }
618 
619 /* Called after we've unregistered following a hot remove callback.
620  * Our finish entry point will be called next.
621  */
622 static int
623 vbdev_compress_destruct(void *ctx)
624 {
625 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
626 
627 	if (comp_bdev->vol != NULL) {
628 		/* Tell reducelib that we're done with this volume. */
629 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
630 	} else {
631 		vbdev_compress_destruct_cb(comp_bdev, 0);
632 	}
633 
634 	return 0;
635 }
636 
637 /* We supplied this as an entry point for upper layers that want to communicate with this
638  * bdev. This is how they get a channel.
639  */
640 static struct spdk_io_channel *
641 vbdev_compress_get_io_channel(void *ctx)
642 {
643 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
644 
645 	/* The IO channel code will allocate a channel for us which consists of
646 	 * the SPDK channel structure plus the size of our comp_io_channel struct
647 	 * that we passed in when we registered our IO device. It will then call
648 	 * our channel create callback to populate any elements that we need to
649 	 * update.
650 	 */
651 	return spdk_get_io_channel(comp_bdev);
652 }
653 
654 /* This is the output for bdev_get_bdevs() for this vbdev */
655 static int
656 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
657 {
658 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
659 	const struct spdk_reduce_vol_info *vol_info;
660 	char *comp_algo = NULL;
661 
662 	if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_LZ4) {
663 		comp_algo = "lz4";
664 	} else if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_DEFLATE) {
665 		comp_algo = "deflate";
666 	} else {
667 		assert(false);
668 	}
669 
670 	spdk_json_write_name(w, "compress");
671 	spdk_json_write_object_begin(w);
672 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
673 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
674 	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
675 	spdk_json_write_named_string(w, "comp_algo", comp_algo);
676 	spdk_json_write_named_uint32(w, "comp_level", comp_bdev->params.comp_level);
677 	spdk_json_write_named_uint32(w, "chunk_size", comp_bdev->params.chunk_size);
678 	spdk_json_write_named_uint32(w, "backing_io_unit_size", comp_bdev->params.backing_io_unit_size);
679 	vol_info = spdk_reduce_vol_get_info(comp_bdev->vol);
680 	spdk_json_write_named_uint64(w, "allocated_io_units", vol_info->allocated_io_units);
681 	spdk_json_write_object_end(w);
682 
683 	return 0;
684 }
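/*
 * Example of what the block above contributes to the bdev_get_bdevs RPC output for a
 * compress bdev (values are illustrative):
 *
 *   "compress": {
 *     "name": "COMP_Nvme0n1",
 *     "base_bdev_name": "Nvme0n1",
 *     "pm_path": "/tmp/pmem",
 *     "comp_algo": "deflate",
 *     "comp_level": 1,
 *     "chunk_size": 16384,
 *     "backing_io_unit_size": 4096,
 *     "allocated_io_units": 4096
 *   }
 */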
685 
686 static int
687 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
688 {
689 	/* Nothing to dump as compress bdev configuration is saved on physical device. */
690 	return 0;
691 }
692 
693 struct vbdev_init_reduce_ctx {
694 	struct vbdev_compress   *comp_bdev;
695 	int                     status;
696 	bdev_compress_create_cb cb_fn;
697 	void                    *cb_ctx;
698 };
699 
700 static void
701 _cleanup_vol_unload_cb(void *ctx)
702 {
703 	struct vbdev_compress *comp_bdev = ctx;
704 
705 	assert(!comp_bdev->reduce_thread ||
706 	       comp_bdev->reduce_thread == spdk_get_thread());
707 
708 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
709 
710 	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
711 }
712 
713 static void
714 init_vol_unload_cb(void *ctx, int reduce_errno)
715 {
716 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
717 
718 	if (reduce_errno) {
719 		SPDK_ERRLOG("Failed to unload vol, error %s\n", spdk_strerror(-reduce_errno));
720 	}
721 
722 	pthread_mutex_lock(&comp_bdev->reduce_lock);
723 	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
724 		spdk_thread_send_msg(comp_bdev->reduce_thread,
725 				     _cleanup_vol_unload_cb, comp_bdev);
726 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
727 	} else {
728 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
729 
730 		_cleanup_vol_unload_cb(comp_bdev);
731 	}
732 }
733 
734 static void
735 _vbdev_reduce_init_cb(void *ctx)
736 {
737 	struct vbdev_init_reduce_ctx *init_ctx = ctx;
738 	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
739 	int rc = init_ctx->status;
740 
741 	assert(comp_bdev->base_desc != NULL);
742 
743 	/* We're done with metadata operations */
744 	spdk_put_io_channel(comp_bdev->base_ch);
745 
746 	if (rc != 0) {
747 		goto err;
748 	}
749 
750 	assert(comp_bdev->vol != NULL);
751 
752 	rc = vbdev_compress_claim(comp_bdev);
753 	if (rc != 0) {
754 		comp_bdev->init_failed = true;
755 		spdk_reduce_vol_unload(comp_bdev->vol, init_vol_unload_cb, comp_bdev);
756 	}
757 
758 	init_ctx->cb_fn(init_ctx->cb_ctx, rc);
759 	free(init_ctx);
760 	return;
761 
762 err:
763 	init_ctx->cb_fn(init_ctx->cb_ctx, rc);
764 	/* Close the underlying bdev on its same opened thread. */
765 	spdk_bdev_close(comp_bdev->base_desc);
766 	free(comp_bdev);
767 	free(init_ctx);
768 }
769 
770 /* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
771  * used for initial metadata operations to the claim path, where it will be further filled out
772  * and added to the global list.
773  */
774 static void
775 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
776 {
777 	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
778 	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
779 
780 	if (reduce_errno == 0) {
781 		comp_bdev->vol = vol;
782 	} else {
783 		SPDK_ERRLOG("for vol %s, error %s\n",
784 			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
785 	}
786 
787 	init_ctx->status = reduce_errno;
788 
789 	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
790 		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
791 	} else {
792 		_vbdev_reduce_init_cb(init_ctx);
793 	}
794 }
795 
796 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
797  * call the callback provided by reduceLib when it called the read/write/unmap function and
798  * free the bdev_io.
799  */
800 static void
801 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
802 {
803 	struct spdk_reduce_vol_cb_args *cb_args = arg;
804 	int reduce_errno;
805 
806 	if (success) {
807 		reduce_errno = 0;
808 	} else {
809 		reduce_errno = -EIO;
810 	}
811 	spdk_bdev_free_io(bdev_io);
812 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
813 }
814 
815 static void
816 _comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
817 				 struct spdk_reduce_backing_io *backing_io)
818 {
819 	struct spdk_bdev_io_wait_entry *waitq_entry;
820 	int rc;
821 
822 	waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx;
823 	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
824 	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
825 	waitq_entry->cb_arg = backing_io;
826 
827 	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
828 	if (rc) {
829 		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
830 		assert(false);
831 		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
832 	}
833 }
834 
835 static void
836 _comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
837 {
838 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
839 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
840 					   backing_dev);
841 	int rc;
842 
843 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
844 				    backing_io->iov, backing_io->iovcnt,
845 				    backing_io->lba, backing_io->lba_count,
846 				    comp_reduce_io_cb,
847 				    backing_cb_args);
848 
849 	if (rc) {
850 		if (rc == -ENOMEM) {
851 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
852 			return;
853 		} else {
854 			SPDK_ERRLOG("error submitting readv request, rc=%d\n", rc);
855 		}
856 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
857 	}
858 }
859 
860 static void
861 _comp_backing_bdev_write(struct spdk_reduce_backing_io *backing_io)
862 {
863 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
864 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
865 					   backing_dev);
866 	int rc;
867 
868 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
869 				     backing_io->iov, backing_io->iovcnt,
870 				     backing_io->lba, backing_io->lba_count,
871 				     comp_reduce_io_cb,
872 				     backing_cb_args);
873 
874 	if (rc) {
875 		if (rc == -ENOMEM) {
876 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
877 			return;
878 		} else {
879 			SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
880 		}
881 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
882 	}
883 }
884 
885 static void
886 _comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
887 {
888 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
889 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
890 					   backing_dev);
891 	int rc;
892 
893 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
894 				    backing_io->lba, backing_io->lba_count,
895 				    comp_reduce_io_cb,
896 				    backing_cb_args);
897 
898 	if (rc) {
899 		if (rc == -ENOMEM) {
900 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
901 			return;
902 		} else {
903 			SPDK_ERRLOG("error submitting unmap request, rc=%d\n", rc);
904 		}
905 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
906 	}
907 }
908 
909 /* This is the function provided to the reduceLib for sending reads/writes/unmaps
910  * directly to the backing device.
911  */
912 static void
913 _comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
914 {
915 	switch (backing_io->backing_io_type) {
916 	case SPDK_REDUCE_BACKING_IO_WRITE:
917 		_comp_backing_bdev_write(backing_io);
918 		break;
919 	case SPDK_REDUCE_BACKING_IO_READ:
920 		_comp_backing_bdev_read(backing_io);
921 		break;
922 	case SPDK_REDUCE_BACKING_IO_UNMAP:
923 		_comp_backing_bdev_unmap(backing_io);
924 		break;
925 	default:
926 		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
927 		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
928 		break;
929 	}
930 }
931 
932 static void
933 _comp_reduce_resubmit_backing_io(void *_backing_io)
934 {
935 	struct spdk_reduce_backing_io *backing_io = _backing_io;
936 
937 	_comp_reduce_submit_backing_io(backing_io);
938 }
939 
940 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
941 static void
942 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
943 {
944 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
945 
946 	if (reduce_errno) {
947 		SPDK_ERRLOG("Failed to unload vol on hotremove, error %s\n", spdk_strerror(-reduce_errno));
948 	}
949 
950 	comp_bdev->vol = NULL;
951 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
952 }
953 
954 static void
955 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
956 {
957 	struct vbdev_compress *comp_bdev, *tmp;
958 
959 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
960 		if (bdev_find == comp_bdev->base_bdev) {
961 			/* Tell reduceLib that we're done with this volume. */
962 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
963 		}
964 	}
965 }
966 
967 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
968 static void
969 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
970 				  void *event_ctx)
971 {
972 	switch (type) {
973 	case SPDK_BDEV_EVENT_REMOVE:
974 		vbdev_compress_base_bdev_hotremove_cb(bdev);
975 		break;
976 	default:
977 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
978 		break;
979 	}
980 }
981 
982 /* TODO: determine which params we want user configurable, hardcoded for now
983  * params.vol_size
984  * params.chunk_size
985  * compression PMD, algorithm, window size, comp level, etc.
986  * DEV_MD_PATH
987  */
988 
989 /* Common function for init and load to allocate and populate the minimal
990  * information for reducelib to init or load.
991  */
992 struct vbdev_compress *
993 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size, uint8_t comp_algo,
994 		       uint32_t comp_level)
995 {
996 	struct vbdev_compress *comp_bdev;
997 	struct spdk_bdev *bdev;
998 
999 	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
1000 	if (comp_bdev == NULL) {
1001 		SPDK_ERRLOG("failed to alloc comp_bdev\n");
1002 		return NULL;
1003 	}
1004 
1005 	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
1006 	comp_bdev->backing_dev.compress = _comp_reduce_compress;
1007 	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;
1008 
1009 	comp_bdev->base_desc = bdev_desc;
1010 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
1011 	comp_bdev->base_bdev = bdev;
1012 
1013 	comp_bdev->backing_dev.blocklen = bdev->blocklen;
1014 	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;
1015 
1016 	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);
1017 
1018 	comp_bdev->comp_algo = comp_algo;
1019 	comp_bdev->comp_level = comp_level;
1020 	comp_bdev->params.comp_algo = comp_algo;
1021 	comp_bdev->params.comp_level = comp_level;
1022 	comp_bdev->params.chunk_size = CHUNK_SIZE;
1023 	if (lb_size == 0) {
1024 		comp_bdev->params.logical_block_size = bdev->blocklen;
1025 	} else {
1026 		comp_bdev->params.logical_block_size = lb_size;
1027 	}
1028 
1029 	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
1030 	return comp_bdev;
1031 }
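/*
 * Worked example (illustrative): for a base bdev with 512 B blocks and lb_size = 0, this
 * yields params.logical_block_size = 512, params.chunk_size = 16 KiB (CHUNK_SIZE) and
 * params.backing_io_unit_size = 4 KiB (BACKING_IO_SZ), i.e. 32 logical blocks and 4 backing
 * io units per chunk. vbdev_compress_claim() later derives optimal_io_boundary =
 * chunk_size / logical_block_size = 32 blocks from these same values.
 */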
1032 
1033 /* Call reducelib to initialize a new volume */
1034 static int
1035 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, uint8_t comp_algo,
1036 		  uint32_t comp_level, bdev_compress_create_cb cb_fn, void *cb_arg)
1037 {
1038 	struct spdk_bdev_desc *bdev_desc = NULL;
1039 	struct vbdev_init_reduce_ctx *init_ctx;
1040 	struct vbdev_compress *comp_bdev;
1041 	int rc;
1042 
1043 	init_ctx = calloc(1, sizeof(*init_ctx));
1044 	if (init_ctx == NULL) {
1045 		SPDK_ERRLOG("failed to alloc init context\n");
1046 		return -ENOMEM;
1047 	}
1048 
1049 	init_ctx->cb_fn = cb_fn;
1050 	init_ctx->cb_ctx = cb_arg;
1051 
1052 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
1053 				NULL, &bdev_desc);
1054 	if (rc) {
1055 		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
1056 		free(init_ctx);
1057 		return rc;
1058 	}
1059 
1060 	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size, comp_algo, comp_level);
1061 	if (comp_bdev == NULL) {
1062 		free(init_ctx);
1063 		spdk_bdev_close(bdev_desc);
1064 		return -EINVAL;
1065 	}
1066 
1067 	init_ctx->comp_bdev = comp_bdev;
1068 
1069 	/* Save the thread where the base device is opened */
1070 	comp_bdev->thread = spdk_get_thread();
1071 
1072 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1073 
1074 	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
1075 			     pm_path,
1076 			     vbdev_reduce_init_cb,
1077 			     init_ctx);
1078 	return 0;
1079 }
1080 
1081 /* We provide this callback for the SPDK channel code to create a channel using
1082  * the channel struct we provided in our module get_io_channel() entry point. Here
1083  * we get and save off an underlying base channel of the device below us so that
1084  * we can communicate with the base bdev on a per channel basis.  If we needed
1085  * our own poller for this vbdev, we'd register it here.
1086  */
1087 static int
1088 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1089 {
1090 	struct vbdev_compress *comp_bdev = io_device;
1091 
1092 	/* Now set the reduce channel if it's not already set. */
1093 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1094 	if (comp_bdev->ch_count == 0) {
1095 		/* We use this queue to track outstanding IO in our layer. */
1096 		TAILQ_INIT(&comp_bdev->pending_comp_ios);
1097 
1098 		/* We use this to queue up compression operations as needed. */
1099 		TAILQ_INIT(&comp_bdev->queued_comp_ops);
1100 
1101 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1102 		comp_bdev->reduce_thread = spdk_get_thread();
1103 		comp_bdev->accel_channel = spdk_accel_get_io_channel();
1104 	}
1105 	comp_bdev->ch_count++;
1106 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1107 
1108 	return 0;
1109 }
1110 
1111 static void
1112 _channel_cleanup(struct vbdev_compress *comp_bdev)
1113 {
1114 	spdk_put_io_channel(comp_bdev->base_ch);
1115 	spdk_put_io_channel(comp_bdev->accel_channel);
1116 	comp_bdev->reduce_thread = NULL;
1117 }
1118 
1119 /* Used to reroute destroy_ch to the correct thread */
1120 static void
1121 _comp_bdev_ch_destroy_cb(void *arg)
1122 {
1123 	struct vbdev_compress *comp_bdev = arg;
1124 
1125 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1126 	_channel_cleanup(comp_bdev);
1127 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1128 }
1129 
1130 /* We provide this callback for the SPDK channel code to destroy a channel
1131  * created with our create callback. We just need to undo anything we did
1132  * when we created it. If this bdev used its own poller, we'd unregister it here.
1133  */
1134 static void
1135 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1136 {
1137 	struct vbdev_compress *comp_bdev = io_device;
1138 
1139 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1140 	comp_bdev->ch_count--;
1141 	if (comp_bdev->ch_count == 0) {
1142 		/* Send this request to the thread where the channel was created. */
1143 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1144 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1145 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1146 		} else {
1147 			_channel_cleanup(comp_bdev);
1148 		}
1149 	}
1150 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1151 }
1152 
1153 static int
1154 _check_compress_bdev_comp_algo(enum spdk_accel_comp_algo algo, uint32_t comp_level)
1155 {
1156 	uint32_t min_level, max_level;
1157 	int rc;
1158 
1159 	rc = spdk_accel_get_compress_level_range(algo, &min_level, &max_level);
1160 	if (rc != 0) {
1161 		return rc;
1162 	}
1163 
1164 	/* If both min_level and max_level are 0, the compression level can be ignored.
1165 	 * The back-end implementation hardcodes the compression level.
1166 	 */
1167 	if (min_level == 0 && max_level == 0) {
1168 		return 0;
1169 	}
1170 
1171 	if (comp_level > max_level || comp_level < min_level) {
1172 		return -EINVAL;
1173 	}
1174 
1175 	return 0;
1176 }
1177 
1178 /* RPC entry point for compression vbdev creation. */
1179 int
1180 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
1181 		     uint8_t comp_algo, uint32_t comp_level,
1182 		     bdev_compress_create_cb cb_fn, void *cb_arg)
1183 {
1184 	struct vbdev_compress *comp_bdev = NULL;
1185 	struct stat info;
1186 	int rc;
1187 
1188 	if (stat(pm_path, &info) != 0) {
1189 		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
1190 		return -EINVAL;
1191 	} else if (!S_ISDIR(info.st_mode)) {
1192 		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
1193 		return -EINVAL;
1194 	}
1195 
1196 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
1197 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
1198 		return -EINVAL;
1199 	}
1200 
1201 	rc = _check_compress_bdev_comp_algo(comp_algo, comp_level);
1202 	if (rc != 0) {
1203 		SPDK_ERRLOG("Compress bdev doesn't support compression algo(%u) or level(%u)\n",
1204 			    comp_algo, comp_level);
1205 		return rc;
1206 	}
1207 
1208 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1209 		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
1210 			SPDK_ERRLOG("Base bdev %s is already being used for a compress bdev\n", bdev_name);
1211 			return -EBUSY;
1212 		}
1213 	}
1214 	return vbdev_init_reduce(bdev_name, pm_path, lb_size, comp_algo, comp_level, cb_fn, cb_arg);
1215 }
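/*
 * Illustrative sketch (the base bdev name, pm_path and level are hypothetical): this is how
 * a caller such as the RPC layer is expected to drive the entry point above. Passing
 * lb_size = 0 inherits the base bdev's block size.
 */
static void
example_compress_create_done(void *cb_arg, int status)
{
	if (status != 0) {
		SPDK_ERRLOG("compress bdev creation failed, error %s\n", spdk_strerror(-status));
	}
}

static inline int
example_create_compress_bdev(void)
{
	return create_compress_bdev("Nvme0n1", "/tmp/pmem", 0,
				    SPDK_ACCEL_COMP_ALGO_DEFLATE, 1,
				    example_compress_create_done, NULL);
}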
1216 
1217 static int
1218 vbdev_compress_init(void)
1219 {
1220 	return 0;
1221 }
1222 
1223 /* Called when the entire module is being torn down. */
1224 static void
1225 vbdev_compress_finish(void)
1226 {
1227 	/* TODO: unload vol in a future patch */
1228 }
1229 
1230 /* During init we'll be asked how much memory we'd like passed to us
1231  * in bdev_io structures as context. Here's where we specify how
1232  * much context we want per IO.
1233  */
1234 static int
1235 vbdev_compress_get_ctx_size(void)
1236 {
1237 	return sizeof(struct comp_bdev_io);
1238 }
1239 
1240 /* When we register our bdev this is how we specify our entry points. */
1241 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1242 	.destruct		= vbdev_compress_destruct,
1243 	.submit_request		= vbdev_compress_submit_request,
1244 	.io_type_supported	= vbdev_compress_io_type_supported,
1245 	.get_io_channel		= vbdev_compress_get_io_channel,
1246 	.dump_info_json		= vbdev_compress_dump_info_json,
1247 	.write_config_json	= NULL,
1248 };
1249 
1250 static struct spdk_bdev_module compress_if = {
1251 	.name = "compress",
1252 	.module_init = vbdev_compress_init,
1253 	.get_ctx_size = vbdev_compress_get_ctx_size,
1254 	.examine_disk = vbdev_compress_examine,
1255 	.module_fini = vbdev_compress_finish,
1256 	.config_json = vbdev_compress_config_json
1257 };
1258 
1259 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1260 
1261 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1262 {
1263 	struct spdk_bdev_alias *aliases;
1264 
1265 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1266 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1267 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
1268 		if (!comp_bdev->comp_bdev.name) {
1269 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1270 			return -ENOMEM;
1271 		}
1272 	} else {
1273 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1274 		if (!comp_bdev->comp_bdev.name) {
1275 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1276 			return -ENOMEM;
1277 		}
1278 	}
1279 	return 0;
1280 }
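/* Example: for a base bdev named "Nvme0n1" (or its first alias, if one exists), the
 * compress bdev registered on top of it is named "COMP_Nvme0n1".
 */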
1281 
1282 static int
1283 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1284 {
1285 	struct spdk_uuid ns_uuid;
1286 	int rc;
1287 
1288 	if (_set_compbdev_name(comp_bdev)) {
1289 		return -EINVAL;
1290 	}
1291 
1292 	/* Note: some of the fields below will change in the future - for example,
1293 	 * blockcnt specifically will not match (the compressed volume size will
1294 	 * be slightly less than the base bdev size)
1295 	 */
1296 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1297 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1298 
1299 	comp_bdev->comp_bdev.optimal_io_boundary =
1300 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
1301 
1302 	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
1303 
1304 	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
1305 	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
1306 	assert(comp_bdev->comp_bdev.blockcnt > 0);
1307 
1308 	/* This is the context that is passed to us when the bdev
1309 	 * layer calls in so we'll save our comp_bdev node here.
1310 	 */
1311 	comp_bdev->comp_bdev.ctxt = comp_bdev;
1312 	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
1313 	comp_bdev->comp_bdev.module = &compress_if;
1314 
1315 	/* Generate UUID based on namespace UUID + base bdev UUID. */
1316 	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
1317 	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
1318 				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
1319 	if (rc) {
1320 		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
1321 		return -EINVAL;
1322 	}
1323 
1324 	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1325 
1326 	/* Save the thread where the base device is opened */
1327 	comp_bdev->thread = spdk_get_thread();
1328 
1329 	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
1330 				sizeof(struct comp_io_channel),
1331 				comp_bdev->comp_bdev.name);
1332 
1333 	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1334 					 comp_bdev->comp_bdev.module);
1335 	if (rc) {
1336 		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
1337 			    spdk_strerror(-rc));
1338 		goto error_claim;
1339 	}
1340 
1341 	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
1342 	if (rc < 0) {
1343 		SPDK_ERRLOG("failed to register bdev, error %s\n", spdk_strerror(-rc));
1344 		goto error_bdev_register;
1345 	}
1346 
1347 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1348 
1349 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1350 
1351 	return 0;
1352 
1353 	/* Error cleanup paths. */
1354 error_bdev_register:
1355 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1356 error_claim:
1357 	spdk_io_device_unregister(comp_bdev, NULL);
1358 	free(comp_bdev->comp_bdev.name);
1359 	return rc;
1360 }
1361 
1362 static void
1363 _vbdev_compress_delete_done(void *_ctx)
1364 {
1365 	struct vbdev_comp_delete_ctx *ctx = _ctx;
1366 
1367 	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
1368 
1369 	free(ctx);
1370 }
1371 
1372 static void
1373 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1374 {
1375 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1376 
1377 	ctx->cb_rc = bdeverrno;
1378 
1379 	if (ctx->orig_thread != spdk_get_thread()) {
1380 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1381 	} else {
1382 		_vbdev_compress_delete_done(ctx);
1383 	}
1384 }
1385 
1386 void
1387 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1388 {
1389 	struct vbdev_compress *comp_bdev = NULL;
1390 	struct vbdev_comp_delete_ctx *ctx;
1391 
1392 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1393 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1394 			break;
1395 		}
1396 	}
1397 
1398 	if (comp_bdev == NULL) {
1399 		cb_fn(cb_arg, -ENODEV);
1400 		return;
1401 	}
1402 
1403 	ctx = calloc(1, sizeof(*ctx));
1404 	if (ctx == NULL) {
1405 		SPDK_ERRLOG("Failed to allocate delete context\n");
1406 		cb_fn(cb_arg, -ENOMEM);
1407 		return;
1408 	}
1409 
1410 	/* Save these for after the vol is destroyed. */
1411 	ctx->cb_fn = cb_fn;
1412 	ctx->cb_arg = cb_arg;
1413 	ctx->orig_thread = spdk_get_thread();
1414 
1415 	comp_bdev->delete_ctx = ctx;
1416 
1417 	/* Tell reducelib that we're done with this volume. */
1418 	if (comp_bdev->orphaned == false) {
1419 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1420 	} else {
1421 		delete_vol_unload_cb(comp_bdev, 0);
1422 	}
1423 }
1424 
1425 static void
1426 _vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
1427 {
1428 }
1429 
1430 static void
1431 _vbdev_reduce_load_cb(void *ctx)
1432 {
1433 	struct vbdev_compress *comp_bdev = ctx;
1434 	int rc;
1435 
1436 	assert(comp_bdev->base_desc != NULL);
1437 
1438 	/* Done with metadata operations */
1439 	spdk_put_io_channel(comp_bdev->base_ch);
1440 
1441 	if (comp_bdev->reduce_errno == 0) {
1442 		rc = vbdev_compress_claim(comp_bdev);
1443 		if (rc != 0) {
1444 			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
1445 			goto err;
1446 		}
1447 	} else if (comp_bdev->reduce_errno == -ENOENT) {
1448 		if (_set_compbdev_name(comp_bdev)) {
1449 			goto err;
1450 		}
1451 
1452 		/* Save the thread where the base device is opened */
1453 		comp_bdev->thread = spdk_get_thread();
1454 
1455 		comp_bdev->comp_bdev.module = &compress_if;
1456 		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1457 		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1458 						 comp_bdev->comp_bdev.module);
1459 		if (rc) {
1460 			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
1461 				    spdk_strerror(-rc));
1462 			free(comp_bdev->comp_bdev.name);
1463 			goto err;
1464 		}
1465 
1466 		comp_bdev->orphaned = true;
1467 		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1468 	} else {
1469 		if (comp_bdev->reduce_errno != -EILSEQ) {
1470 			SPDK_ERRLOG("for vol %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
1471 				    spdk_strerror(-comp_bdev->reduce_errno));
1472 		}
1473 		goto err;
1474 	}
1475 
1476 	spdk_bdev_module_examine_done(&compress_if);
1477 	return;
1478 
1479 err:
1480 	/* Close the underlying bdev on its same opened thread. */
1481 	spdk_bdev_close(comp_bdev->base_desc);
1482 	free(comp_bdev);
1483 	spdk_bdev_module_examine_done(&compress_if);
1484 }
1485 
1486 /* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
1487  * used for initial metadata operations to the claim path, where it will be further filled out
1488  * and added to the global list.
1489  */
1490 static void
1491 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1492 {
1493 	struct vbdev_compress *comp_bdev = cb_arg;
1494 
1495 	if (reduce_errno == 0) {
1496 		/* Update information following volume load. */
1497 		comp_bdev->vol = vol;
1498 		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
1499 		       sizeof(struct spdk_reduce_vol_params));
1500 		comp_bdev->comp_algo = comp_bdev->params.comp_algo;
1501 		comp_bdev->comp_level = comp_bdev->params.comp_level;
1502 	}
1503 
1504 	comp_bdev->reduce_errno = reduce_errno;
1505 
1506 	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
1507 		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
1508 	} else {
1509 		_vbdev_reduce_load_cb(comp_bdev);
1510 	}
1511 
1512 }
1513 
1514 /* Examine_disk entry point: will do a metadata load to see if this is ours,
1515  * and if so will go ahead and claim it.
1516  */
1517 static void
1518 vbdev_compress_examine(struct spdk_bdev *bdev)
1519 {
1520 	struct spdk_bdev_desc *bdev_desc = NULL;
1521 	struct vbdev_compress *comp_bdev;
1522 	int rc;
1523 
1524 	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
1525 		spdk_bdev_module_examine_done(&compress_if);
1526 		return;
1527 	}
1528 
1529 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
1530 				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
1531 	if (rc) {
1532 		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
1533 			    spdk_strerror(-rc));
1534 		spdk_bdev_module_examine_done(&compress_if);
1535 		return;
1536 	}
1537 
1538 	comp_bdev = _prepare_for_load_init(bdev_desc, 0, SPDK_ACCEL_COMP_ALGO_DEFLATE, 1);
1539 	if (comp_bdev == NULL) {
1540 		spdk_bdev_close(bdev_desc);
1541 		spdk_bdev_module_examine_done(&compress_if);
1542 		return;
1543 	}
1544 
1545 	/* Save the thread where the base device is opened */
1546 	comp_bdev->thread = spdk_get_thread();
1547 
1548 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1549 	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
1550 }
1551 
1552 SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1553