xref: /spdk/module/bdev/compress/vbdev_compress.c (revision 8afdeef3becfe9409cc9e7372bd0bc10e8b7d46d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "vbdev_compress.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/stdinc.h"
11 #include "spdk/rpc.h"
12 #include "spdk/env.h"
13 #include "spdk/endian.h"
14 #include "spdk/string.h"
15 #include "spdk/thread.h"
16 #include "spdk/util.h"
17 #include "spdk/bdev_module.h"
18 #include "spdk/likely.h"
19 #include "spdk/log.h"
20 #include "spdk/accel.h"
21 
22 #include "spdk/accel_module.h"
23 
24 
25 #define CHUNK_SIZE (1024 * 16)
26 #define COMP_BDEV_NAME "compress"
27 #define BACKING_IO_SZ (4 * 1024)
28 
29 /* This namespace UUID was generated using uuid_generate() method. */
30 #define BDEV_COMPRESS_NAMESPACE_UUID "c3fad6da-832f-4cc0-9cdc-5c552b225e7b"
31 
/* Context carried through an asynchronous compress-bdev delete so the final
 * completion can be reported back on the thread that issued the delete.
 */
struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;		/* user completion callback */
	void				*cb_arg;	/* argument for cb_fn */
	int				cb_rc;		/* saved result to report via cb_fn */
	struct spdk_thread		*orig_thread;	/* thread that initiated the delete */
};
38 
/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;	/* thread that owns the reduce/accel channels */
	pthread_mutex_t			reduce_lock;	/* guards ch_count and reduce_thread */
	uint32_t			ch_count;	/* number of comp channels currently open */
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;	/* completion context for an in-flight delete */
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;	/* last error reported by reducelib */
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;	/* compression ops queued for later submission */
	TAILQ_ENTRY(vbdev_compress)	link;		/* entry in g_vbdev_comp */
	struct spdk_thread		*thread;	/* thread where base device is opened */
};
/* Global list of all compress vbdevs created by this module. */
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
63 
/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};
69 
/* Per I/O context for the compression vbdev; lives in spdk_bdev_io->driver_ctx. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};
79 
/* Forward declarations for routines referenced before their definitions. */
static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
87 
88 /* for completing rw requests on the orig IO thread. */
89 static void
90 _reduce_rw_blocks_cb(void *arg)
91 {
92 	struct comp_bdev_io *io_ctx = arg;
93 
94 	if (spdk_likely(io_ctx->status == 0)) {
95 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
96 	} else if (io_ctx->status == -ENOMEM) {
97 		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
98 	} else {
99 		SPDK_ERRLOG("Failed to execute reduce api. %s\n", spdk_strerror(-io_ctx->status));
100 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
101 	}
102 }
103 
104 /* Completion callback for r/w that were issued via reducelib. */
105 static void
106 reduce_rw_blocks_cb(void *arg, int reduce_errno)
107 {
108 	struct spdk_bdev_io *bdev_io = arg;
109 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
110 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
111 	struct spdk_thread *orig_thread;
112 
113 	/* TODO: need to decide which error codes are bdev_io success vs failure;
114 	 * example examine calls reading metadata */
115 
116 	io_ctx->status = reduce_errno;
117 
118 	/* Send this request to the orig IO thread. */
119 	orig_thread = spdk_io_channel_get_thread(ch);
120 
121 	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
122 }
123 
124 static int
125 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
126 		    int src_iovcnt, struct iovec *dst_iovs,
127 		    int dst_iovcnt, bool compress, void *cb_arg)
128 {
129 	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
130 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
131 					   backing_dev);
132 	int rc;
133 
134 	if (compress) {
135 		assert(dst_iovcnt == 1);
136 		rc = spdk_accel_submit_compress(comp_bdev->accel_channel, dst_iovs[0].iov_base, dst_iovs[0].iov_len,
137 						src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
138 						reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
139 	} else {
140 		rc = spdk_accel_submit_decompress(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
141 						  src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
142 						  reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
143 	}
144 
145 	return rc;
146 }
147 
148 /* Entry point for reduce lib to issue a compress operation. */
149 static void
150 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
151 		      struct iovec *src_iovs, int src_iovcnt,
152 		      struct iovec *dst_iovs, int dst_iovcnt,
153 		      struct spdk_reduce_vol_cb_args *cb_arg)
154 {
155 	int rc;
156 
157 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
158 	if (rc) {
159 		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
160 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
161 	}
162 }
163 
164 /* Entry point for reduce lib to issue a decompress operation. */
165 static void
166 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
167 			struct iovec *src_iovs, int src_iovcnt,
168 			struct iovec *dst_iovs, int dst_iovcnt,
169 			struct spdk_reduce_vol_cb_args *cb_arg)
170 {
171 	int rc;
172 
173 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
174 	if (rc) {
175 		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
176 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
177 	}
178 }
179 
/* Hands a write request to reducelib; must run on the reduce_thread
 * (callers dispatch here via spdk_thread_exec_msg). Completion arrives
 * through reduce_rw_blocks_cb.
 */
static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}
191 
/* Hands a read request to reducelib; must run on the reduce_thread
 * (callers dispatch here via spdk_thread_exec_msg). Completion arrives
 * through reduce_rw_blocks_cb.
 */
static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}
203 
204 
/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		/* Route through the normal completion path with -ENOMEM so the IO
		 * is re-queued rather than failed outright. */
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	/* Reads must be issued to reducelib from its owning thread. */
	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}
223 
224 /* Called when someone above submits IO to this vbdev. */
225 static void
226 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
227 {
228 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
229 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
230 					   comp_bdev);
231 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
232 
233 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
234 	io_ctx->comp_bdev = comp_bdev;
235 	io_ctx->comp_ch = comp_ch;
236 	io_ctx->orig_io = bdev_io;
237 
238 	switch (bdev_io->type) {
239 	case SPDK_BDEV_IO_TYPE_READ:
240 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
241 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
242 		return;
243 	case SPDK_BDEV_IO_TYPE_WRITE:
244 		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
245 		return;
246 	/* TODO support RESET in future patch in the series */
247 	case SPDK_BDEV_IO_TYPE_RESET:
248 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
249 	case SPDK_BDEV_IO_TYPE_UNMAP:
250 	case SPDK_BDEV_IO_TYPE_FLUSH:
251 	default:
252 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
253 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
254 		break;
255 	}
256 }
257 
258 static bool
259 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
260 {
261 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
262 
263 	switch (io_type) {
264 	case SPDK_BDEV_IO_TYPE_READ:
265 	case SPDK_BDEV_IO_TYPE_WRITE:
266 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
267 	case SPDK_BDEV_IO_TYPE_UNMAP:
268 	case SPDK_BDEV_IO_TYPE_RESET:
269 	case SPDK_BDEV_IO_TYPE_FLUSH:
270 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
271 	default:
272 		return false;
273 	}
274 }
275 
276 /* Resubmission function used by the bdev layer when a queued IO is ready to be
277  * submitted.
278  */
279 static void
280 vbdev_compress_resubmit_io(void *arg)
281 {
282 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
283 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
284 
285 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
286 }
287 
/* Used to queue an IO in the event of resource issues (-ENOMEM); the bdev
 * layer calls vbdev_compress_resubmit_io when resources free up.
 */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	/* Wait on the base bdev's channel; the retry re-enters our submit path. */
	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		/* Queueing should only fail on programming errors (e.g. double queue). */
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
306 
/* Callback for unregistering the IO device. All channels are gone at this
 * point, so it is safe to release the remaining per-bdev resources.
 */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}
318 
/* Final destruct step; must run on the thread that opened the base bdev
 * because spdk_bdev_close() requires the opening thread.
 */
static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		/* Normal case: tear down the IO device; free happens in the callback. */
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		/* Orphaned bdevs were never registered as an IO device; finish the
		 * pending delete and free directly. */
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}
336 
337 static void
338 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
339 {
340 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
341 
342 	if (reduce_errno) {
343 		SPDK_ERRLOG("number %d\n", reduce_errno);
344 	} else {
345 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
346 			spdk_thread_send_msg(comp_bdev->thread,
347 					     _vbdev_compress_destruct_cb, comp_bdev);
348 		} else {
349 			_vbdev_compress_destruct_cb(comp_bdev);
350 		}
351 	}
352 }
353 
/* Completion for spdk_reduce_vol_destroy() during delete: drop the metadata
 * channel and either unregister the bdev (normal case) or finish the
 * orphaned-bdev teardown path directly.
 */
static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		/* Log but continue teardown regardless. */
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		/* Orphaned: no registered bdev to unregister; go straight to destruct. */
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}

}
373 
/* Destroys the reduce volume as part of delete; runs on the reduce_thread
 * (or the current thread when no reduce_thread is set).
 */
static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}
389 
/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	/* The volume must be destroyed on the reduce_thread; take the lock to
	 * read reduce_thread consistently, and release it before dispatching. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}
413 
414 const char *
415 compress_get_name(const struct vbdev_compress *comp_bdev)
416 {
417 	return comp_bdev->comp_bdev.name;
418 }
419 
420 struct vbdev_compress *
421 compress_bdev_first(void)
422 {
423 	struct vbdev_compress *comp_bdev;
424 
425 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
426 
427 	return comp_bdev;
428 }
429 
430 struct vbdev_compress *
431 compress_bdev_next(struct vbdev_compress *prev)
432 {
433 	struct vbdev_compress *comp_bdev;
434 
435 	comp_bdev = TAILQ_NEXT(prev, link);
436 
437 	return comp_bdev;
438 }
439 
440 bool
441 compress_has_orphan(const char *name)
442 {
443 	struct vbdev_compress *comp_bdev;
444 
445 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
446 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
447 			return true;
448 		}
449 	}
450 	return false;
451 }
452 
/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		/* Volume already gone (e.g. destroyed during delete); finish directly. */
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}
470 
/* Entry point for upper layers to obtain an IO channel to this bdev.
 *
 * The SPDK channel code allocates the channel (SPDK channel struct plus the
 * comp_io_channel context size registered with the IO device) and invokes our
 * create callback to populate it.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	return spdk_get_io_channel(comp_bdev);
}
487 
/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* Emit a "compress" object with our name, base bdev, and pmem path. */
	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "pm_path", spdk_reduce_vol_get_pm_path(comp_bdev->vol));
	spdk_json_write_object_end(w);

	return 0;
}
503 
/* Module config_json hook. Intentionally emits nothing: compress bdev
 * configuration is persisted on the physical device itself and re-discovered
 * via examine at startup.
 */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	return 0;
}
510 
/* Context carried through the asynchronous reduce volume init sequence. */
struct vbdev_init_reduce_ctx {
	struct vbdev_compress   *comp_bdev;	/* the compress bdev being initialized */
	int                     status;		/* result from reducelib init */
	bdev_compress_create_cb cb_fn;		/* user completion callback */
	void                    *cb_ctx;	/* argument for cb_fn */
};
517 
/* Intentionally-empty unload completion used when cleaning up after a failed
 * claim during init; there is nothing left to do at that point.
 */
static void
_vbdev_reduce_init_unload_cb(void *ctx, int reduce_errno)
{
}
522 
/* Second half of reduce volume init; runs on the thread that opened the base
 * bdev. On successful claim the comp_bdev is kept; on any failure the base
 * bdev is closed and the comp_bdev freed.
 */
static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_init_reduce_ctx *init_ctx = ctx;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->vol) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc == 0) {
			init_ctx->cb_fn(init_ctx->cb_ctx, rc);
			free(init_ctx);
			return;
		} else {
			/* NOTE(review): the unload is async but comp_bdev (which embeds
			 * backing_dev) is freed below — confirm the unload cannot still
			 * reference it after this function returns. */
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_init_unload_cb, NULL);
		}
		init_ctx->cb_fn(init_ctx->cb_ctx, rc);
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	free(init_ctx);
}
552 
/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_init_reduce_ctx *init_ctx = cb_arg;
	struct vbdev_compress *comp_bdev = init_ctx->comp_bdev;

	if (reduce_errno == 0) {
		comp_bdev->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(comp_bdev->base_bdev), reduce_errno);
		/* Report the failure now; _vbdev_reduce_init_cb only invokes the
		 * callback on the success path (vol != NULL). */
		init_ctx->cb_fn(init_ctx->cb_ctx, reduce_errno);
	}

	init_ctx->status = reduce_errno;

	/* Continue on the thread that opened the base bdev (required for close). */
	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_init_cb, init_ctx);
	} else {
		_vbdev_reduce_init_cb(init_ctx);
	}
}
579 
580 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
581  * call the callback provided by reduceLib when it called the read/write/unmap function and
582  * free the bdev_io.
583  */
584 static void
585 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
586 {
587 	struct spdk_reduce_vol_cb_args *cb_args = arg;
588 	int reduce_errno;
589 
590 	if (success) {
591 		reduce_errno = 0;
592 	} else {
593 		reduce_errno = -EIO;
594 	}
595 	spdk_bdev_free_io(bdev_io);
596 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
597 }
598 
599 /* This is the function provided to the reduceLib for sending reads directly to
600  * the backing device.
601  */
602 static void
603 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
604 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
605 {
606 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
607 					   backing_dev);
608 	int rc;
609 
610 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
611 				    iov, iovcnt, lba, lba_count,
612 				    comp_reduce_io_cb,
613 				    args);
614 	if (rc) {
615 		if (rc == -ENOMEM) {
616 			SPDK_ERRLOG("No memory, start to queue io.\n");
617 			/* TODO: there's no bdev_io to queue */
618 		} else {
619 			SPDK_ERRLOG("submitting readv request\n");
620 		}
621 		args->cb_fn(args->cb_arg, rc);
622 	}
623 }
624 
625 /* This is the function provided to the reduceLib for sending writes directly to
626  * the backing device.
627  */
628 static void
629 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
630 		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
631 {
632 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
633 					   backing_dev);
634 	int rc;
635 
636 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
637 				     iov, iovcnt, lba, lba_count,
638 				     comp_reduce_io_cb,
639 				     args);
640 	if (rc) {
641 		if (rc == -ENOMEM) {
642 			SPDK_ERRLOG("No memory, start to queue io.\n");
643 			/* TODO: there's no bdev_io to queue */
644 		} else {
645 			SPDK_ERRLOG("error submitting writev request\n");
646 		}
647 		args->cb_fn(args->cb_arg, rc);
648 	}
649 }
650 
651 /* This is the function provided to the reduceLib for sending unmaps directly to
652  * the backing device.
653  */
654 static void
655 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
656 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
657 {
658 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
659 					   backing_dev);
660 	int rc;
661 
662 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
663 				    lba, lba_count,
664 				    comp_reduce_io_cb,
665 				    args);
666 
667 	if (rc) {
668 		if (rc == -ENOMEM) {
669 			SPDK_ERRLOG("No memory, start to queue io.\n");
670 			/* TODO: there's no bdev_io to queue */
671 		} else {
672 			SPDK_ERRLOG("submitting unmap request\n");
673 		}
674 		args->cb_fn(args->cb_arg, rc);
675 	}
676 }
677 
678 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
679 static void
680 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
681 {
682 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
683 
684 	if (reduce_errno) {
685 		SPDK_ERRLOG("number %d\n", reduce_errno);
686 	}
687 
688 	comp_bdev->vol = NULL;
689 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
690 }
691 
/* Base bdev hot-remove handler: unload the reduce volume of every compress
 * bdev stacked on the removed base bdev. Unregistration continues in
 * bdev_hotremove_vol_unload_cb.
 */
static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	/* _SAFE variant: entries may be removed from the list during teardown. */
	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}
704 
705 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
706 static void
707 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
708 				  void *event_ctx)
709 {
710 	switch (type) {
711 	case SPDK_BDEV_EVENT_REMOVE:
712 		vbdev_compress_base_bdev_hotremove_cb(bdev);
713 		break;
714 	default:
715 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
716 		break;
717 	}
718 }
719 
720 /* TODO: determine which parms we want user configurable, HC for now
721  * params.vol_size
722  * params.chunk_size
723  * compression PMD, algorithm, window size, comp level, etc.
724  * DEV_MD_PATH
725  */
726 
727 /* Common function for init and load to allocate and populate the minimal
728  * information for reducelib to init or load.
729  */
730 struct vbdev_compress *
731 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
732 {
733 	struct vbdev_compress *comp_bdev;
734 	struct spdk_bdev *bdev;
735 
736 	comp_bdev = calloc(1, sizeof(struct vbdev_compress));
737 	if (comp_bdev == NULL) {
738 		SPDK_ERRLOG("failed to alloc comp_bdev\n");
739 		return NULL;
740 	}
741 
742 	comp_bdev->backing_dev.unmap = _comp_reduce_unmap;
743 	comp_bdev->backing_dev.readv = _comp_reduce_readv;
744 	comp_bdev->backing_dev.writev = _comp_reduce_writev;
745 	comp_bdev->backing_dev.compress = _comp_reduce_compress;
746 	comp_bdev->backing_dev.decompress = _comp_reduce_decompress;
747 
748 	comp_bdev->base_desc = bdev_desc;
749 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
750 	comp_bdev->base_bdev = bdev;
751 
752 	comp_bdev->backing_dev.blocklen = bdev->blocklen;
753 	comp_bdev->backing_dev.blockcnt = bdev->blockcnt;
754 
755 	comp_bdev->params.chunk_size = CHUNK_SIZE;
756 	if (lb_size == 0) {
757 		comp_bdev->params.logical_block_size = bdev->blocklen;
758 	} else {
759 		comp_bdev->params.logical_block_size = lb_size;
760 	}
761 
762 	comp_bdev->params.backing_io_unit_size = BACKING_IO_SZ;
763 	return comp_bdev;
764 }
765 
766 /* Call reducelib to initialize a new volume */
767 static int
768 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size,
769 		  bdev_compress_create_cb cb_fn, void *cb_arg)
770 {
771 	struct spdk_bdev_desc *bdev_desc = NULL;
772 	struct vbdev_init_reduce_ctx *init_ctx;
773 	struct vbdev_compress *comp_bdev;
774 	int rc;
775 
776 	init_ctx = calloc(1, sizeof(*init_ctx));
777 	if (init_ctx == NULL) {
778 		SPDK_ERRLOG("failed to alloc init contexts\n");
779 		return - ENOMEM;
780 	}
781 
782 	init_ctx->cb_fn = cb_fn;
783 	init_ctx->cb_ctx = cb_arg;
784 
785 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
786 				NULL, &bdev_desc);
787 	if (rc) {
788 		SPDK_ERRLOG("could not open bdev %s, error %s\n", bdev_name, spdk_strerror(-rc));
789 		free(init_ctx);
790 		return rc;
791 	}
792 
793 	comp_bdev = _prepare_for_load_init(bdev_desc, lb_size);
794 	if (comp_bdev == NULL) {
795 		free(init_ctx);
796 		spdk_bdev_close(bdev_desc);
797 		return -EINVAL;
798 	}
799 
800 	init_ctx->comp_bdev = comp_bdev;
801 
802 	/* Save the thread where the base device is opened */
803 	comp_bdev->thread = spdk_get_thread();
804 
805 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
806 
807 	spdk_reduce_vol_init(&comp_bdev->params, &comp_bdev->backing_dev,
808 			     pm_path,
809 			     vbdev_reduce_init_cb,
810 			     init_ctx);
811 	return 0;
812 }
813 
/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* First channel: this thread becomes the reduce_thread and owns
		 * the shared base/accel channels for all comp channels. */

		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}
843 
/* Releases the shared base/accel channels acquired by the first channel
 * create; must run on the reduce_thread that acquired them.
 */
static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}
851 
852 /* Used to reroute destroy_ch to the correct thread */
853 static void
854 _comp_bdev_ch_destroy_cb(void *arg)
855 {
856 	struct vbdev_compress *comp_bdev = arg;
857 
858 	pthread_mutex_lock(&comp_bdev->reduce_lock);
859 	_channel_cleanup(comp_bdev);
860 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
861 }
862 
/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Last channel gone: release the shared channels. */
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}
885 
886 /* RPC entry point for compression vbdev creation. */
887 int
888 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size,
889 		     bdev_compress_create_cb cb_fn, void *cb_arg)
890 {
891 	struct vbdev_compress *comp_bdev = NULL;
892 	struct stat info;
893 
894 	if (stat(pm_path, &info) != 0) {
895 		SPDK_ERRLOG("PM path %s does not exist.\n", pm_path);
896 		return -EINVAL;
897 	} else if (!S_ISDIR(info.st_mode)) {
898 		SPDK_ERRLOG("PM path %s is not a directory.\n", pm_path);
899 		return -EINVAL;
900 	}
901 
902 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
903 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
904 		return -EINVAL;
905 	}
906 
907 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
908 		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
909 			SPDK_ERRLOG("Bass bdev %s already being used for a compress bdev\n", bdev_name);
910 			return -EBUSY;
911 		}
912 	}
913 	return vbdev_init_reduce(bdev_name, pm_path, lb_size, cb_fn, cb_arg);
914 }
915 
/* Module init hook: nothing to set up until a compress bdev is created. */
static int
vbdev_compress_init(void)
{
	return 0;
}
921 
/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}
928 
/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO: one comp_bdev_io per bdev_io.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}
938 
/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	/* Per-bdev config is emitted by the module-level config_json hook
	 * (vbdev_compress_config_json in compress_if) instead.
	 */
	.write_config_json	= NULL,
};
948 
/* Module descriptor: ties the init/fini/examine/config hooks above into
 * the bdev framework under the name "compress".
 */
static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
959 
960 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
961 {
962 	struct spdk_bdev_alias *aliases;
963 
964 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
965 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
966 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
967 		if (!comp_bdev->comp_bdev.name) {
968 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
969 			return -ENOMEM;
970 		}
971 	} else {
972 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
973 		if (!comp_bdev->comp_bdev.name) {
974 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
975 			return -ENOMEM;
976 		}
977 	}
978 	return 0;
979 }
980 
/* Fill out the compress bdev, register it as an io_device, claim the base
 * bdev, and register the new bdev with the framework.
 *
 * Returns 0 on success; on failure the partially-constructed state
 * (io_device registration, claim, name allocation) is unwound in reverse
 * order and a negative errno is returned.
 */
static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	struct spdk_uuid ns_uuid;
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	/* Split I/O on chunk boundaries: optimal_io_boundary is expressed in
	 * logical blocks per reduce chunk.
	 */
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	/* Parsing a fixed, known-valid literal; return value intentionally
	 * not checked.
	 */
	spdk_uuid_parse(&ns_uuid, BDEV_COMPRESS_NAMESPACE_UUID);
	rc = spdk_uuid_generate_sha1(&comp_bdev->comp_bdev.uuid, &ns_uuid,
				     (const char *)&comp_bdev->base_bdev->uuid, sizeof(struct spdk_uuid));
	if (rc) {
		SPDK_ERRLOG("Unable to generate new UUID for compress bdev, error %s\n", spdk_strerror(-rc));
		return -EINVAL;
	}

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
			    spdk_strerror(-rc));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("trying to register bdev, error %s\n", spdk_strerror(-rc));
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths: release in reverse order of acquisition. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}
1060 
1061 static void
1062 _vbdev_compress_delete_done(void *_ctx)
1063 {
1064 	struct vbdev_comp_delete_ctx *ctx = _ctx;
1065 
1066 	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
1067 
1068 	free(ctx);
1069 }
1070 
1071 static void
1072 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1073 {
1074 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1075 
1076 	ctx->cb_rc = bdeverrno;
1077 
1078 	if (ctx->orig_thread != spdk_get_thread()) {
1079 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1080 	} else {
1081 		_vbdev_compress_delete_done(ctx);
1082 	}
1083 }
1084 
1085 void
1086 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1087 {
1088 	struct vbdev_compress *comp_bdev = NULL;
1089 	struct vbdev_comp_delete_ctx *ctx;
1090 
1091 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1092 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1093 			break;
1094 		}
1095 	}
1096 
1097 	if (comp_bdev == NULL) {
1098 		cb_fn(cb_arg, -ENODEV);
1099 		return;
1100 	}
1101 
1102 	ctx = calloc(1, sizeof(*ctx));
1103 	if (ctx == NULL) {
1104 		SPDK_ERRLOG("Failed to allocate delete context\n");
1105 		cb_fn(cb_arg, -ENOMEM);
1106 		return;
1107 	}
1108 
1109 	/* Save these for after the vol is destroyed. */
1110 	ctx->cb_fn = cb_fn;
1111 	ctx->cb_arg = cb_arg;
1112 	ctx->orig_thread = spdk_get_thread();
1113 
1114 	comp_bdev->delete_ctx = ctx;
1115 
1116 	/* Tell reducelib that we're done with this volume. */
1117 	if (comp_bdev->orphaned == false) {
1118 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1119 	} else {
1120 		delete_vol_unload_cb(comp_bdev, 0);
1121 	}
1122 }
1123 
/* No-op completion for the vol unload issued when claiming a freshly
 * loaded volume fails; the caller handles all cleanup itself.
 */
static void
_vbdev_reduce_load_unload_cb(void *ctx, int reduce_errno)
{
}
1128 
/* Finish handling a reduce-volume load result on the thread that opened
 * the base bdev. Dispatches on comp_bdev->reduce_errno:
 *   0       - a valid compress volume: claim the base bdev and register
 *             the compress vbdev.
 *   -ENOENT - NOTE(review): looks like the volume's metadata could not be
 *             fully loaded; the base bdev is still claimed and tracked as
 *             "orphaned" (no vbdev registered) - confirm exact semantics
 *             against spdk_reduce_vol_load.
 *   other   - not ours (or a real error); log unless -EILSEQ (presumably
 *             "not a reduce volume") and clean up.
 * Always signals examine completion to the bdev framework.
 */
static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;
	int rc;

	assert(comp_bdev->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(comp_bdev->base_ch);

	if (comp_bdev->reduce_errno == 0) {
		rc = vbdev_compress_claim(comp_bdev);
		if (rc != 0) {
			/* Claim failed: unload the volume we just loaded. */
			spdk_reduce_vol_unload(comp_bdev->vol, _vbdev_reduce_load_unload_cb, NULL);
			goto err;
		}
	} else if (comp_bdev->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(comp_bdev)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		comp_bdev->thread = spdk_get_thread();

		comp_bdev->comp_bdev.module = &compress_if;
		pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
						 comp_bdev->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-rc));
			free(comp_bdev->comp_bdev.name);
			goto err;
		}

		/* Track the claim so delete can find it, but register no vbdev. */
		comp_bdev->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
	} else {
		/* -EILSEQ is the expected "not a compress volume" result during
		 * examine, so don't log it as an error.
		 */
		if (comp_bdev->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %s\n", spdk_bdev_get_name(comp_bdev->base_bdev),
				    spdk_strerror(-comp_bdev->reduce_errno));
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}
1184 
/* Callback from reduce for when the load is complete. We'll pass the
 * vbdev_comp struct used for initial metadata operations to claim where
 * it will be further filled out and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		comp_bdev->vol = vol;
		memcpy(&comp_bdev->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	/* Stash the result; _vbdev_reduce_load_cb reads it on the bdev's thread. */
	comp_bdev->reduce_errno = reduce_errno;

	/* Finish on the thread that opened the base bdev. */
	if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->thread, _vbdev_reduce_load_cb, comp_bdev);
	} else {
		_vbdev_reduce_load_cb(comp_bdev);
	}

}
1210 
/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it. The asynchronous load completion
 * (vbdev_reduce_load_cb) calls spdk_bdev_module_examine_done for the
 * success path; all early exits here must call it themselves.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *comp_bdev;
	int rc;

	/* Never stack a compress bdev on top of another compress bdev. */
	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s, error %s\n", spdk_bdev_get_name(bdev),
			    spdk_strerror(-rc));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Build the vbdev_compress context used for the metadata load. */
	comp_bdev = _prepare_for_load_init(bdev_desc, 0);
	if (comp_bdev == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
	spdk_reduce_vol_load(&comp_bdev->backing_dev, vbdev_reduce_load_cb, comp_bdev);
}
1248 
/* Register the "vbdev_compress" log component used by this file's log macros. */
SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1250