xref: /spdk/module/bdev/compress/vbdev_compress.c (revision ddd4603ceb2154dd59f14c6f2851f5f8cd1711c4)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "vbdev_compress.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/stdinc.h"
11 #include "spdk/rpc.h"
12 #include "spdk/env.h"
13 #include "spdk/endian.h"
14 #include "spdk/string.h"
15 #include "spdk/thread.h"
16 #include "spdk/util.h"
17 #include "spdk/bdev_module.h"
18 #include "spdk/likely.h"
19 #include "spdk/log.h"
20 #include "spdk/accel.h"
21 
22 #include "spdk/accel_module.h"
23 
/* Fixed chunk size (in bytes) used for the reduce volume. */
#define CHUNK_SIZE (1024 * 16)
/* Name used when registering this vbdev module. */
#define COMP_BDEV_NAME "compress"
/* I/O unit size (in bytes) used for the backing device. */
#define BACKING_IO_SZ (4 * 1024)

/* This namespace UUID was generated using uuid_generate() method. */
#define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"
30 
/* Context carried across the asynchronous steps of deleting a compress vbdev,
 * so completion can be reported back on the thread that requested the delete.
 */
struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;		/* user completion callback */
	void				*cb_arg;	/* argument passed to cb_fn */
	int				cb_rc;		/* saved status to report via cb_fn */
	struct spdk_thread		*orig_thread;	/* thread that initiated the delete */
};
37 
/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;	/* thread where the first channel was created */
	pthread_mutex_t			reduce_lock;	/* guards ch_count and reduce_thread */
	uint32_t			ch_count;	/* number of channels open on this vbdev */
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;	/* context of an in-progress delete, if any */
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;	/* compression operations queued for retry */
	TAILQ_ENTRY(vbdev_compress)	link;		/* entry on the global g_vbdev_comp list */
	struct spdk_thread		*thread;	/* thread where base device is opened */
	enum spdk_accel_comp_algo       comp_algo;      /* compression algorithm for compress bdev */
	uint32_t                        comp_level;     /* compression algorithm level */
};
/* Global list of all compress vbdevs. */
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
64 
/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};
70 
/* Per I/O context for the compression vbdev; lives in bdev_io->driver_ctx. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};
80 
/* Forward declarations for functions referenced before their definitions. */
static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size,
		uint8_t comp_algo, uint32_t comp_level);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
static void _comp_reduce_resubmit_backing_io(void *_backing_io);
90 
91 /* for completing rw requests on the orig IO thread. */
92 static void
93 _reduce_rw_blocks_cb(void *arg)
94 {
95 	struct comp_bdev_io *io_ctx = arg;
96 
97 	if (spdk_likely(io_ctx->status == 0)) {
98 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
99 	} else if (io_ctx->status == -ENOMEM) {
100 		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
101 	} else {
102 		SPDK_ERRLOG("Failed to execute reduce api. %s\n", spdk_strerror(-io_ctx->status));
103 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
104 	}
105 }
106 
107 /* Completion callback for r/w that were issued via reducelib. */
108 static void
109 reduce_rw_blocks_cb(void *arg, int reduce_errno)
110 {
111 	struct spdk_bdev_io *bdev_io = arg;
112 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
113 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
114 	struct spdk_thread *orig_thread;
115 
116 	/* TODO: need to decide which error codes are bdev_io success vs failure;
117 	 * example examine calls reading metadata */
118 
119 	io_ctx->status = reduce_errno;
120 
121 	/* Send this request to the orig IO thread. */
122 	orig_thread = spdk_io_channel_get_thread(ch);
123 
124 	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
125 }
126 
127 static int
128 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
129 		    int src_iovcnt, struct iovec *dst_iovs,
130 		    int dst_iovcnt, bool compress, void *cb_arg)
131 {
132 	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
133 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
134 					   backing_dev);
135 	int rc;
136 
137 	if (compress) {
138 		assert(dst_iovcnt == 1);
139 		rc = spdk_accel_submit_compress_ext(comp_bdev->accel_channel, dst_iovs[0].iov_base,
140 						    dst_iovs[0].iov_len, src_iovs, src_iovcnt,
141 						    comp_bdev->comp_algo, comp_bdev->comp_level,
142 						    &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
143 						    reduce_cb_arg->cb_arg);
144 	} else {
145 		rc = spdk_accel_submit_decompress_ext(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
146 						      src_iovs, src_iovcnt, comp_bdev->comp_algo,
147 						      &reduce_cb_arg->output_size, reduce_cb_arg->cb_fn,
148 						      reduce_cb_arg->cb_arg);
149 	}
150 
151 	return rc;
152 }
153 
154 /* Entry point for reduce lib to issue a compress operation. */
155 static void
156 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
157 		      struct iovec *src_iovs, int src_iovcnt,
158 		      struct iovec *dst_iovs, int dst_iovcnt,
159 		      struct spdk_reduce_vol_cb_args *cb_arg)
160 {
161 	int rc;
162 
163 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
164 	if (rc) {
165 		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
166 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
167 	}
168 }
169 
170 /* Entry point for reduce lib to issue a decompress operation. */
171 static void
172 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
173 			struct iovec *src_iovs, int src_iovcnt,
174 			struct iovec *dst_iovs, int dst_iovcnt,
175 			struct spdk_reduce_vol_cb_args *cb_arg)
176 {
177 	int rc;
178 
179 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
180 	if (rc) {
181 		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
182 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
183 	}
184 }
185 
/* Issue a write to the reduce volume.  Runs on the reduce thread via
 * spdk_thread_exec_msg() from vbdev_compress_submit_request().
 */
static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}
197 
/* Issue a read from the reduce volume.  Runs on the reduce thread via
 * spdk_thread_exec_msg() from comp_read_get_buf_cb().
 */
static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}
209 
210 
211 /* Callback for getting a buf from the bdev pool in the event that the caller passed
212  * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
213  * beneath us before we're done with it.
214  */
215 static void
216 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
217 {
218 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
219 					   comp_bdev);
220 
221 	if (spdk_unlikely(!success)) {
222 		SPDK_ERRLOG("Failed to get data buffer\n");
223 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
224 		return;
225 	}
226 
227 	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
228 }
229 
/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	/* Initialize the per-IO context used later in completion handling. */
	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Take ownership of a data buffer first; the read itself is issued
		 * from comp_read_get_buf_cb() once the buffer is available. */
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		/* Writes to the reduce volume are issued on the reduce thread. */
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		/* Unsupported IO types are failed immediately. */
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
263 
264 static bool
265 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
266 {
267 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
268 
269 	switch (io_type) {
270 	case SPDK_BDEV_IO_TYPE_READ:
271 	case SPDK_BDEV_IO_TYPE_WRITE:
272 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
273 	case SPDK_BDEV_IO_TYPE_UNMAP:
274 	case SPDK_BDEV_IO_TYPE_RESET:
275 	case SPDK_BDEV_IO_TYPE_FLUSH:
276 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
277 	default:
278 		return false;
279 	}
280 }
281 
282 /* Resubmission function used by the bdev layer when a queued IO is ready to be
283  * submitted.
284  */
285 static void
286 vbdev_compress_resubmit_io(void *arg)
287 {
288 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
289 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
290 
291 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
292 }
293 
/* Used to queue an IO in the event of resource issues (-ENOMEM); the bdev layer
 * will invoke vbdev_compress_resubmit_io() when resources free up.
 */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		/* Queueing itself failed; this is unexpected, so fail the IO outright. */
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
312 
/* Callback for unregistering the IO device.  Final teardown: destroys the
 * mutex and frees the vbdev name and structure.
 */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}
324 
/* Finish destruction on the thread where the base bdev was opened: unlink from
 * the global list, release the base bdev claim, close the descriptor, and then
 * tear down the IO device (or free directly if it was never registered).
 */
static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		/* Never registered as an IO device, so free directly and
		 * complete any pending delete request. */
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}
342 
343 static void
344 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
345 {
346 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
347 
348 	if (reduce_errno) {
349 		SPDK_ERRLOG("number %d\n", reduce_errno);
350 	} else {
351 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
352 			spdk_thread_send_msg(comp_bdev->thread,
353 					     _vbdev_compress_destruct_cb, comp_bdev);
354 		} else {
355 			_vbdev_compress_destruct_cb(comp_bdev);
356 		}
357 	}
358 }
359 
/* Callback from reducelib when the volume destroy (during delete) completes.
 * Releases the metadata channel and unregisters the vbdev, or falls back to
 * the destruct path for orphaned devices.
 */
static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		/* Orphaned devices were never registered; go straight to destruct. */
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}

}
379 
/* Kick off destruction of the reduce volume itself; must run on the reduce
 * thread (or with no reduce thread set).
 */
static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}
395 
/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	/* The destroy must happen on the reduce thread; take the lock only to
	 * read reduce_thread consistently, then release before proceeding. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}
419 
/* Return the registered name of the given compress vbdev. */
const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}
425 
426 struct vbdev_compress *
427 compress_bdev_first(void)
428 {
429 	struct vbdev_compress *comp_bdev;
430 
431 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
432 
433 	return comp_bdev;
434 }
435 
436 struct vbdev_compress *
437 compress_bdev_next(struct vbdev_compress *prev)
438 {
439 	struct vbdev_compress *comp_bdev;
440 
441 	comp_bdev = TAILQ_NEXT(prev, link);
442 
443 	return comp_bdev;
444 }
445 
446 bool
447 compress_has_orphan(const char *name)
448 {
449 	struct vbdev_compress *comp_bdev;
450 
451 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
452 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
453 			return true;
454 		}
455 	}
456 	return false;
457 }
458 
459 /* Called after we've unregistered following a hot remove callback.
460  * Our finish entry point will be called next.
461  */
462 static int
463 vbdev_compress_destruct(void *ctx)
464 {
465 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
466 
467 	if (comp_bdev->vol != NULL) {
468 		/* Tell reducelib that we're done with this volume. */
469 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
470 	} else {
471 		vbdev_compress_destruct_cb(comp_bdev, 0);
472 	}
473 
474 	return 0;
475 }
476 
/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}
493 
/* This is the output for bdev_get_bdevs() for this vbdev: emits a "compress"
 * object with the vbdev name, base bdev name, and persistent-memory path.
 */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
	spdk_json_write_object_end(w);

	return 0;
}
509 
/* Module config dump entry point; intentionally empty. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as compress bdev configuration is saved on physical device. */
	return 0;
}
516 
/* Context for the asynchronous reduce-volume init sequence. */
struct vbdev_init_reduce_ctx {
	struct vbdev_compress   *comp_bdev;	/* the vbdev being initialized */
	int                     status;		/* reduce_errno from the init callback */
	bdev_compress_create_cb cb_fn;		/* user callback for create completion */
	void                    *cb_ctx;	/* argument for cb_fn */
};
523 
/* Intentionally empty: errors from unloading a volume whose claim failed
 * during init are ignored.
 */
static void
_vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
{
}
528 
/* Completes reduce-volume init on the thread that opened the base bdev:
 * releases the metadata channel, then either claims and registers the vbdev
 * (success) or unwinds by unloading the volume and freeing resources.
 */
static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_init_reduce_ctx *init_ctx = ctx;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->vol) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc == 0) {
			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
			free(init_ctx);
			return;
		} else {
			/* NOTE(review): unload is asynchronous yet comp_bdev (which
			 * contains the backing_dev handed to reducelib) is freed
			 * below before the unload completes — confirm reducelib
			 * does not touch it after this point. */
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
		}
		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	free(init_ctx);
}
558 
/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;

	if (reduce_errno == 0) {
		comp_bdev->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %s\n",
			    spdk_bdev_get_name(comp_bdev->base_bdev), spdk_strerror(-reduce_errno));
		/* Report the failure; cleanup still continues below. */
		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
	}

	init_ctx->status = reduce_errno;

	/* Finish init (or unwind) on the thread where the base bdev was opened. */
	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
	} else {
		_vbdev_reduce_init_cb(init_ctx);
	}
}
585 
586 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
587  * call the callback provided by reduceLib when it called the read/write/unmap function and
588  * free the bdev_io.
589  */
590 static void
591 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
592 {
593 	struct spdk_reduce_vol_cb_args *cb_args = arg;
594 	int reduce_errno;
595 
596 	if (success) {
597 		reduce_errno = 0;
598 	} else {
599 		reduce_errno = -EIO;
600 	}
601 	spdk_bdev_free_io(bdev_io);
602 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
603 }
604 
/* Park a backing IO that failed with -ENOMEM on the base bdev's wait queue;
 * the wait entry lives in the backing_io's user_ctx area (sized for it in
 * _prepare_for_load_init()), and resubmission goes through
 * _comp_reduce_resubmit_backing_io().
 */
static void
_comp_backing_bdev_queue_io_wait(struct vbdev_compress *comp_bdev,
				 struct spdk_reduce_backing_io *backing_io)
{
	struct spdk_bdev_io_wait_entry *waitq_entry;
	int rc;

	waitq_entry = (struct spdk_bdev_io_wait_entry *) &backing_io->user_ctx;
	waitq_entry->bdev = spdk_bdev_desc_get_bdev(comp_bdev->base_desc);
	waitq_entry->cb_fn = _comp_reduce_resubmit_backing_io;
	waitq_entry->cb_arg = backing_io;

	rc = spdk_bdev_queue_io_wait(waitq_entry->bdev, comp_bdev->base_ch, waitq_entry);
	if (rc) {
		/* Queueing failed unexpectedly; report the error back to reducelib. */
		SPDK_ERRLOG("Queue io failed in _comp_backing_bdev_queue_io_wait, rc=%d.\n", rc);
		assert(false);
		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, rc);
	}
}
624 
625 static void
626 _comp_backing_bdev_read(struct spdk_reduce_backing_io *backing_io)
627 {
628 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
629 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
630 					   backing_dev);
631 	int rc;
632 
633 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
634 				    backing_io->iov, backing_io->iovcnt,
635 				    backing_io->lba, backing_io->lba_count,
636 				    comp_reduce_io_cb,
637 				    backing_cb_args);
638 
639 	if (rc) {
640 		if (rc == -ENOMEM) {
641 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
642 			return;
643 		} else {
644 			SPDK_ERRLOG("submitting readv request, rc=%d\n", rc);
645 		}
646 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
647 	}
648 }
649 
650 static void
651 _comp_backing_bdev_write(struct spdk_reduce_backing_io  *backing_io)
652 {
653 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
654 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
655 					   backing_dev);
656 	int rc;
657 
658 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
659 				     backing_io->iov, backing_io->iovcnt,
660 				     backing_io->lba, backing_io->lba_count,
661 				     comp_reduce_io_cb,
662 				     backing_cb_args);
663 
664 	if (rc) {
665 		if (rc == -ENOMEM) {
666 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
667 			return;
668 		} else {
669 			SPDK_ERRLOG("error submitting writev request, rc=%d\n", rc);
670 		}
671 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
672 	}
673 }
674 
675 static void
676 _comp_backing_bdev_unmap(struct spdk_reduce_backing_io *backing_io)
677 {
678 	struct spdk_reduce_vol_cb_args *backing_cb_args = backing_io->backing_cb_args;
679 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_io->dev, struct vbdev_compress,
680 					   backing_dev);
681 	int rc;
682 
683 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
684 				    backing_io->lba, backing_io->lba_count,
685 				    comp_reduce_io_cb,
686 				    backing_cb_args);
687 
688 	if (rc) {
689 		if (rc == -ENOMEM) {
690 			_comp_backing_bdev_queue_io_wait(comp_bdev, backing_io);
691 			return;
692 		} else {
693 			SPDK_ERRLOG("submitting unmap request, rc=%d\n", rc);
694 		}
695 		backing_cb_args->cb_fn(backing_cb_args->cb_arg, rc);
696 	}
697 }
698 
699 /* This is the function provided to the reduceLib for sending reads/writes/unmaps
700  * directly to the backing device.
701  */
702 static void
703 _comp_reduce_submit_backing_io(struct spdk_reduce_backing_io *backing_io)
704 {
705 	switch (backing_io->backing_io_type) {
706 	case SPDK_REDUCE_BACKING_IO_WRITE:
707 		_comp_backing_bdev_write(backing_io);
708 		break;
709 	case SPDK_REDUCE_BACKING_IO_READ:
710 		_comp_backing_bdev_read(backing_io);
711 		break;
712 	case SPDK_REDUCE_BACKING_IO_UNMAP:
713 		_comp_backing_bdev_unmap(backing_io);
714 		break;
715 	default:
716 		SPDK_ERRLOG("Unknown I/O type %d\n", backing_io->backing_io_type);
717 		backing_io->backing_cb_args->cb_fn(backing_io->backing_cb_args->cb_arg, -EINVAL);
718 		break;
719 	}
720 }
721 
/* Wait-queue callback used to resubmit a backing IO that was parked on -ENOMEM. */
static void
_comp_reduce_resubmit_backing_io(void *_backing_io)
{
	_comp_reduce_submit_backing_io((struct spdk_reduce_backing_io *)_backing_io);
}
729 
/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	/* Volume is gone; unregister the compress vbdev itself. */
	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}
743 
744 static void
745 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
746 {
747 	struct vbdev_compress *comp_bdev, *tmp;
748 
749 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
750 		if (bdev_find == comp_bdev->base_bdev) {
751 			/* Tell reduceLib that we're done with this volume. */
752 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
753 		}
754 	}
755 }
756 
757 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
758 static void
759 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
760 				  void *event_ctx)
761 {
762 	switch (type) {
763 	case SPDK_BDEV_EVENT_REMOVE:
764 		vbdev_compress_base_bdev_hotremove_cb(bdev);
765 		break;
766 	default:
767 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
768 		break;
769 	}
770 }
771 
772 /* TODO: determine which parms we want user configurable, HC for now
773  * params.vol_size
774  * params.chunk_size
775  * compression PMD, algorithm, window size, comp level, etc.
776  * DEV_MD_PATH
777  */
778 
779 /* Common function for init and load to allocate and populate the minimal
780  * information for reducelib to init or load.
781  */
782 struct vbdev_compress *
783 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size, uint8_t comp_algo,
784 		       uint32_t comp_level)
785 {
786 	struct vbdev_compress *comp_bdev;
787 	struct spdk_bdev *bdev;
788 
789 	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
790 	if (comp_bdev == NULL) {
791 		SPDK_ERRLOG("failed to alloc comp_bdev\n");
792 		return NULL;
793 	}
794 
795 	comp_bdev->backing_dev.submit_backing_io = _comp_reduce_submit_backing_io;
796 	comp_bdev->backing_dev.compress = _comp_reduce_compress;
797 	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;
798 
799 	comp_bdev->base_desc = bdev_desc;
800 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
801 	comp_bdev->base_bdev = bdev;
802 
803 	comp_bdev->backing_dev.blocklen = bdev->blocklen;
804 	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;
805 
806 	comp_bdev->backing_dev.user_ctx_size = sizeof(struct spdk_bdev_io_wait_entry);
807 
808 	comp_bdev->comp_algo = comp_algo;
809 	comp_bdev->comp_level = comp_level;
810 	comp_bdev->params.comp_algo = comp_algo;
811 	comp_bdev->params.comp_level = comp_level;
812 	comp_bdev->params.chunk_size = CHUNK_SIZE;
813 	if (lb_size == 0) {
814 		comp_bdev->params.logical_block_size = bdev->blocklen;
815 	} else {
816 		comp_bdev->params.logical_block_size = lb_size;
817 	}
818 
819 	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
820 	return comp_bdev;
821 }
822 
823 /* Call reducelib to initialize a new volume */
824 static int
825 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size, uint8_t comp_algo,
826 		  uint32_t comp_level, bdev_compress_create_cb cb_fn, void *cb_arg)
827 {
828 	struct spdk_bdev_desc *bdev_desc = NULL;
829 	struct vbdev_init_reduce_ctx *init_ctx;
830 	struct vbdev_compress *comp_bdev;
831 	int rc;
832 
833 	init_ctx = calloc(1, sizeof(*init_ctx));
834 	if (init_ctx == NULL) {
835 		SPDK_ERRLOG("failed to alloc init contexts\n");
836 		return - ENOMEM;
837 	}
838 
839 	init_ctx->cb_fn = cb_fn;
840 	init_ctx->cb_ctx = cb_arg;
841 
842 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
843 				NULL, &bdev_desc);
844 	if (rc) {
845 		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
846 		free(init_ctx);
847 		return rc;
848 	}
849 
850 	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size, comp_algo, comp_level);
851 	if (comp_bdev == NULL) {
852 		free(init_ctx);
853 		spdk_bdev_close(bdev_desc);
854 		return -EINVAL;
855 	}
856 
857 	init_ctx->comp_bdev = comp_bdev;
858 
859 	/* Save the thread where the base device is opened */
860 	comp_bdev->thread = spdk_get_thread();
861 
862 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
863 
864 	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
865 			     pm_path,
866 			     vbdev_reduce_init_cb,
867 			     init_ctx);
868 	return 0;
869 }
870 
/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* First channel: this thread becomes the reduce thread and owns
		 * the base and accel channels shared by all channels. */

		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}
900 
/* Release the shared base/accel channels acquired by the first channel create.
 * Both call sites hold reduce_lock when invoking this.
 */
static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}
908 
/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	/* Runs on the reduce thread; release the shared channels under the lock. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
919 
/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Last channel: the shared base/accel channels must be released
		 * on the thread that acquired them. */
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
942 
943 static int
944 _check_compress_bdev_comp_algo(enum spdk_accel_comp_algo algo, uint32_t comp_level)
945 {
946 	uint32_t min_level, max_level;
947 	int rc;
948 
949 	rc = spdk_accel_get_compress_level_range(algo, &min_level, &max_level);
950 	if (rc != 0) {
951 		return rc;
952 	}
953 
954 	/* If both min_level and max_level are 0, the compression level can be ignored.
955 	 * The back-end implementation hardcodes the compression level.
956 	 */
957 	if (min_level == 0 && max_level == 0) {
958 		return 0;
959 	}
960 
961 	if (comp_level > max_level || comp_level < min_level) {
962 		return -EINVAL;
963 	}
964 
965 	return 0;
966 }
967 
968 /* RPC entry point for compression vbdev creation. */
969 int
970 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
971 		     uint8_t comp_algo, uint32_t comp_level,
972 		     bdev_compress_create_cb cb_fn, void *cb_arg)
973 {
974 	struct vbdev_compress *comp_bdev = NULL;
975 	struct stat info;
976 	int rc;
977 
978 	if (stat(pm_path, &info) != 0) {
979 		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
980 		return -EINVAL;
981 	} else if (!S_ISDIR(info.st_mode)) {
982 		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
983 		return -EINVAL;
984 	}
985 
986 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
987 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
988 		return -EINVAL;
989 	}
990 
991 	rc = _check_compress_bdev_comp_algo(comp_algo, comp_level);
992 	if (rc != 0) {
993 		SPDK_ERRLOG("Compress bdev doesn't support compression algo(%u) or level(%u)\n",
994 			    comp_algo, comp_level);
995 		return rc;
996 	}
997 
998 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
999 		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
1000 			SPDK_ERRLOG("Bass bdev %s already being used for a compress bdev\n", bdev_name);
1001 			return -EBUSY;
1002 		}
1003 	}
1004 	return vbdev_init_reduce(bdev_name, pm_path, lb_size, comp_algo, comp_level, cb_fn, cb_arg);
1005 }
1006 
/* Module init entry point for the bdev layer; this module needs no
 * global setup, so simply report success.
 */
static int
vbdev_compress_init(void)
{
	return 0;
}
1012 
/* Module fini entry point, called when the entire module is being torn
 * down.  Currently a no-op; volumes are not unloaded here yet.
 */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}
1019 
/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
1029 
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	/* No per-bdev config JSON; the module-level config_json handles it. */
	.write_config_json	= NULL,
};
1039 
/* Module interface registered with the bdev layer.  examine_disk lets us
 * inspect newly-registered base bdevs and claim the ones that hold a
 * reduce volume.
 */
static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1050 
1051 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1052 {
1053 	struct spdk_bdev_alias *aliases;
1054 
1055 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1056 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1057 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
1058 		if (!comp_bdev->comp_bdev.name) {
1059 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1060 			return -ENOMEM;
1061 		}
1062 	} else {
1063 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1064 		if (!comp_bdev->comp_bdev.name) {
1065 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1066 			return -ENOMEM;
1067 		}
1068 	}
1069 	return 0;
1070 }
1071 
/* Fill out and register the compress virtual bdev on top of an already-open
 * base bdev: name it, size it from the reduce vol params, register the
 * io_device, claim the base bdev for this module and register the new bdev
 * with the bdev layer.  On success the node is appended to g_vbdev_comp.
 * Returns 0 or a negative errno; on failure the registrations done so far
 * are rolled back, and the caller still owns comp_bdev and its base
 * descriptor.
 */
static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	struct spdk_uuid ns_uuid;
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	/* Expose chunk-aligned IO boundaries so the bdev layer splits IO at
	 * reduce chunk boundaries for us.
	 */
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	/* Parsing the compile-time constant namespace UUID cannot fail. */
	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
	if (rc) {
		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
		return -EINVAL;
	}

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
			    spdk_strerror(-rc));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev, error %s\n", spdk_strerror(-rc));
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}
1151 
1152 static void
1153 _vbdev_compress_delete_done(void *_ctx)
1154 {
1155 	struct vbdev_comp_delete_ctx *ctx = _ctx;
1156 
1157 	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
1158 
1159 	free(ctx);
1160 }
1161 
1162 static void
1163 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1164 {
1165 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1166 
1167 	ctx->cb_rc = bdeverrno;
1168 
1169 	if (ctx->orig_thread != spdk_get_thread()) {
1170 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1171 	} else {
1172 		_vbdev_compress_delete_done(ctx);
1173 	}
1174 }
1175 
1176 void
1177 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1178 {
1179 	struct vbdev_compress *comp_bdev = NULL;
1180 	struct vbdev_comp_delete_ctx *ctx;
1181 
1182 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1183 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1184 			break;
1185 		}
1186 	}
1187 
1188 	if (comp_bdev == NULL) {
1189 		cb_fn(cb_arg, -ENODEV);
1190 		return;
1191 	}
1192 
1193 	ctx = calloc(1, sizeof(*ctx));
1194 	if (ctx == NULL) {
1195 		SPDK_ERRLOG("Failed to allocate delete context\n");
1196 		cb_fn(cb_arg, -ENOMEM);
1197 		return;
1198 	}
1199 
1200 	/* Save these for after the vol is destroyed. */
1201 	ctx->cb_fn = cb_fn;
1202 	ctx->cb_arg = cb_arg;
1203 	ctx->orig_thread = spdk_get_thread();
1204 
1205 	comp_bdev->delete_ctx = ctx;
1206 
1207 	/* Tell reducelib that we're done with this volume. */
1208 	if (comp_bdev->orphaned == false) {
1209 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1210 	} else {
1211 		delete_vol_unload_cb(comp_bdev, 0);
1212 	}
1213 }
1214 
/* Unload completion callback used when discarding a vol whose claim failed;
 * the node is torn down by the caller's error path, so intentionally empty.
 */
static void
_vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
{
}
1219 
/* Continuation of vbdev_reduce_load_cb(), executed on the thread where the
 * base bdev descriptor was opened.  Dispatches on the saved reduce_errno:
 * 0 -> claim the base bdev and register the compress vbdev; -ENOENT ->
 * claim the base bdev but mark the node orphaned (no vbdev is registered);
 * any other error -> close the base bdev and free the node.  Every path
 * completes the examine cycle via spdk_bdev_module_examine_done().
 */
static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->reduce_errno == 0) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc != 0) {
			/* Claim failed; unload the vol we just loaded before tearing down. */
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
			goto err;
		}
	} else if (comp_bdev->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(comp_bdev)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		comp_bdev->thread = spdk_get_thread();

		comp_bdev->comp_bdev.module = &compress_if;
		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
						 comp_bdev->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-rc));
			free(comp_bdev->comp_bdev.name);
			goto err;
		}

		/* Keep the base bdev claimed but expose no vbdev for it. */
		comp_bdev->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
	} else {
		/* NOTE(review): -EILSEQ is deliberately not logged - presumably it
		 * means the base bdev holds no valid reduce metadata (i.e. "not
		 * ours"); confirm against the reduce library.
		 */
		if (comp_bdev->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-comp_bdev->reduce_errno));
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}
1275 
1276 /* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct
1277  * used for initial metadata operations to claim where it will be further filled out
1278  * and added to the global list.
1279  */
1280 static void
1281 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1282 {
1283 	struct vbdev_compress *comp_bdev = cb_arg;
1284 
1285 	if (reduce_errno == 0) {
1286 		/* Update information following volume load. */
1287 		comp_bdev->vol = vol;
1288 		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
1289 		       sizeof(struct spdk_reduce_vol_params));
1290 		comp_bdev->comp_algo = comp_bdev->params.comp_algo;
1291 		comp_bdev->comp_level = comp_bdev->params.comp_level;
1292 	}
1293 
1294 	comp_bdev->reduce_errno = reduce_errno;
1295 
1296 	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
1297 		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
1298 	} else {
1299 		_vbdev_reduce_load_cb(comp_bdev);
1300 	}
1301 
1302 }
1303 
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.  Every path either calls
 * spdk_bdev_module_examine_done() directly or defers it to the load
 * callback chain.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *comp_bdev;
	int rc;

	/* Never stack a compress bdev on top of another compress bdev. */
	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Open without write access for the metadata load. */
	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
			    spdk_strerror(-rc));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Default algo/level here are placeholders; on a successful load the
	 * real values come from the vol's stored params (see vbdev_reduce_load_cb).
	 */
	comp_bdev = _prepare_for_load_init(bdev_desc, 0, SPDK_ACCEL_COMP_ALGO_DEFLATE, 1);
	if (comp_bdev == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
}
1341 
1342 SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1343