/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/accel.h"

#include "spdk_internal/accel_module.h"

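/* The reduce volume is carved into fixed-size chunks that are compressed
 * individually; with the 16 KiB chunk and 4 KiB backing I/O unit defined
 * below, each chunk maps to at most 4 backing I/O units on the base bdev.
 */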
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	struct spdk_io_channel		*accel_channel;	/* to communicate with the accel framework */
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;	/* guards reduce_thread and ch_count */
	uint32_t			ch_count;	/* number of channels using this bdev */
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (spdk_likely(io_ctx->status == 0)) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else if (io_ctx->status == -ENOMEM) {
		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for reads/writes issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);

	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	struct spdk_reduce_vol_cb_args *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	if (compress) {
		assert(dst_iovcnt == 1);
		rc = spdk_accel_submit_compress(comp_bdev->accel_channel, dst_iovs[0].iov_base, dst_iovs[0].iov_len,
						src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
						0, reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
	} else {
		rc = spdk_accel_submit_decompress(comp_bdev->accel_channel, dst_iovs, dst_iovcnt,
						  src_iovs, src_iovcnt, &reduce_cb_arg->output_size,
						  0, reduce_cb_arg->cb_fn, reduce_cb_arg->cb_arg);
	}

	return rc;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

static void
_comp_submit_write(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			       reduce_rw_blocks_cb, bdev_io);
}

static void
_comp_submit_read(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}
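
/* Note: reduce volume I/O is funneled to comp_bdev->reduce_thread (recorded when
 * the first channel is created), which is why the read and write submit paths
 * hop threads via spdk_thread_exec_msg() before calling into reducelib.
 */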


/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so that it doesn't get freed by another vbdev
 * module beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	if (spdk_unlikely(!success)) {
		SPDK_ERRLOG("Failed to get data buffer\n");
		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
		return;
	}

	spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_read, bdev_io);
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_thread_exec_msg(comp_bdev->reduce_thread, _comp_submit_write, bdev_io);
		return;
	/* TODO support RESET in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
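/* The -ENOMEM path in _reduce_rw_blocks_cb() lands here; the bdev layer will
 * invoke vbdev_compress_resubmit_io() once the base bdev has resources again.
 */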
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to communicate with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}
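
/* Example (hypothetical caller): walking all compress bdevs with the accessors
 * above, e.g. from an RPC dump function:
 *
 *   struct vbdev_compress *cb;
 *
 *   for (cb = compress_bdev_first(); cb != NULL; cb = compress_bdev_next(cb)) {
 *           printf("%s\n", compress_get_name(cb));
 *   }
 */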

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers that want to communicate
 * with this bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(ACCEL_OPC_COMPRESS, &module_name);
	if (rc) {
		SPDK_ERRLOG("error getting module name (%d)\n", rc);
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}
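
/* For each compress bdev, the generated config JSON looks roughly like this
 * (hypothetical names):
 *
 *   {
 *     "method": "bdev_compress_create",
 *     "params": { "base_bdev_name": "Nvme0n1", "name": "COMP_Nvme0n1" }
 *   }
 */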

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete.  We'll pass the vbdev_compress
 * struct used for initial metadata operations to the claim function, where it
 * will be further filled out and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which params we want user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->accel_channel = spdk_accel_get_io_channel();
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	return 0;
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	spdk_put_io_channel(comp_bdev->base_ch);
	spdk_put_io_channel(comp_bdev->accel_channel);
	comp_bdev->reduce_thread = NULL;
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created it. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
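/* Example (hypothetical bdev name and pmem path; flag names may differ between
 * SPDK versions):
 *
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /mnt/pmem -l 4096
 */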
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct vbdev_compress *comp_bdev = NULL;

	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
			SPDK_ERRLOG("Base bdev %s already being used for a compress bdev\n", bdev_name);
			return -EBUSY;
		}
	}
	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}

static int
vbdev_compress_init(void)
{
	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	/* TODO: unload vol in a future patch */
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

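	/* With the defaults above (16 KiB chunks and, e.g., 4 KiB logical blocks)
	 * this works out to an optimal_io_boundary of 4 blocks, i.e. the bdev
	 * layer will split I/O on reduce chunk boundaries.
	 */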
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when load is complete.  We'll pass the vbdev_compress
 * struct used for initial metadata operations to the claim function, where it
 * will be further filled out and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)