/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/accel.h"

#include "spdk/accel_module.h"

#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

/* This namespace UUID was generated using uuid_generate() method. */
#define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
static void _comp_reduce_resubmit_backing_io(void *_backing_io);

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
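		/* -ENOMEM from reduce means resources were exhausted; queue the
		 * request and retry it once the bdev layer signals availability.
		 */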
		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
	} else {
		SPDK_ERRLOG("Failed to execute reduce API: %s\n", spdk_strerror(-io_ctx->status));
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes that were issued via the reduce library. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	if (compress) {
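		/* The accel framework's compress API takes a single contiguous
		 * destination buffer, so reduce must hand us exactly one dst iov.
		 */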
		assert(dst_iovcnt == 1);
		rc = spdk_accel_submit_compress(comp_bdev->accel_channel, dst_iovs[0].iov_base, dst_iovs[0].iov_len,
						src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
						reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
	} else {
		rc = spdk_accel_submit_decompress(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
						  src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
						  reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
	}

	return rc;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error submitting compress operation, rc=%d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error submitting decompress operation, rc=%d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

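	/* Reduce volume operations must run on the thread that owns the volume's
	 * channels, so hop to the reduce thread before issuing the read.
	 */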
	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
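		/* Writes carry their data already, so no buffer is needed; send
		 * the request straight to the reduce thread.
		 */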
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("vol unload failed: reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("vol destroy failed: reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to communicate with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("vol unload failed: reduce_errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
	spdk_json_write_object_end(w);

	return 0;
}

static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as compress bdev configuration is saved on physical device. */
	return 0;
}

struct vbdev_init_reduce_ctx {
	struct vbdev_compress   *comp_bdev;
	int                     status;
	bdev_compress_create_cb cb_fn;
	void                    *cb_ctx;
};

static void
_vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
{
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_init_reduce_ctx *init_ctx = ctx;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->vol) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc == 0) {
			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
			free(init_ctx);
			return;
		} else {
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
		}
		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	free(init_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;

	if (reduce_errno == 0) {
		comp_bdev->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %s\n",
			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
	}

	init_ctx->status = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
	} else {
		_vbdev_reduce_init_cb(init_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

static void
_comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
				 struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_bdev_io_wait_entry *waitq_entry;
	int rc;

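	/* The wait entry lives in the per-IO scratch space that reduce reserved
	 * for us via backing_dev.user_ctx_size (see _prepare_for_load_init()).
	 */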
	waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx;
	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
	waitq_entry->cb_arg = backing_io;

	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
		assert(false);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->iov, backing_io->iovcnt,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
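			/* The base bdev is out of spdk_bdev_io objects; park this
			 * backing IO on the wait queue and resubmit it later.
			 */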
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting readv request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_write(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     backing_io->iov, backing_io->iovcnt,
				     backing_io->lba, backing_io->lba_count,
				     comp_reduce_io_cb,
				     backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

static void
_comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    backing_io->lba, backing_io->lba_count,
				    comp_reduce_io_cb,
				    backing_cb_args);

	if (rc) {
		if (rc == -ENOMEM) {
			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
			return;
		} else {
			SPDK_ERRLOG("error submitting unmap request, rc=%d\n", rc);
		}
		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending reads/writes/unmaps
 * directly to the backing device.
 */
static void
_comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
{
	switch (backing_io->backing_io_type) {
	case SPDK_REDUCE_BACKING_IO_WRITE:
		_comp_backing_bdev_write(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_READ:
		_comp_backing_bdev_read(backing_io);
		break;
	case SPDK_REDUCE_BACKING_IO_UNMAP:
		_comp_backing_bdev_unmap(backing_io);
		break;
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
		break;
	}
}

static void
_comp_reduce_resubmit_backing_io(void *_backing_io)
{
	struct spdk_reduce_backing_io *backing_io = _backing_io;

	_comp_reduce_submit_backing_io(backing_io);
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("vol unload failed: reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user configurable; hardcoded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *comp_bdev;
	struct spdk_bdev *bdev;

	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
	if (comp_bdev == NULL) {
		SPDK_ERRLOG("failed to alloc comp_bdev\n");
		return NULL;
	}

	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
	comp_bdev->backing_dev.compress = _comp_reduce_compress;
	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;

	comp_bdev->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	comp_bdev->base_bdev = bdev;

	comp_bdev->backing_dev.blocklen = bdev->blocklen;
	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;

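	/* Reserve per-IO scratch space in each backing IO for a bdev wait entry,
	 * used to requeue on -ENOMEM (see _comp_backing_bdev_queue_io_wait()).
	 */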
	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);

	comp_bdev->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		comp_bdev->params.logical_block_size = bdev->blocklen;
	} else {
		comp_bdev->params.logical_block_size = lb_size;
	}

	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
	return comp_bdev;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size,
		  bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_init_reduce_ctx *init_ctx;
	struct vbdev_compress *comp_bdev;
	int rc;

	init_ctx = calloc(1, sizeof(*init_ctx));
	if (init_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init context\n");
		return -ENOMEM;
	}

	init_ctx->cb_fn = cb_fn;
	init_ctx->cb_ctx = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
		free(init_ctx);
		return rc;
	}

	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size);
	if (comp_bdev == NULL) {
		free(init_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	init_ctx->comp_bdev = comp_bdev;

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     init_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
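		/* First channel created: pin the reduce volume to this thread and
		 * grab the base and accel channels it will use.
		 */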
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
		     bdev_compress_create_cb cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct stat info;

	if (stat(pm_path, &info) != 0) {
		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
		return -EINVAL;
	} else if (!S_ISDIR(info.st_mode)) {
		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
		return -EINVAL;
	}

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s is already in use by a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size, cb_fn, cb_arg);
}

static int
vbdev_compress_init(void)
{
	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	struct spdk_uuid ns_uuid;
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

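	/* Split IO on chunk boundaries so that no request ever spans more than
	 * one reduce chunk.
	 */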
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
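	/* SHA1 name-based generation is deterministic, so the compress bdev keeps
	 * the same UUID across restarts for a given base bdev.
	 */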
	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
	if (rc) {
		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
		return -EINVAL;
	}

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
			    spdk_strerror(-rc));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev, error %s\n", spdk_strerror(-rc));
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
{
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->reduce_errno == 0) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc != 0) {
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
			goto err;
		}
	} else if (comp_bdev->reduce_errno == -ENOENT) {
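		/* A reduce volume was found but could not be loaded (-ENOENT),
		 * e.g. its pm metadata file is missing. Claim the base bdev
		 * anyway and track it as orphaned so it can still be deleted.
		 */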
		if (_set_compbdev_name(comp_bdev)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		comp_bdev->thread = spdk_get_thread();

		comp_bdev->comp_bdev.module = &compress_if;
		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
						 comp_bdev->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-rc));
			free(comp_bdev->comp_bdev.name);
			goto err;
		}

		comp_bdev->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
	} else {
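		/* -EILSEQ means no reduce volume signature was found, i.e. this
		 * is simply not one of our bdevs, so skip it quietly.
		 */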
		if (comp_bdev->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-comp_bdev->reduce_errno));
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		comp_bdev->vol = vol;
		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	comp_bdev->reduce_errno = reduce_errno;

	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
	} else {
		_vbdev_reduce_load_cb(comp_bdev);
	}
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *comp_bdev;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
			    spdk_strerror(-rc));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	comp_bdev = _prepare_for_load_init(bdev_desc, 0);
	if (comp_bdev == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)