/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/accel.h"

#include "spdk/accel_module.h"

#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

/* This namespace UUID was generated using uuid_generate(). */
#define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"

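/* Context for bdev_compress_delete() so the completion callback can be
 * run on the thread that originally requested the delete.
 */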
struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to the comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
	enum spdk_accel_comp_algo	comp_algo;	/* compression algorithm for compress bdev */
	uint32_t			comp_level;	/* compression algorithm level */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	int				status;			/* save for completion on orig thread */
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size,
		uint8_t comp_algo, uint32_t comp_level);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
static void _comp_reduce_resubmit_backing_io(void *_backing_io);

/* For completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		SPDK_ERRLOG("Failed to execute reduce API: %s\n", spdk_strerror(-io_ctx->status));
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

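/* Common helper for compress and decompress requests from reducelib: routes the
 * operation to the accel framework and completes via the reduce callback args.
 */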
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	if (compress) {
		assert(dst_iovcnt == 1);
		rc = spdk_accel_submit_compress_ext(comp_bdev->accel_channel, dst_iovs[0].iov_base,
						    dst_iovs[0].iov_len, src_iovs, src_iovcnt,
						    comp_bdev->comp_algo, comp_bdev->comp_level,
						    &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
						    reduce_cb_arg->cb_arg);
	} else {
		rc = spdk_accel_submit_decompress_ext(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
						      src_iovs, src_iovcnt, comp_bdev->comp_algo,
						      &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
						      reduce_cb_arg->cb_arg);
	}

	return rc;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

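/* Submit a write to the reduce volume; runs on the thread that owns the reduce channel. */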
static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

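/* Submit a read to the reduce volume; runs on the thread that owns the reduce channel. */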
static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO: support RESET in a future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

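/* Reads and writes are supported if the base bdev supports them; all other I/O types are not. */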
static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

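/* Final part of destruct, run on the thread where the base bdev was opened. */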
static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce vol unload failed, error %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

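/* Callback from reducelib when the volume destroy is complete. */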
static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce vol destroy failed, error %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to communicate with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reducelib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce vol unload failed, error %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers that want to communicate
 * with this bdev. This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
	spdk_json_write_object_end(w);

	return 0;
}

static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as compress bdev configuration is saved on the physical device. */
	return 0;
}

struct vbdev_init_reduce_ctx {
	struct vbdev_compress	*comp_bdev;
	int			status;
	bdev_compress_create_cb	cb_fn;
	void			*cb_ctx;
};

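/* Nothing to do here; the volume is only unloaded to clean up after a failed claim. */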
static void
_vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
{
}

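/* Second part of the init completion, run on the thread where the base bdev was opened. */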
static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_init_reduce_ctx *init_ctx = ctx;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->vol) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc == 0) {
			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
			free(init_ctx);
			return;
		} else {
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
		}
		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	free(init_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations on to claim, where it will be further filled
 * out and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;

	if (reduce_errno == 0) {
		comp_bdev->vol = vol;
	} else {
		SPDK_ERRLOG("reduce vol init failed for %s, error %s\n",
			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
	}

	init_ctx->status = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
	} else {
		_vbdev_reduce_init_cb(init_ctx);
	}
}

/* Callback for the function used by reducelib to perform IO to/from the backing device.
 * We just call the callback provided by reducelib when it submitted the read/write/unmap
 * and free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

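/* Queue a backing I/O that failed with -ENOMEM so it is resubmitted once the
 * base bdev has resources available again.
 */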
static void
_comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
				 struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_bdev_io_wait_entry *waitq_entry;
	int rc;

	waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx;
	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
	waitq_entry->cb_arg = backing_io;

	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
		assert(false);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->iov, backing_io->iovcnt,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting readv request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_write(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     backing_io->iov, backing_io->iovcnt,
				     backing_io->lba, backing_io->lba_count,
				     comp_reduce_io_cb,
				     backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting unmap request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

/* This is the function provided to the reducelib for sending reads/writes/unmaps
 * directly to the backing device.
 */
static void
_comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
{
	switch (backing_io->backing_io_type) {
	case SPDK_REDUCE_BACKING_IO_WRITE:
		_comp_backing_bdev_write(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_READ:
		_comp_backing_bdev_read(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_UNMAP:
		_comp_backing_bdev_unmap(backing_io);
		break;
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
		break;
	}
}

static void
_comp_reduce_resubmit_backing_io(void *_backing_io)
{
	struct spdk_reduce_backing_io *backing_io = _backing_io;

	_comp_reduce_submit_backing_io(backing_io);
}

/* Called by reducelib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce vol unload failed, error %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reducelib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user configurable, hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size, uint8_t comp_algo,
		       uint32_t comp_level)
{
	struct vbdev_compress *comp_bdev;
	struct spdk_bdev *bdev;

	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
	if (comp_bdev == NULL) {
		SPDK_ERRLOG("failed to alloc comp_bdev\n");
		return NULL;
	}

	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
	comp_bdev->backing_dev.compress = _comp_reduce_compress;
	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;

	comp_bdev->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	comp_bdev->base_bdev = bdev;

	comp_bdev->backing_dev.blocklen = bdev->blocklen;
	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;

	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);

	comp_bdev->comp_algo = comp_algo;
	comp_bdev->comp_level = comp_level;
	comp_bdev->params.comp_algo = comp_algo;
	comp_bdev->params.comp_level = comp_level;
	comp_bdev->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		comp_bdev->params.logical_block_size = bdev->blocklen;
	} else {
		comp_bdev->params.logical_block_size = lb_size;
	}

	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
	return comp_bdev;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, uint8_t comp_algo,
		  uint32_t comp_level, bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_init_reduce_ctx *init_ctx;
	struct vbdev_compress *comp_bdev;
	int rc;

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init context\n");
		return -ENOMEM;
	}

	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_ctx = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
		free(init_ctx);
		return rc;
	}

	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size, comp_algo, comp_level);
	if (comp_bdev == NULL) {
		free(init_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	init_ctx->comp_bdev = comp_bdev;

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     init_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

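/* Validate that the requested compression level is within the range supported
 * by the accel framework for the given algorithm.
 */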
static int
_check_compress_bdev_comp_algo(enum spdk_accel_comp_algo algo, uint32_t comp_level)
{
	uint32_t min_level, max_level;
	int rc;

	rc = spdk_accel_get_compress_level_range(algo, &min_level, &max_level);
	if (rc != 0) {
		return rc;
	}

	/* If both min_level and max_level are 0, the compression level can be ignored.
	 * The back-end implementation hardcodes the compression level.
	 */
	if (min_level == 0 && max_level == 0) {
		return 0;
	}

	if (comp_level > max_level || comp_level < min_level) {
		return -EINVAL;
	}

	return 0;
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
		     uint8_t comp_algo, uint32_t comp_level,
		     bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct stat info;
	int rc;

	if (stat(pm_path, &info) != 0) {
		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
		return -EINVAL;
	} else if (!S_ISDIR(info.st_mode)) {
		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
		return -EINVAL;
	}

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	rc = _check_compress_bdev_comp_algo(comp_algo, comp_level);
	if (rc != 0) {
		SPDK_ERRLOG("Compress bdev doesn't support compression algo(%u) or level(%u)\n",
			    comp_algo, comp_level);
		return rc;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s is already in use by a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size, comp_algo, comp_level, cb_fn, cb_arg);
}

static int
vbdev_compress_init(void)
{
	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

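/* Name the compress vbdev "COMP_" plus the first alias of the base bdev,
 * or the base bdev's name if it has no alias.
 */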
static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

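/* Fill out the compress vbdev, claim the base bdev, and register the new bdev. */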
static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	struct spdk_uuid ns_uuid;
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
	if (rc) {
		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
		return -EINVAL;
	}

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
			    spdk_strerror(-rc));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev, error %s\n", spdk_strerror(-rc));
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

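/* Run the delete completion callback on the thread that requested the delete. */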
static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

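/* RPC entry point for compression vbdev deletion. */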
void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

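/* Nothing to do here; the volume is only unloaded to clean up after a failed claim during load. */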
static void
_vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
{
}

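/* Second part of the load completion, run on the thread where the base bdev was opened. */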
static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->reduce_errno == 0) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc != 0) {
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
			goto err;
		}
	} else if (comp_bdev->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(comp_bdev)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		comp_bdev->thread = spdk_get_thread();

		comp_bdev->comp_bdev.module = &compress_if;
		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
						 comp_bdev->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-rc));
			free(comp_bdev->comp_bdev.name);
			goto err;
		}

		comp_bdev->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
	} else {
		if (comp_bdev->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("reduce vol load failed for %s, error %s\n",
				    spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-comp_bdev->reduce_errno));
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations on to claim, where it will be further filled
 * out and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		comp_bdev->vol = vol;
		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
		comp_bdev->comp_algo = comp_bdev->params.comp_algo;
		comp_bdev->comp_level = comp_bdev->params.comp_level;
	}

	comp_bdev->reduce_errno = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
	} else {
		_vbdev_reduce_load_cb(comp_bdev);
	}
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *comp_bdev;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
			    spdk_strerror(-rc));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	comp_bdev = _prepare_for_load_init(bdev_desc, 0, SPDK_ACCEL_COMP_ALGO_DEFLATE, 1);
	if (comp_bdev == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)