xref: /spdk/module/bdev/compress/vbdev_compress.c (revision 63e0c25dad5f2793fdb9ff9b1e6ce516673dc6aa)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "vbdev_compress.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/stdinc.h"
11 #include "spdk/rpc.h"
12 #include "spdk/env.h"
13 #include "spdk/endian.h"
14 #include "spdk/string.h"
15 #include "spdk/thread.h"
16 #include "spdk/util.h"
17 #include "spdk/bdev_module.h"
18 #include "spdk/likely.h"
19 #include "spdk/log.h"
20 #include "spdk/accel.h"
21 
22 #include "spdk/accel_module.h"
23 
24 #define CHUNK_SIZE (1024 * 16)
25 #define COMP_BDEV_NAME "compress"
26 #define BACKING_IO_SZ (4 * 1024)
27 
28 /* This namespace UUID was generated using uuid_generate() method. */
29 #define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"
30 
/* Context carried through an asynchronous compress-bdev delete so the user's
 * completion callback can be invoked with the final status. */
struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;		/* user completion callback */
	void				*cb_arg;	/* argument for cb_fn */
	int				cb_rc;		/* saved result for deferred completion */
	struct spdk_thread		*orig_thread;	/* thread that initiated the delete */
};
37 
/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;	/* thread owning the shared reduce/accel channels */
	pthread_mutex_t			reduce_lock;	/* guards ch_count and reduce_thread handoff */
	uint32_t			ch_count;	/* number of comp channels currently open */
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;	/* in-flight delete completion context */
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;	/* saved reduce status */
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;	/* queued compression operations */
	TAILQ_ENTRY(vbdev_compress)	link;		/* entry in g_vbdev_comp */
	struct spdk_thread		*thread;	/* thread where base device is opened */
	enum spdk_accel_comp_algo       comp_algo;      /* compression algorithm for compress bdev */
	uint32_t                        comp_level;     /* compression algorithm level */
};
/* Global registry of all compress vbdevs. */
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
64 
/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};
70 
/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	int				status;			/* save for completion on orig thread */
};
79 
80 static void vbdev_compress_examine(struct spdk_bdev *bdev);
81 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
82 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size,
83 		uint8_t comp_algo, uint32_t comp_level);
84 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
85 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
86 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
87 static void _comp_reduce_resubmit_backing_io(void *_backing_io);
88 
89 /* for completing rw requests on the orig IO thread. */
90 static void
91 _reduce_rw_blocks_cb(void *arg)
92 {
93 	struct comp_bdev_io *io_ctx = arg;
94 
95 	if (spdk_likely(io_ctx->status == 0)) {
96 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
97 	} else if (io_ctx->status == -ENOMEM) {
98 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_NOMEM);
99 	} else {
100 		SPDK_ERRLOG("Failed to execute reduce api. %s\n", spdk_strerror(-io_ctx->status));
101 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
102 	}
103 }
104 
105 /* Completion callback for r/w that were issued via reducelib. */
106 static void
107 reduce_rw_blocks_cb(void *arg, int reduce_errno)
108 {
109 	struct spdk_bdev_io *bdev_io = arg;
110 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
111 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
112 	struct spdk_thread *orig_thread;
113 
114 	/* TODO: need to decide which error codes are bdev_io success vs failure;
115 	 * example examine calls reading metadata */
116 
117 	io_ctx->status = reduce_errno;
118 
119 	/* Send this request to the orig IO thread. */
120 	orig_thread = spdk_io_channel_get_thread(ch);
121 
122 	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
123 }
124 
125 static int
126 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
127 		    int src_iovcnt, struct iovec *dst_iovs,
128 		    int dst_iovcnt, bool compress, void *cb_arg)
129 {
130 	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
131 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
132 					   backing_dev);
133 	int rc;
134 
135 	if (compress) {
136 		assert(dst_iovcnt == 1);
137 		rc = spdk_accel_submit_compress_ext(comp_bdev->accel_channel, dst_iovs[0].iov_base,
138 						    dst_iovs[0].iov_len, src_iovs, src_iovcnt,
139 						    comp_bdev->comp_algo, comp_bdev->comp_level,
140 						    &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
141 						    reduce_cb_arg->cb_arg);
142 	} else {
143 		rc = spdk_accel_submit_decompress_ext(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
144 						      src_iovs, src_iovcnt, comp_bdev->comp_algo,
145 						      &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
146 						      reduce_cb_arg->cb_arg);
147 	}
148 
149 	return rc;
150 }
151 
152 /* Entry point for reduce lib to issue a compress operation. */
153 static void
154 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
155 		      struct iovec *src_iovs, int src_iovcnt,
156 		      struct iovec *dst_iovs, int dst_iovcnt,
157 		      struct spdk_reduce_vol_cb_args *cb_arg)
158 {
159 	int rc;
160 
161 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
162 	if (rc) {
163 		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
164 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
165 	}
166 }
167 
168 /* Entry point for reduce lib to issue a decompress operation. */
169 static void
170 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
171 			struct iovec *src_iovs, int src_iovcnt,
172 			struct iovec *dst_iovs, int dst_iovcnt,
173 			struct spdk_reduce_vol_cb_args *cb_arg)
174 {
175 	int rc;
176 
177 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
178 	if (rc) {
179 		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
180 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
181 	}
182 }
183 
/* Runs on the reduce thread: hand the write down to the reduce volume. */
static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}
195 
/* Runs on the reduce thread: hand the read down to the reduce volume. */
static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}
207 
208 
209 /* Callback for getting a buf from the bdev pool in the event that the caller passed
210  * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
211  * beneath us before we're done with it.
212  */
213 static void
214 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
215 {
216 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
217 					   comp_bdev);
218 
219 	if (spdk_unlikely(!success)) {
220 		SPDK_ERRLOG("Failed to get data buffer\n");
221 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
222 		return;
223 	}
224 
225 	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
226 }
227 
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	/* driver_ctx is reused across submissions; start from a clean context. */
	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Ensure a data buffer is attached; the callback forwards the read
		 * to the reduce thread. */
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		/* Writes go straight to the reduce thread. */
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		/* All other I/O types are rejected (see io_type_supported below). */
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
261 
262 static bool
263 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
264 {
265 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
266 
267 	switch (io_type) {
268 	case SPDK_BDEV_IO_TYPE_READ:
269 	case SPDK_BDEV_IO_TYPE_WRITE:
270 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
271 	case SPDK_BDEV_IO_TYPE_UNMAP:
272 	case SPDK_BDEV_IO_TYPE_RESET:
273 	case SPDK_BDEV_IO_TYPE_FLUSH:
274 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
275 	default:
276 		return false;
277 	}
278 }
279 
/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev: destroy the mutex, release the name string,
	 * then the structure itself. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}
291 
/* Runs on the thread that opened the base bdev: drop the vbdev from the global
 * list, release the claim, close the base descriptor and tear down the io_device. */
static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		/* Orphan was never registered as an io_device; complete the delete
		 * and free directly. */
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}
309 
310 static void
311 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
312 {
313 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
314 
315 	if (reduce_errno) {
316 		SPDK_ERRLOG("number %d\n", reduce_errno);
317 	} else {
318 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
319 			spdk_thread_send_msg(comp_bdev->thread,
320 					     _vbdev_compress_destruct_cb, comp_bdev);
321 		} else {
322 			_vbdev_compress_destruct_cb(comp_bdev);
323 		}
324 	}
325 }
326 
/* Completion of spdk_reduce_vol_destroy() during delete: drop the backing
 * channel and unregister the vbdev (or finish an orphan's teardown directly). */
static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		/* Orphan: comp_bdev was never registered, skip straight to destruct. */
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}

}
346 
/* Runs on the reduce thread (or current thread if none): acquire a backing
 * channel and ask reducelib to destroy the volume's on-disk state. */
static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}
362 
/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("Failed to unload vol, error %s\n", spdk_strerror(-reduce_errno));
		vbdev_compress_delete_done(comp_bdev->delete_ctx, reduce_errno);
		return;
	}

	/* The destroy must run on the reduce thread if one exists; the lock guards
	 * the reduce_thread handoff against concurrent channel create/destroy. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}
386 
/* Returns the compress vbdev's name (the name of its embedded spdk_bdev). */
const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}
392 
393 struct vbdev_compress *
394 compress_bdev_first(void)
395 {
396 	struct vbdev_compress *comp_bdev;
397 
398 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
399 
400 	return comp_bdev;
401 }
402 
403 struct vbdev_compress *
404 compress_bdev_next(struct vbdev_compress *prev)
405 {
406 	struct vbdev_compress *comp_bdev;
407 
408 	comp_bdev = TAILQ_NEXT(prev, link);
409 
410 	return comp_bdev;
411 }
412 
413 bool
414 compress_has_orphan(const char *name)
415 {
416 	struct vbdev_compress *comp_bdev;
417 
418 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
419 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
420 			return true;
421 		}
422 	}
423 	return false;
424 }
425 
426 /* Called after we've unregistered following a hot remove callback.
427  * Our finish entry point will be called next.
428  */
429 static int
430 vbdev_compress_destruct(void *ctx)
431 {
432 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
433 
434 	if (comp_bdev->vol != NULL) {
435 		/* Tell reducelib that we're done with this volume. */
436 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
437 	} else {
438 		vbdev_compress_destruct_cb(comp_bdev, 0);
439 	}
440 
441 	return 0;
442 }
443 
/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	/* The channel code allocates an SPDK channel plus our comp_io_channel ctx
	 * (sized when we registered the io_device) and then invokes our channel
	 * create callback to fill it in. */
	return spdk_get_io_channel((struct vbdev_compress *)ctx);
}
460 
461 /* This is the output for bdev_get_bdevs() for this vbdev */
462 static int
463 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
464 {
465 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
466 	char *comp_algo = NULL;
467 
468 	if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_LZ4) {
469 		comp_algo = "lz4";
470 	} else if (comp_bdev->params.comp_algo == SPDK_ACCEL_COMP_ALGO_DEFLATE) {
471 		comp_algo = "deflate";
472 	} else {
473 		assert(false);
474 	}
475 
476 	spdk_json_write_name(w, "compress");
477 	spdk_json_write_object_begin(w);
478 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
479 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
480 	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
481 	spdk_json_write_named_string(w, "comp_algo", comp_algo);
482 	spdk_json_write_named_uint32(w, "comp_level", comp_bdev->params.comp_level);
483 	spdk_json_write_named_uint32(w, "chunk_size", comp_bdev->params.chunk_size);
484 	spdk_json_write_named_uint32(w, "backing_io_unit_size", comp_bdev->params.backing_io_unit_size);
485 	spdk_json_write_object_end(w);
486 
487 	return 0;
488 }
489 
/* save_config hook: nothing to emit, since a compress bdev is re-created from
 * metadata found on its base device during examine. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as compress bdev configuration is saved on physical device. */
	return 0;
}
496 
/* Context for an asynchronous volume init started by vbdev_init_reduce(). */
struct vbdev_init_reduce_ctx {
	struct vbdev_compress   *comp_bdev;	/* bdev being initialized */
	int                     status;		/* result reported by reducelib */
	bdev_compress_create_cb cb_fn;		/* user completion callback */
	void                    *cb_ctx;	/* argument for cb_fn */
};
503 
/* Unload completion used when tearing down a volume whose claim failed during
 * init; any error is intentionally ignored since init was already reported
 * as failed. */
static void
_vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
{
}
508 
/* Continuation of vbdev_reduce_init_cb on the thread that opened the base
 * bdev: claim and register on success, otherwise clean everything up. */
static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_init_reduce_ctx *init_ctx = ctx;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->vol) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc == 0) {
			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
			free(init_ctx);
			return;
		} else {
			/* Claim failed: unload the freshly created volume.
			 * NOTE(review): the unload is asynchronous, yet comp_bdev (which
			 * owns the backing_dev used by the unload) is freed below before
			 * the unload completes -- looks like a use-after-free window;
			 * verify against reducelib's unload semantics. */
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
		}
		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
	}

	/* If vol is NULL (init failed), the user callback was already invoked in
	 * vbdev_reduce_init_cb; only cleanup remains. */
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	free(init_ctx);
}
538 
/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;

	if (reduce_errno == 0) {
		comp_bdev->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %s\n",
			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
		/* Failure is reported to the user here; the continuation below only
		 * performs cleanup (comp_bdev->vol stays NULL). */
		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
	}

	init_ctx->status = reduce_errno;

	/* Continue (claim or cleanup) on the thread where the base bdev was opened. */
	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
	} else {
		_vbdev_reduce_init_cb(init_ctx);
	}
}
565 
566 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
567  * call the callback provided by reduceLib when it called the read/write/unmap function and
568  * free the bdev_io.
569  */
570 static void
571 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
572 {
573 	struct spdk_reduce_vol_cb_args *cb_args = arg;
574 	int reduce_errno;
575 
576 	if (success) {
577 		reduce_errno = 0;
578 	} else {
579 		reduce_errno = -EIO;
580 	}
581 	spdk_bdev_free_io(bdev_io);
582 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
583 }
584 
/* Queue a backing I/O for retry after the base bdev returned -ENOMEM. The wait
 * entry lives in the backing_io's user_ctx area reserved at init time. */
static void
_comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
				 struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_bdev_io_wait_entry *waitq_entry;
	int rc;

	waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx;
	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
	waitq_entry->cb_arg = backing_io;

	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
	if (rc) {
		/* Queueing should not fail; report the error back to reducelib. */
		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
		assert(false);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
	}
}
604 
605 static void
606 _comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
607 {
608 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
609 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
610 					   backing_dev);
611 	int rc;
612 
613 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
614 				    backing_io->iov, backing_io->iovcnt,
615 				    backing_io->lba, backing_io->lba_count,
616 				    comp_reduce_io_cb,
617 				    backing_cb_args);
618 
619 	if (rc) {
620 		if (rc == -ENOMEM) {
621 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
622 			return;
623 		} else {
624 			SPDK_ERRLOG("submitting readv request, rc=%d\n", rc);
625 		}
626 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
627 	}
628 }
629 
630 static void
631 _comp_backing_bdev_write(struct spdk_reduce_backing_io  *backing_io)
632 {
633 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
634 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
635 					   backing_dev);
636 	int rc;
637 
638 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
639 				     backing_io->iov, backing_io->iovcnt,
640 				     backing_io->lba, backing_io->lba_count,
641 				     comp_reduce_io_cb,
642 				     backing_cb_args);
643 
644 	if (rc) {
645 		if (rc == -ENOMEM) {
646 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
647 			return;
648 		} else {
649 			SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
650 		}
651 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
652 	}
653 }
654 
655 static void
656 _comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
657 {
658 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
659 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
660 					   backing_dev);
661 	int rc;
662 
663 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
664 				    backing_io->lba, backing_io->lba_count,
665 				    comp_reduce_io_cb,
666 				    backing_cb_args);
667 
668 	if (rc) {
669 		if (rc == -ENOMEM) {
670 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
671 			return;
672 		} else {
673 			SPDK_ERRLOG("submitting unmap request, rc=%d\n", rc);
674 		}
675 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
676 	}
677 }
678 
679 /* This is the function provided to the reduceLib for sending reads/writes/unmaps
680  * directly to the backing device.
681  */
682 static void
683 _comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
684 {
685 	switch (backing_io->backing_io_type) {
686 	case SPDK_REDUCE_BACKING_IO_WRITE:
687 		_comp_backing_bdev_write(backing_io);
688 		break;
689 	case SPDK_REDUCE_BACKING_IO_READ:
690 		_comp_backing_bdev_read(backing_io);
691 		break;
692 	case SPDK_REDUCE_BACKING_IO_UNMAP:
693 		_comp_backing_bdev_unmap(backing_io);
694 		break;
695 	default:
696 		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
697 		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
698 		break;
699 	}
700 }
701 
/* io_wait retry entry point: just resubmit the queued backing I/O. */
static void
_comp_reduce_resubmit_backing_io(void *_backing_io)
{
	_comp_reduce_submit_backing_io((struct spdk_reduce_backing_io *)_backing_io);
}
709 
/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	/* The base device is gone regardless of the unload result; drop the volume
	 * reference and unregister the compress vbdev. */
	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}
723 
724 static void
725 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
726 {
727 	struct vbdev_compress *comp_bdev, *tmp;
728 
729 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
730 		if (bdev_find == comp_bdev->base_bdev) {
731 			/* Tell reduceLib that we're done with this volume. */
732 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
733 		}
734 	}
735 }
736 
737 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
738 static void
739 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
740 				  void *event_ctx)
741 {
742 	switch (type) {
743 	case SPDK_BDEV_EVENT_REMOVE:
744 		vbdev_compress_base_bdev_hotremove_cb(bdev);
745 		break;
746 	default:
747 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
748 		break;
749 	}
750 }
751 
752 /* TODO: determine which parms we want user configurable, HC for now
753  * params.vol_size
754  * params.chunk_size
755  * compression PMD, algorithm, window size, comp level, etc.
756  * DEV_MD_PATH
757  */
758 
/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size, uint8_t comp_algo,
		       uint32_t comp_level)
{
	struct vbdev_compress *comp_bdev;
	struct spdk_bdev *bdev;

	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
	if (comp_bdev == NULL) {
		SPDK_ERRLOG("failed to alloc comp_bdev\n");
		return NULL;
	}

	/* Hook up reducelib's entry points for backing I/O and (de)compression. */
	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
	comp_bdev->backing_dev.compress = _comp_reduce_compress;
	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;

	comp_bdev->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	comp_bdev->base_bdev = bdev;

	/* Backing-device geometry mirrors the base bdev. */
	comp_bdev->backing_dev.blocklen = bdev->blocklen;
	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;

	/* Reserve room in each backing_io for an io_wait entry (ENOMEM retry). */
	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);

	comp_bdev->comp_algo = comp_algo;
	comp_bdev->comp_level = comp_level;
	comp_bdev->params.comp_algo = comp_algo;
	comp_bdev->params.comp_level = comp_level;
	comp_bdev->params.chunk_size = CHUNK_SIZE;
	/* lb_size == 0 means "inherit the base bdev's logical block size". */
	if (lb_size == 0) {
		comp_bdev->params.logical_block_size = bdev->blocklen;
	} else {
		comp_bdev->params.logical_block_size = lb_size;
	}

	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
	return comp_bdev;
}
802 
803 /* Call reducelib to initialize a new volume */
804 static int
805 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, uint8_t comp_algo,
806 		  uint32_t comp_level, bdev_compress_create_cb cb_fn, void *cb_arg)
807 {
808 	struct spdk_bdev_desc *bdev_desc = NULL;
809 	struct vbdev_init_reduce_ctx *init_ctx;
810 	struct vbdev_compress *comp_bdev;
811 	int rc;
812 
813 	init_ctx = calloc(1, sizeof(*init_ctx));
814 	if (init_ctx == NULL) {
815 		SPDK_ERRLOG("failed to alloc init contexts\n");
816 		return - ENOMEM;
817 	}
818 
819 	init_ctx->cb_fn = cb_fn;
820 	init_ctx->cb_ctx = cb_arg;
821 
822 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
823 				NULL, &bdev_desc);
824 	if (rc) {
825 		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
826 		free(init_ctx);
827 		return rc;
828 	}
829 
830 	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size, comp_algo, comp_level);
831 	if (comp_bdev == NULL) {
832 		free(init_ctx);
833 		spdk_bdev_close(bdev_desc);
834 		return -EINVAL;
835 	}
836 
837 	init_ctx->comp_bdev = comp_bdev;
838 
839 	/* Save the thread where the base device is opened */
840 	comp_bdev->thread = spdk_get_thread();
841 
842 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
843 
844 	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
845 			     pm_path,
846 			     vbdev_reduce_init_cb,
847 			     init_ctx);
848 	return 0;
849 }
850 
/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		/* First channel: this thread becomes the reduce thread and owns the
		 * base-bdev and accel channels shared by all comp channels. */
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}
880 
/* Release the shared channels owned by the reduce thread; caller holds
 * reduce_lock and runs on the reduce thread. */
static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}
888 
/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	/* Now on the reduce thread; safe to drop the shared channels. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
899 
/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Last channel gone: the shared channels must be released on the
		 * reduce thread that created them. */
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
922 
923 static int
924 _check_compress_bdev_comp_algo(enum spdk_accel_comp_algo algo, uint32_t comp_level)
925 {
926 	uint32_t min_level, max_level;
927 	int rc;
928 
929 	rc = spdk_accel_get_compress_level_range(algo, &min_level, &max_level);
930 	if (rc != 0) {
931 		return rc;
932 	}
933 
934 	/* If both min_level and max_level are 0, the compression level can be ignored.
935 	 * The back-end implementation hardcodes the compression level.
936 	 */
937 	if (min_level == 0 && max_level == 0) {
938 		return 0;
939 	}
940 
941 	if (comp_level > max_level || comp_level < min_level) {
942 		return -EINVAL;
943 	}
944 
945 	return 0;
946 }
947 
948 /* RPC entry point for compression vbdev creation. */
949 int
950 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
951 		     uint8_t comp_algo, uint32_t comp_level,
952 		     bdev_compress_create_cb cb_fn, void *cb_arg)
953 {
954 	struct vbdev_compress *comp_bdev = NULL;
955 	struct stat info;
956 	int rc;
957 
958 	if (stat(pm_path, &info) != 0) {
959 		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
960 		return -EINVAL;
961 	} else if (!S_ISDIR(info.st_mode)) {
962 		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
963 		return -EINVAL;
964 	}
965 
966 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
967 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
968 		return -EINVAL;
969 	}
970 
971 	rc = _check_compress_bdev_comp_algo(comp_algo, comp_level);
972 	if (rc != 0) {
973 		SPDK_ERRLOG("Compress bdev doesn't support compression algo(%u) or level(%u)\n",
974 			    comp_algo, comp_level);
975 		return rc;
976 	}
977 
978 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
979 		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
980 			SPDK_ERRLOG("Bass bdev %s already being used for a compress bdev\n", bdev_name);
981 			return -EBUSY;
982 		}
983 	}
984 	return vbdev_init_reduce(bdev_name, pm_path, lb_size, comp_algo, comp_level, cb_fn, cb_arg);
985 }
986 
/* Module init callback; nothing needs to be set up until a volume is created. */
static int
vbdev_compress_init(void)
{
	return 0;
}
992 
/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}
999 
/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
1009 
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	/* Per-bdev config is not written here; the module-level config_json
	 * callback (vbdev_compress_config_json) handles it instead.
	 */
	.write_config_json	= NULL,
};
1019 
/* Module descriptor: ties our init/fini, per-IO context size, examine and
 * config-save callbacks into the bdev layer.
 */
static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1030 
1031 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1032 {
1033 	struct spdk_bdev_alias *aliases;
1034 
1035 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1036 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1037 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
1038 		if (!comp_bdev->comp_bdev.name) {
1039 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1040 			return -ENOMEM;
1041 		}
1042 	} else {
1043 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1044 		if (!comp_bdev->comp_bdev.name) {
1045 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1046 			return -ENOMEM;
1047 		}
1048 	}
1049 	return 0;
1050 }
1051 
/* Fill out the compress bdev, register it as an io_device, claim the base
 * bdev and register the new virtual bdev with the bdev layer.
 * Returns 0 on success or a negative errno; on failure all partial state
 * (claim, io_device registration, name) is unwound.
 */
static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	struct spdk_uuid ns_uuid;
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	/* Split IO on chunk boundaries so each request maps to whole reduce chunks. */
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	/* Parsing a constant literal; failure is not expected here. */
	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
	if (rc) {
		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
		return -EINVAL;
	}

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	/* Register before claiming so channels can be created once registration succeeds. */
	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
			    spdk_strerror(-rc));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev, error %s\n", spdk_strerror(-rc));
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}
1131 
/* Runs on the thread that issued the delete RPC: invoke the user's
 * completion callback with the stored status, then free the context.
 */
static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}
1141 
1142 static void
1143 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1144 {
1145 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1146 
1147 	ctx->cb_rc = bdeverrno;
1148 
1149 	if (ctx->orig_thread != spdk_get_thread()) {
1150 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1151 	} else {
1152 		_vbdev_compress_delete_done(ctx);
1153 	}
1154 }
1155 
1156 void
1157 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1158 {
1159 	struct vbdev_compress *comp_bdev = NULL;
1160 	struct vbdev_comp_delete_ctx *ctx;
1161 
1162 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1163 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1164 			break;
1165 		}
1166 	}
1167 
1168 	if (comp_bdev == NULL) {
1169 		cb_fn(cb_arg, -ENODEV);
1170 		return;
1171 	}
1172 
1173 	ctx = calloc(1, sizeof(*ctx));
1174 	if (ctx == NULL) {
1175 		SPDK_ERRLOG("Failed to allocate delete context\n");
1176 		cb_fn(cb_arg, -ENOMEM);
1177 		return;
1178 	}
1179 
1180 	/* Save these for after the vol is destroyed. */
1181 	ctx->cb_fn = cb_fn;
1182 	ctx->cb_arg = cb_arg;
1183 	ctx->orig_thread = spdk_get_thread();
1184 
1185 	comp_bdev->delete_ctx = ctx;
1186 
1187 	/* Tell reducelib that we're done with this volume. */
1188 	if (comp_bdev->orphaned == false) {
1189 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1190 	} else {
1191 		delete_vol_unload_cb(comp_bdev, 0);
1192 	}
1193 }
1194 
/* Intentionally empty unload completion: used when unloading a volume whose
 * claim failed during load, where there is nothing left to clean up.
 */
static void
_vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
{
}
1199 
/* Finish handling a reduce volume load on the thread that opened the base
 * bdev. Three outcomes based on comp_bdev->reduce_errno:
 *   0       -> a valid volume was found; claim and register the compress bdev.
 *   -ENOENT -> no volume metadata; claim the base bdev but keep the vbdev
 *              "orphaned" (tracked, but not registered as a bdev).
 *   other   -> load failed; tear everything down (-EILSEQ, i.e. not a reduce
 *              volume, is expected during examine and not logged as an error).
 * Always signals examine completion to the bdev layer.
 */
static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->reduce_errno == 0) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc != 0) {
			/* Claim failed; release the loaded volume before cleanup. */
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
			goto err;
		}
	} else if (comp_bdev->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(comp_bdev)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		comp_bdev->thread = spdk_get_thread();

		comp_bdev->comp_bdev.module = &compress_if;
		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
						 comp_bdev->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-rc));
			/* Name was allocated by _set_compbdev_name above; free it on failure. */
			free(comp_bdev->comp_bdev.name);
			goto err;
		}

		comp_bdev->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
	} else {
		if (comp_bdev->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-comp_bdev->reduce_errno));
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}
1255 
1256 /* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct
1257  * used for initial metadata operations to claim where it will be further filled out
1258  * and added to the global list.
1259  */
1260 static void
1261 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1262 {
1263 	struct vbdev_compress *comp_bdev = cb_arg;
1264 
1265 	if (reduce_errno == 0) {
1266 		/* Update information following volume load. */
1267 		comp_bdev->vol = vol;
1268 		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
1269 		       sizeof(struct spdk_reduce_vol_params));
1270 		comp_bdev->comp_algo = comp_bdev->params.comp_algo;
1271 		comp_bdev->comp_level = comp_bdev->params.comp_level;
1272 	}
1273 
1274 	comp_bdev->reduce_errno = reduce_errno;
1275 
1276 	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
1277 		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
1278 	} else {
1279 		_vbdev_reduce_load_cb(comp_bdev);
1280 	}
1281 
1282 }
1283 
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it. Every early-exit path must call
 * spdk_bdev_module_examine_done() so the bdev layer can continue.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *comp_bdev;
	int rc;

	/* Never stack a compress bdev on top of another compress bdev. */
	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Open read-only: the load only reads metadata. */
	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
			    spdk_strerror(-rc));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* lb_size 0 / default algo / level 1 are placeholders; real values come
	 * from the volume metadata if the load succeeds.
	 */
	comp_bdev = _prepare_for_load_init(bdev_desc, 0, SPDK_ACCEL_COMP_ALGO_DEFLATE, 1);
	if (comp_bdev == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
}
1321 
/* Register the "vbdev_compress" component with the SPDK logging framework. */
SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1323