1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "vbdev_compress.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/stdinc.h"
38 #include "spdk/rpc.h"
39 #include "spdk/env.h"
40 #include "spdk/conf.h"
41 #include "spdk/endian.h"
42 #include "spdk/string.h"
43 #include "spdk/thread.h"
44 #include "spdk/util.h"
45 #include "spdk/bdev_module.h"
46 
47 #include "spdk_internal/log.h"
48 
49 #include <rte_config.h>
50 #include <rte_bus_vdev.h>
51 #include <rte_compressdev.h>
52 #include <rte_comp.h>
53 
54 #define NUM_MAX_XFORMS 2
55 #define NUM_MAX_INFLIGHT_OPS 128
56 #define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 *  span a 2MB boundary; see the worked example after this define block.
 */
60 #define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
61 #define CHUNK_SIZE (1024 * 16)
62 #define COMP_BDEV_NAME "compress"
63 #define BACKING_IO_SZ (4 * 1024)
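
/*
 * Worked example for MAX_MBUFS_PER_OP above: reduce hands us at most
 * REDUCE_MAX_IOVECS host iovecs per operation, and each iovec that straddles
 * a 2MB hugepage boundary is split into two mbufs (the second covering the
 * remainder), hence the factor of 2.
 */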
64 
65 #define ISAL_PMD "compress_isal"
66 #define QAT_PMD "compress_qat"
67 #define NUM_MBUFS		8192
68 #define POOL_CACHE_SIZE		256
69 
70 static enum compress_pmd g_opts;
71 
72 /* Global list of available compression devices. */
73 struct compress_dev {
74 	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
75 	uint8_t				cdev_id;	/* identifier for the device */
76 	void				*comp_xform;	/* shared private xform for comp on this PMD */
77 	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
78 	TAILQ_ENTRY(compress_dev)	link;
79 };
80 static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
81 
/* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99:
 * the internal ring name that it creates grows with the qp index and eventually
 * breaks a name-length limit in the generic ring code, failing qp initialization.
 */
86 #define MAX_NUM_QP 99
87 /* Global list and lock for unique device/queue pair combos */
88 struct comp_device_qp {
89 	struct compress_dev		*device;	/* ptr to compression device */
90 	uint8_t				qp;		/* queue pair for this node */
	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
92 	TAILQ_ENTRY(comp_device_qp)	link;
93 };
94 static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
95 static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
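
/* A qp is bound to the first SPDK thread that claims it (see
 * comp_bdev_ch_create_cb() below); DPDK queue pairs are not thread-safe, so
 * binding each qp to a single thread lets enqueue/dequeue run without
 * additional locking.
 */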
96 
97 /* For queueing up compression operations that we can't submit for some reason */
98 struct vbdev_comp_op {
99 	struct spdk_reduce_backing_dev	*backing_dev;
100 	struct iovec			*src_iovs;
101 	int				src_iovcnt;
102 	struct iovec			*dst_iovs;
103 	int				dst_iovcnt;
104 	bool				compress;
105 	void				*cb_arg;
106 	TAILQ_ENTRY(vbdev_comp_op)	link;
107 };
108 
109 /* List of virtual bdevs and associated info for each. */
110 struct vbdev_compress {
111 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
112 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
113 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
114 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
115 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
116 	char				*drv_name;	/* name of the compression device driver */
117 	struct comp_device_qp		*device_qp;
118 	struct spdk_thread		*reduce_thread;
119 	pthread_mutex_t			reduce_lock;
120 	uint32_t			ch_count;
121 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
122 	struct spdk_poller		*poller;	/* completion poller */
123 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
124 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
125 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
126 	spdk_delete_compress_complete	delete_cb_fn;
127 	void				*delete_cb_arg;
128 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
129 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
130 	TAILQ_ENTRY(vbdev_compress)	link;
131 };
132 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
133 
/* The comp vbdev channel struct. It is allocated and freed on our behalf by the io channel code.
 */
136 struct comp_io_channel {
137 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
138 };
139 
140 /* Per I/O context for the compression vbdev. */
141 struct comp_bdev_io {
142 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
143 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
144 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
145 	struct spdk_bdev_io		*orig_io;		/* the original IO */
146 	struct spdk_io_channel		*ch;			/* for resubmission */
147 	int				status;			/* save for completion on orig thread */
148 };
149 
150 /* Shared mempools between all devices on this system */
151 static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
152 static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
153 static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
154 static bool g_qat_available = false;
155 static bool g_isal_available = false;
156 
157 /* Create shared (between all ops per PMD) compress xforms. */
158 static struct rte_comp_xform g_comp_xform = {
159 	.type = RTE_COMP_COMPRESS,
160 	.compress = {
161 		.algo = RTE_COMP_ALGO_DEFLATE,
162 		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
163 		.level = RTE_COMP_LEVEL_MAX,
164 		.window_size = DEFAULT_WINDOW_SIZE,
165 		.chksum = RTE_COMP_CHECKSUM_NONE,
166 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
167 	}
168 };
169 /* Create shared (between all ops per PMD) decompress xforms. */
170 static struct rte_comp_xform g_decomp_xform = {
171 	.type = RTE_COMP_DECOMPRESS,
172 	.decompress = {
173 		.algo = RTE_COMP_ALGO_DEFLATE,
174 		.chksum = RTE_COMP_CHECKSUM_NONE,
175 		.window_size = DEFAULT_WINDOW_SIZE,
176 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
177 	}
178 };
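
/*
 * Sketch of how these templates are consumed; create_compress_dev() below
 * does exactly this once per device:
 *
 *	void *xform;
 *	int rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform, &xform);
 */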
179 
180 static void vbdev_compress_examine(struct spdk_bdev *bdev);
181 static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
182 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
183 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
184 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
185 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
186 
/* Dummy callback used by DPDK when freeing externally-attached buffers
 * from mbufs. We free those buffers ourselves, but DPDK requires that a
 * free callback be provided.
 */
191 static void
192 shinfo_free_cb(void *arg1, void *arg2)
193 {
194 }
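
/*
 * A minimal sketch of the ext-buf pattern this callback supports (iov and m
 * are hypothetical locals; the real loops live in _compress_operation()).
 * The mbuf only references caller-owned memory, so there is nothing for
 * DPDK to free:
 *
 *	uint64_t len = iov->iov_len;
 *	uint64_t phys = spdk_vtophys(iov->iov_base, &len);
 *
 *	rte_pktmbuf_attach_extbuf(m, iov->iov_base, phys, len, &g_shinfo);
 *	rte_pktmbuf_append(m, len);
 */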
195 
196 /* Called by vbdev_init_compress_drivers() to init each discovered compression device */
197 static int
198 create_compress_dev(uint8_t index)
199 {
200 	struct compress_dev *device;
201 	uint16_t q_pairs;
202 	uint8_t cdev_id;
203 	int rc, i;
204 	struct comp_device_qp *dev_qp;
205 	struct comp_device_qp *tmp_qp;
206 
207 	device = calloc(1, sizeof(struct compress_dev));
208 	if (!device) {
209 		return -ENOMEM;
210 	}
211 
212 	/* Get details about this device. */
213 	rte_compressdev_info_get(index, &device->cdev_info);
214 
215 	cdev_id = device->cdev_id = index;
216 
	/* Zero means no limit, so use our own cap of MAX_NUM_QP. */
218 	if (device->cdev_info.max_nb_queue_pairs == 0) {
219 		q_pairs = MAX_NUM_QP;
220 	} else {
221 		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
222 	}
223 
224 	/* Configure the compression device. */
225 	struct rte_compressdev_config config = {
226 		.socket_id = rte_socket_id(),
227 		.nb_queue_pairs = q_pairs,
228 		.max_nb_priv_xforms = NUM_MAX_XFORMS,
229 		.max_nb_streams = 0
230 	};
231 	rc = rte_compressdev_configure(cdev_id, &config);
232 	if (rc < 0) {
233 		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
234 		goto err;
235 	}
236 
237 	/* Pre-setup all potential qpairs now and assign them in the channel
238 	 * callback.
239 	 */
240 	for (i = 0; i < q_pairs; i++) {
241 		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
242 						      NUM_MAX_INFLIGHT_OPS,
243 						      rte_socket_id());
244 		if (rc) {
245 			if (i > 0) {
246 				q_pairs = i;
247 				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
248 					       "compressdev %u with error %u "
249 					       "so limiting to %u qpairs\n",
250 					       cdev_id, rc, q_pairs);
251 				break;
252 			} else {
253 				SPDK_ERRLOG("Failed to setup queue pair on "
254 					    "compressdev %u with error %u\n", cdev_id, rc);
255 				rc = -EINVAL;
256 				goto err;
257 			}
258 		}
259 	}
260 
261 	rc = rte_compressdev_start(cdev_id);
262 	if (rc < 0) {
263 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
264 			    cdev_id, rc);
265 		goto err;
266 	}
267 
268 	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
269 		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
270 				&device->comp_xform);
271 		if (rc < 0) {
272 			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
273 				    cdev_id, rc);
274 			goto err;
275 		}
276 
277 		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
278 				&device->decomp_xform);
279 		if (rc) {
280 			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
281 				    cdev_id, rc);
282 			goto err;
283 		}
284 	} else {
285 		SPDK_ERRLOG("PMD does not support shared transforms\n");
286 		goto err;
287 	}
288 
289 	/* Build up list of device/qp combinations */
290 	for (i = 0; i < q_pairs; i++) {
291 		dev_qp = calloc(1, sizeof(struct comp_device_qp));
292 		if (!dev_qp) {
293 			rc = -ENOMEM;
294 			goto err;
295 		}
296 		dev_qp->device = device;
297 		dev_qp->qp = i;
298 		dev_qp->thread = NULL;
299 		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
300 	}
301 
302 	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
303 
304 	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
305 		g_qat_available = true;
306 	}
307 	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
308 		g_isal_available = true;
309 	}
310 
311 	return 0;
312 
313 err:
314 	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
315 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
316 		free(dev_qp);
317 	}
318 	free(device);
319 	return rc;
320 }
321 
322 /* Called from driver init entry point, vbdev_compress_init() */
323 static int
324 vbdev_init_compress_drivers(void)
325 {
326 	uint8_t cdev_count, i;
327 	struct compress_dev *tmp_dev;
328 	struct compress_dev *device;
329 	int rc;
330 
331 	/* We always init the compress_isal PMD */
332 	rc = rte_vdev_init(ISAL_PMD, NULL);
333 	if (rc == 0) {
334 		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
335 	} else if (rc == -EEXIST) {
336 		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
337 	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
339 		return -EINVAL;
340 	}
341 
342 	/* If we have no compression devices, there's no reason to continue. */
343 	cdev_count = rte_compressdev_count();
344 	if (cdev_count == 0) {
345 		return 0;
346 	}
347 	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
348 		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
349 		return -EINVAL;
350 	}
351 
352 	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
353 					    sizeof(struct rte_mbuf), 0, rte_socket_id());
354 	if (g_mbuf_mp == NULL) {
355 		SPDK_ERRLOG("Cannot create mbuf pool\n");
356 		rc = -ENOMEM;
357 		goto error_create_mbuf;
358 	}
359 
360 	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
361 					       0, rte_socket_id());
362 	if (g_comp_op_mp == NULL) {
363 		SPDK_ERRLOG("Cannot create comp op pool\n");
364 		rc = -ENOMEM;
365 		goto error_create_op;
366 	}
367 
368 	/* Init all devices */
369 	for (i = 0; i < cdev_count; i++) {
370 		rc = create_compress_dev(i);
371 		if (rc != 0) {
372 			goto error_create_compress_devs;
373 		}
374 	}
375 
376 	if (g_qat_available == true) {
377 		SPDK_NOTICELOG("initialized QAT PMD\n");
378 	}
379 
380 	g_shinfo.free_cb = shinfo_free_cb;
381 
382 	return 0;
383 
384 	/* Error cleanup paths. */
385 error_create_compress_devs:
386 	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
387 		TAILQ_REMOVE(&g_compress_devs, device, link);
388 		free(device);
389 	}
error_create_op:
	rte_mempool_free(g_comp_op_mp);	/* NULL-safe; also releases the op pool on the device init error path */
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);
393 
394 	return rc;
395 }
396 
397 /* for completing rw requests on the orig IO thread. */
398 static void
399 _spdk_reduce_rw_blocks_cb(void *arg)
400 {
401 	struct comp_bdev_io *io_ctx = arg;
402 
403 	if (io_ctx->status == 0) {
404 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
405 	} else {
406 		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
407 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
408 	}
409 }
410 
/* Completion callback for reads/writes that were issued via reducelib. */
412 static void
413 spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
414 {
415 	struct spdk_bdev_io *bdev_io = arg;
416 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
417 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
418 
419 	/* TODO: need to decide which error codes are bdev_io success vs failure;
420 	 * example examine calls reading metadata */
421 
422 	io_ctx->status = reduce_errno;
423 
424 	/* Send this request to the orig IO thread. */
425 	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
426 		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
427 	} else {
428 		_spdk_reduce_rw_blocks_cb(io_ctx);
429 	}
430 }
431 
432 static int
433 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
434 		    int src_iovcnt, struct iovec *dst_iovs,
435 		    int dst_iovcnt, bool compress, void *cb_arg)
436 {
437 	void *reduce_cb_arg = cb_arg;
438 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
439 					   backing_dev);
440 	struct rte_comp_op *comp_op;
441 	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
442 	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
443 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
444 	uint64_t updated_length, remainder, phys_addr, total_length = 0;
445 	uint8_t *current_src_base = NULL;
446 	uint8_t *current_dst_base = NULL;
447 	int iov_index, mbuf_index;
448 	int rc = 0;
449 	struct vbdev_comp_op *op_to_queue;
450 	int i;
451 	int src_mbuf_total = src_iovcnt;
452 	int dst_mbuf_total = dst_iovcnt;
453 	bool device_error = false;
454 
455 	assert(src_iovcnt < MAX_MBUFS_PER_OP);
456 
457 #ifdef DEBUG
458 	memset(src_mbufs, 0, sizeof(src_mbufs));
459 	memset(dst_mbufs, 0, sizeof(dst_mbufs));
460 #endif
461 
462 	comp_op = rte_comp_op_alloc(g_comp_op_mp);
463 	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
465 		goto error_get_op;
466 	}
467 
468 	/* get an mbuf per iov, src and dst */
469 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
470 	if (rc) {
471 		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
472 		goto error_get_src;
473 	}
474 
475 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
476 	if (rc) {
477 		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
478 		goto error_get_dst;
479 	}
480 
481 	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
482 	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
483 	 * and associate with our single comp_op.
484 	 */
485 
486 	/* Setup src mbufs */
487 	iov_index = mbuf_index = 0;
488 	while (iov_index < src_iovcnt) {
489 
490 		current_src_base = src_iovs[iov_index].iov_base;
491 		total_length += src_iovs[iov_index].iov_len;
492 		assert(src_mbufs[mbuf_index] != NULL);
493 		src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
494 		updated_length = src_iovs[iov_index].iov_len;
495 		phys_addr = spdk_vtophys((void *)current_src_base, &updated_length);
496 
497 		rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
498 					  current_src_base,
499 					  phys_addr,
500 					  updated_length,
501 					  &g_shinfo);
502 		rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length);
503 		remainder = src_iovs[iov_index].iov_len - updated_length;
504 
505 		if (mbuf_index > 0) {
506 			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
507 		}
508 
		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
510 		if (remainder > 0) {
511 			/* allocate an mbuf at the end of the array */
512 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1);
513 			if (rc) {
514 				SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n");
515 				goto error_src_dst;
516 			}
517 			src_mbuf_total++;
518 			mbuf_index++;
519 			src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
520 			current_src_base += updated_length;
521 			phys_addr = spdk_vtophys((void *)current_src_base, &remainder);
522 			/* assert we don't cross another */
523 			assert(remainder == src_iovs[iov_index].iov_len - updated_length);
524 
525 			rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
526 						  current_src_base,
527 						  phys_addr,
528 						  remainder,
529 						  &g_shinfo);
530 			rte_pktmbuf_append(src_mbufs[mbuf_index], remainder);
531 			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
532 		}
533 		iov_index++;
534 		mbuf_index++;
535 	}
536 
537 	comp_op->m_src = src_mbufs[0];
538 	comp_op->src.offset = 0;
539 	comp_op->src.length = total_length;
540 
	/* Setup dst mbufs; in current usage there is only one dst vector. */
542 	iov_index = mbuf_index = 0;
543 	while (iov_index < dst_iovcnt) {
544 
545 		current_dst_base = dst_iovs[iov_index].iov_base;
546 		updated_length = dst_iovs[iov_index].iov_len;
547 		phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length);
548 
549 		rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
550 					  current_dst_base,
551 					  phys_addr,
552 					  updated_length,
553 					  &g_shinfo);
554 		rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length);
555 		remainder = dst_iovs[iov_index].iov_len - updated_length;
556 
557 		if (mbuf_index > 0) {
558 			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
559 		}
560 
		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
562 		if (remainder > 0) {
563 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1);
564 			if (rc) {
565 				SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n");
566 				goto error_src_dst;
567 			}
568 			dst_mbuf_total++;
569 			mbuf_index++;
570 			current_dst_base += updated_length;
571 			phys_addr = spdk_vtophys((void *)current_dst_base, &remainder);
572 			/* assert we don't cross another */
573 			assert(remainder == dst_iovs[iov_index].iov_len - updated_length);
574 
575 			rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
576 						  current_dst_base,
577 						  phys_addr,
578 						  remainder,
579 						  &g_shinfo);
580 			rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder);
581 			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
582 		}
583 		iov_index++;
584 		mbuf_index++;
585 	}
586 
587 	comp_op->m_dst = dst_mbufs[0];
588 	comp_op->dst.offset = 0;
589 
590 	if (compress == true) {
591 		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
592 	} else {
593 		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
594 	}
595 
596 	comp_op->op_type = RTE_COMP_OP_STATELESS;
597 	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
598 
599 	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
600 	assert(rc <= 1);
601 
	/* We always expect 1 to get queued; if 0 then we need to queue it up ourselves. */
603 	if (rc == 1) {
604 		return 0;
605 	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
606 		/* we free mbufs differently depending on whether they were chained or not */
607 		rte_pktmbuf_free(comp_op->m_src);
608 		rte_pktmbuf_free(comp_op->m_dst);
609 		goto error_enqueue;
	} else {
		/* Log the failure status now; comp_op is freed in the cleanup path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
		goto error_src_dst;
	}
614 
615 	/* Error cleanup paths. */
616 error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		/* Free each segment individually; some may already be chained to mbuf 0. */
		rte_pktmbuf_free_seg(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free_seg(src_mbufs[i]);
	}
624 error_get_src:
625 error_enqueue:
626 	rte_comp_op_free(comp_op);
627 error_get_op:
628 
	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters; the status was logged above,
		 * before the op was freed.
		 */
		return -EINVAL;
	}
636 
637 	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
638 	if (op_to_queue == NULL) {
639 		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
640 		return -ENOMEM;
641 	}
642 	op_to_queue->backing_dev = backing_dev;
643 	op_to_queue->src_iovs = src_iovs;
644 	op_to_queue->src_iovcnt = src_iovcnt;
645 	op_to_queue->dst_iovs = dst_iovs;
646 	op_to_queue->dst_iovcnt = dst_iovcnt;
647 	op_to_queue->compress = compress;
648 	op_to_queue->cb_arg = cb_arg;
649 	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
650 			  op_to_queue,
651 			  link);
652 	return 0;
653 }
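
/*
 * Worked example of the boundary split above (illustrative numbers): for an
 * 8KB iovec that starts 4KB below a 2MB hugepage boundary, spdk_vtophys()
 * comes back with updated_length = 4KB, so the first mbuf covers 4KB and an
 * extra mbuf is allocated for the 4KB remainder and chained to mbuf 0.
 */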
654 
655 /* Poller for the DPDK compression driver. */
656 static int
657 comp_dev_poller(void *args)
658 {
659 	struct vbdev_compress *comp_bdev = args;
660 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
661 	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
662 	uint16_t num_deq;
663 	struct spdk_reduce_vol_cb_args *reduce_args;
664 	struct vbdev_comp_op *op_to_resubmit;
665 	int rc, i;
666 
667 	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
668 						NUM_MAX_INFLIGHT_OPS);
669 	for (i = 0; i < num_deq; i++) {
670 		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;
671 
672 		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
673 
674 			/* tell reduce this is done and what the bytecount was */
675 			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
676 		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
678 				       deq_ops[i]->status);
679 
680 			/* Reduce will simply store uncompressed on neg errno value. */
681 			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
682 		}
683 
684 		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
685 		 * call takes care of freeing all of the mbufs in the chain back to their
686 		 * original pool.
687 		 */
688 		rte_pktmbuf_free(deq_ops[i]->m_src);
689 		rte_pktmbuf_free(deq_ops[i]->m_dst);
690 
		/* There is no bulk free for comp ops so we have to free them one at a
		 * time here; however, it would be rare to ever have more than 1 at a
		 * time anyway.
		 */
695 		rte_comp_op_free(deq_ops[i]);
696 
697 		/* Check if there are any pending comp ops to process, only pull one
698 		 * at a time off as _compress_operation() may re-queue the op.
699 		 */
700 		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
701 			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
702 			rc = _compress_operation(op_to_resubmit->backing_dev,
703 						 op_to_resubmit->src_iovs,
704 						 op_to_resubmit->src_iovcnt,
705 						 op_to_resubmit->dst_iovs,
706 						 op_to_resubmit->dst_iovcnt,
707 						 op_to_resubmit->compress,
708 						 op_to_resubmit->cb_arg);
709 			if (rc == 0) {
710 				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
711 				free(op_to_resubmit);
712 			}
713 		}
714 	}
715 	return 0;
716 }
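
/*
 * This poller is registered on the reduce thread with a 0us period (see
 * comp_bdev_ch_create_cb() below), i.e. it runs on every reactor iteration:
 *
 *	comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
 */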
717 
718 /* Entry point for reduce lib to issue a compress operation. */
719 static void
720 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
721 		      struct iovec *src_iovs, int src_iovcnt,
722 		      struct iovec *dst_iovs, int dst_iovcnt,
723 		      struct spdk_reduce_vol_cb_args *cb_arg)
724 {
725 	int rc;
726 
727 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
728 	if (rc) {
		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
730 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
731 	}
732 }
733 
734 /* Entry point for reduce lib to issue a decompress operation. */
735 static void
736 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
737 			struct iovec *src_iovs, int src_iovcnt,
738 			struct iovec *dst_iovs, int dst_iovcnt,
739 			struct spdk_reduce_vol_cb_args *cb_arg)
740 {
741 	int rc;
742 
743 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
744 	if (rc) {
		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
746 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
747 	}
748 }
749 
/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
754 static void
755 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
756 {
757 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
758 					   comp_bdev);
759 
760 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
761 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
762 			      spdk_reduce_rw_blocks_cb, bdev_io);
763 }
764 
765 /* scheduled for completion on IO thread */
766 static void
767 _complete_other_io(void *arg)
768 {
769 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
770 	if (io_ctx->status == 0) {
771 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
772 	} else {
773 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
774 	}
775 }
776 
777 /* scheduled for submission on reduce thread */
778 static void
779 _comp_bdev_io_submit(void *arg)
780 {
781 	struct spdk_bdev_io *bdev_io = arg;
782 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
783 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
784 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
785 					   comp_bdev);
786 	int rc = 0;
787 
788 	switch (bdev_io->type) {
789 	case SPDK_BDEV_IO_TYPE_READ:
790 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
791 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
792 		return;
793 	case SPDK_BDEV_IO_TYPE_WRITE:
794 		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
795 				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
796 				       spdk_reduce_rw_blocks_cb, bdev_io);
797 		return;
798 	/* TODO in future patch in the series */
799 	case SPDK_BDEV_IO_TYPE_RESET:
800 		break;
801 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
802 	case SPDK_BDEV_IO_TYPE_UNMAP:
803 	case SPDK_BDEV_IO_TYPE_FLUSH:
804 	default:
805 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
806 		rc = -EINVAL;
807 	}
808 
809 	if (rc) {
810 		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, queueing io for compress.\n");
812 			io_ctx->ch = ch;
813 			vbdev_compress_queue_io(bdev_io);
814 			return;
815 		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
817 			io_ctx->status = rc;
818 		}
819 	}
820 
821 	/* Complete this on the orig IO thread. */
822 	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
823 		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
824 	} else {
825 		_complete_other_io(io_ctx);
826 	}
827 }
828 
829 /* Called when someone above submits IO to this vbdev. */
830 static void
831 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
832 {
833 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
834 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
835 					   comp_bdev);
836 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
837 
838 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
839 	io_ctx->comp_bdev = comp_bdev;
840 	io_ctx->comp_ch = comp_ch;
841 	io_ctx->orig_io = bdev_io;
842 
843 	/* Send this request to the reduce_thread if that's not what we're on. */
844 	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
845 		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
846 	} else {
847 		_comp_bdev_io_submit(bdev_io);
848 	}
849 }
850 
851 static bool
852 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
853 {
854 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
855 
856 	switch (io_type) {
857 	case SPDK_BDEV_IO_TYPE_READ:
858 	case SPDK_BDEV_IO_TYPE_WRITE:
859 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
860 	case SPDK_BDEV_IO_TYPE_UNMAP:
861 	case SPDK_BDEV_IO_TYPE_RESET:
862 	case SPDK_BDEV_IO_TYPE_FLUSH:
863 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
864 	default:
865 		return false;
866 	}
867 }
868 
869 /* Resubmission function used by the bdev layer when a queued IO is ready to be
870  * submitted.
871  */
872 static void
873 vbdev_compress_resubmit_io(void *arg)
874 {
875 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
876 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
877 
878 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
879 }
880 
881 /* Used to queue an IO in the event of resource issues. */
882 static void
883 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
884 {
885 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
886 	int rc;
887 
888 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
889 	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
890 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
891 
892 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
893 	if (rc) {
894 		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
895 		assert(false);
896 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
897 	}
898 }
899 
900 /* Callback for unregistering the IO device. */
901 static void
902 _device_unregister_cb(void *io_device)
903 {
904 	struct vbdev_compress *comp_bdev = io_device;
905 
906 	/* Done with this comp_bdev. */
907 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
908 	free(comp_bdev->comp_bdev.name);
909 	free(comp_bdev);
910 }
911 
912 static void
913 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
914 {
915 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
916 
917 	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
919 	} else {
920 		TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
921 		spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
922 		spdk_bdev_close(comp_bdev->base_desc);
923 		comp_bdev->vol = NULL;
924 		if (comp_bdev->orphaned == false) {
925 			spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
926 		} else {
927 			comp_bdev->delete_cb_fn(comp_bdev->delete_cb_arg, 0);
928 			_device_unregister_cb(comp_bdev);
929 		}
930 	}
931 }
932 
933 static void
934 _reduce_destroy_cb(void *ctx, int reduce_errno)
935 {
936 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
937 
938 	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
940 	}
941 
942 	comp_bdev->vol = NULL;
943 	spdk_put_io_channel(comp_bdev->base_ch);
944 	if (comp_bdev->orphaned == false) {
945 		spdk_bdev_unregister(&comp_bdev->comp_bdev, comp_bdev->delete_cb_fn,
946 				     comp_bdev->delete_cb_arg);
947 	} else {
948 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
949 	}
950 
951 }
952 
953 /* Called by reduceLib after performing unload vol actions */
954 static void
955 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
956 {
957 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
958 
959 	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
961 	} else {
962 		/* reducelib needs a channel to comm with the backing device */
963 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
964 
965 		/* Clean the device before we free our resources. */
966 		spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
967 	}
968 }
969 
970 const char *
971 compress_get_name(const struct vbdev_compress *comp_bdev)
972 {
973 	return comp_bdev->comp_bdev.name;
974 }
975 
976 struct vbdev_compress *
977 compress_bdev_first(void)
978 {
979 	struct vbdev_compress *comp_bdev;
980 
981 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
982 
983 	return comp_bdev;
984 }
985 
986 struct vbdev_compress *
987 compress_bdev_next(struct vbdev_compress *prev)
988 {
989 	struct vbdev_compress *comp_bdev;
990 
991 	comp_bdev = TAILQ_NEXT(prev, link);
992 
993 	return comp_bdev;
994 }
995 
996 bool
997 compress_has_orphan(const char *name)
998 {
999 	struct vbdev_compress *comp_bdev;
1000 
1001 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1002 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1003 			return true;
1004 		}
1005 	}
1006 	return false;
1007 }
1008 
1009 /* Called after we've unregistered following a hot remove callback.
1010  * Our finish entry point will be called next.
1011  */
1012 static int
1013 vbdev_compress_destruct(void *ctx)
1014 {
1015 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1016 
1017 	if (comp_bdev->vol != NULL) {
1018 		/* Tell reducelib that we're done with this volume. */
1019 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
1020 	} else {
1021 		vbdev_compress_destruct_cb(comp_bdev, 0);
1022 	}
1023 
1024 	return 0;
1025 }
1026 
/* We supplied this as an entry point for upper layers that want to communicate with this
 * bdev.  This is how they get a channel.
1029  */
1030 static struct spdk_io_channel *
1031 vbdev_compress_get_io_channel(void *ctx)
1032 {
1033 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1034 
1035 	/* The IO channel code will allocate a channel for us which consists of
1036 	 * the SPDK channel structure plus the size of our comp_io_channel struct
1037 	 * that we passed in when we registered our IO device. It will then call
1038 	 * our channel create callback to populate any elements that we need to
1039 	 * update.
1040 	 */
1041 	return spdk_get_io_channel(comp_bdev);
1042 }
1043 
1044 /* This is the output for bdev_get_bdevs() for this vbdev */
1045 static int
1046 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1047 {
1048 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1049 
1050 	spdk_json_write_name(w, "compress");
1051 	spdk_json_write_object_begin(w);
1052 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1053 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1054 	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1055 	spdk_json_write_object_end(w);
1056 
1057 	return 0;
1058 }
1059 
1060 /* This is used to generate JSON that can configure this module to its current state. */
1061 static int
1062 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
1063 {
1064 	struct vbdev_compress *comp_bdev;
1065 
1066 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1067 		spdk_json_write_object_begin(w);
1068 		spdk_json_write_named_string(w, "method", "bdev_compress_create");
1069 		spdk_json_write_named_object_begin(w, "params");
1070 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1071 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1072 		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1073 		spdk_json_write_object_end(w);
1074 		spdk_json_write_object_end(w);
1075 	}
1076 	return 0;
1077 }
1078 
/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations on to claim, where it will be further filled
 * out and added to the global list.
1082  */
1083 static void
1084 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1085 {
1086 	struct vbdev_compress *meta_ctx = cb_arg;
1087 
1088 	/* We're done with metadata operations */
1089 	spdk_put_io_channel(meta_ctx->base_ch);
1090 	spdk_bdev_close(meta_ctx->base_desc);
1091 	meta_ctx->base_desc = NULL;
1092 
1093 	if (reduce_errno == 0) {
1094 		meta_ctx->vol = vol;
1095 		vbdev_compress_claim(meta_ctx);
1096 	} else {
1097 		SPDK_ERRLOG("for vol %s, error %u\n",
1098 			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1099 		free(meta_ctx);
1100 	}
1101 }
1102 
/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * free the bdev_io and call the callback that reduceLib provided when it issued the
 * read/write/unmap request.
1106  */
1107 static void
1108 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1109 {
1110 	struct spdk_reduce_vol_cb_args *cb_args = arg;
1111 	int reduce_errno;
1112 
1113 	if (success) {
1114 		reduce_errno = 0;
1115 	} else {
1116 		reduce_errno = -EIO;
1117 	}
1118 	spdk_bdev_free_io(bdev_io);
1119 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
1120 }
1121 
1122 /* This is the function provided to the reduceLib for sending reads directly to
1123  * the backing device.
1124  */
1125 static void
1126 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1127 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1128 {
1129 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1130 					   backing_dev);
1131 	int rc;
1132 
1133 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1134 				    iov, iovcnt, lba, lba_count,
1135 				    comp_reduce_io_cb,
1136 				    args);
1137 	if (rc) {
1138 		if (rc == -ENOMEM) {
1139 			SPDK_ERRLOG("No memory, start to queue io.\n");
1140 			/* TODO: there's no bdev_io to queue */
1141 		} else {
			SPDK_ERRLOG("error submitting readv request\n");
1143 		}
1144 		args->cb_fn(args->cb_arg, rc);
1145 	}
1146 }
1147 
1148 /* This is the function provided to the reduceLib for sending writes directly to
1149  * the backing device.
1150  */
1151 static void
1152 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1153 		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1154 {
1155 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1156 					   backing_dev);
1157 	int rc;
1158 
1159 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1160 				     iov, iovcnt, lba, lba_count,
1161 				     comp_reduce_io_cb,
1162 				     args);
1163 	if (rc) {
1164 		if (rc == -ENOMEM) {
1165 			SPDK_ERRLOG("No memory, start to queue io.\n");
1166 			/* TODO: there's no bdev_io to queue */
1167 		} else {
1168 			SPDK_ERRLOG("error submitting writev request\n");
1169 		}
1170 		args->cb_fn(args->cb_arg, rc);
1171 	}
1172 }
1173 
1174 /* This is the function provided to the reduceLib for sending unmaps directly to
1175  * the backing device.
1176  */
1177 static void
1178 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
1179 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1180 {
1181 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1182 					   backing_dev);
1183 	int rc;
1184 
1185 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1186 				    lba, lba_count,
1187 				    comp_reduce_io_cb,
1188 				    args);
1189 
1190 	if (rc) {
1191 		if (rc == -ENOMEM) {
1192 			SPDK_ERRLOG("No memory, start to queue io.\n");
1193 			/* TODO: there's no bdev_io to queue */
1194 		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
1196 		}
1197 		args->cb_fn(args->cb_arg, rc);
1198 	}
1199 }
1200 
1201 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
1202 static void
1203 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
1204 {
1205 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1206 
1207 	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
1209 	}
1210 
1211 	comp_bdev->vol = NULL;
1212 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
1213 }
1214 
1215 /* Called when the underlying base bdev goes away. */
1216 static void
1217 vbdev_compress_base_bdev_hotremove_cb(void *ctx)
1218 {
1219 	struct vbdev_compress *comp_bdev, *tmp;
1220 	struct spdk_bdev *bdev_find = ctx;
1221 
1222 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
1223 		if (bdev_find == comp_bdev->base_bdev) {
1224 			/* Tell reduceLib that we're done with this volume. */
1225 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
1226 		}
1227 	}
1228 }
1229 
/* TODO: determine which params we want user configurable; hard-coded for now:
1231  * params.vol_size
1232  * params.chunk_size
1233  * compression PMD, algorithm, window size, comp level, etc.
1234  * DEV_MD_PATH
1235  */
1236 
1237 /* Common function for init and load to allocate and populate the minimal
1238  * information for reducelib to init or load.
1239  */
1240 struct vbdev_compress *
1241 _prepare_for_load_init(struct spdk_bdev *bdev)
1242 {
1243 	struct vbdev_compress *meta_ctx;
1244 
1245 	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
1246 	if (meta_ctx == NULL) {
1247 		SPDK_ERRLOG("failed to alloc init contexts\n");
1248 		return NULL;
1249 	}
1250 
1251 	meta_ctx->drv_name = "None";
1252 	meta_ctx->base_bdev = bdev;
1253 	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
1254 	meta_ctx->backing_dev.readv = _comp_reduce_readv;
1255 	meta_ctx->backing_dev.writev = _comp_reduce_writev;
1256 	meta_ctx->backing_dev.compress = _comp_reduce_compress;
1257 	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;
1258 
1259 	meta_ctx->backing_dev.blocklen = bdev->blocklen;
1260 	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;
1261 
1262 	meta_ctx->params.chunk_size = CHUNK_SIZE;
1263 	meta_ctx->params.logical_block_size = bdev->blocklen;
1264 	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
1265 	return meta_ctx;
1266 }
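
/*
 * With the hard-coded values above: a 16KB chunk spans
 * CHUNK_SIZE / BACKING_IO_SZ = 16384 / 4096 = 4 backing io units, and for a
 * 512B-block base bdev it holds 16384 / 512 = 32 logical blocks.
 */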
1267 
1268 static bool
1269 _set_pmd(struct vbdev_compress *comp_dev)
1270 {
1271 	if (g_opts == COMPRESS_PMD_AUTO) {
1272 		if (g_qat_available) {
1273 			comp_dev->drv_name = QAT_PMD;
1274 		} else {
1275 			comp_dev->drv_name = ISAL_PMD;
1276 		}
1277 	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
1278 		comp_dev->drv_name = QAT_PMD;
1279 	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
1280 		comp_dev->drv_name = ISAL_PMD;
1281 	} else {
1282 		SPDK_ERRLOG("Requested PMD is not available.\n");
1283 		return false;
1284 	}
1285 	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
1286 	return true;
1287 }
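
/*
 * Sketch of selecting a PMD from application code via compress_set_pmd()
 * (defined at the bottom of this file), e.g. to force ISA-L:
 *
 *	enum compress_pmd pmd = COMPRESS_PMD_ISAL_ONLY;
 *
 *	compress_set_pmd(&pmd);
 */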
1288 
1289 /* Call reducelib to initialize a new volume */
1290 static int
1291 vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
1292 {
1293 	struct vbdev_compress *meta_ctx;
1294 	int rc;
1295 
1296 	meta_ctx = _prepare_for_load_init(bdev);
1297 	if (meta_ctx == NULL) {
1298 		return -EINVAL;
1299 	}
1300 
1301 	if (_set_pmd(meta_ctx) == false) {
1302 		SPDK_ERRLOG("could not find required pmd\n");
1303 		free(meta_ctx);
1304 		return -EINVAL;
1305 	}
1306 
1307 	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
1308 			    meta_ctx->base_bdev, &meta_ctx->base_desc);
1309 	if (rc) {
1310 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1311 		free(meta_ctx);
1312 		return -EINVAL;
1313 	}
1314 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1315 
1316 	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
1317 			     pm_path,
1318 			     vbdev_reduce_init_cb,
1319 			     meta_ctx);
1320 	return 0;
1321 }
1322 
1323 /* We provide this callback for the SPDK channel code to create a channel using
1324  * the channel struct we provided in our module get_io_channel() entry point. Here
1325  * we get and save off an underlying base channel of the device below us so that
1326  * we can communicate with the base bdev on a per channel basis.  If we needed
1327  * our own poller for this vbdev, we'd register it here.
1328  */
1329 static int
1330 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1331 {
1332 	struct vbdev_compress *comp_bdev = io_device;
1333 	struct comp_device_qp *device_qp;
1334 
1335 	/* We use this queue to track outstanding IO in our layer. */
1336 	TAILQ_INIT(&comp_bdev->pending_comp_ios);
1337 
1338 	/* We use this to queue up compression operations as needed. */
1339 	TAILQ_INIT(&comp_bdev->queued_comp_ops);
1340 
1341 	/* Now set the reduce channel if it's not already set. */
1342 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1343 	if (comp_bdev->ch_count == 0) {
1344 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1345 		comp_bdev->reduce_thread = spdk_get_thread();
1346 		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
1347 		/* Now assign a q pair */
1348 		pthread_mutex_lock(&g_comp_device_qp_lock);
1349 		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
1350 			if ((strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0)) {
1351 				if (device_qp->thread == spdk_get_thread()) {
1352 					comp_bdev->device_qp = device_qp;
1353 					break;
1354 				}
1355 				if (device_qp->thread == NULL) {
1356 					comp_bdev->device_qp = device_qp;
1357 					device_qp->thread = spdk_get_thread();
1358 					break;
1359 				}
1360 			}
1361 		}
1362 		pthread_mutex_unlock(&g_comp_device_qp_lock);
1363 	}
1364 	comp_bdev->ch_count++;
1365 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1366 
1367 	if (comp_bdev->device_qp != NULL) {
1368 		return 0;
1369 	} else {
1370 		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
1371 		assert(false);
1372 		return -ENOMEM;
1373 	}
1374 }
1375 
1376 static void
1377 _channel_cleanup(struct vbdev_compress *comp_bdev)
1378 {
1379 	/* Note: comp_bdevs can share a device_qp if they are
1380 	 * on the same thread so we leave the device_qp element
1381 	 * alone for this comp_bdev and just clear the reduce thread.
1382 	 */
1383 	spdk_put_io_channel(comp_bdev->base_ch);
1384 	comp_bdev->reduce_thread = NULL;
1385 	spdk_poller_unregister(&comp_bdev->poller);
1386 }
1387 
1388 /* Used to reroute destroy_ch to the correct thread */
1389 static void
1390 _comp_bdev_ch_destroy_cb(void *arg)
1391 {
1392 	struct vbdev_compress *comp_bdev = arg;
1393 
1394 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1395 	if (comp_bdev->ch_count == 0) {
1396 		_channel_cleanup(comp_bdev);
1397 	}
1398 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1399 }
1400 
1401 /* We provide this callback for the SPDK channel code to destroy a channel
1402  * created with our create callback. We just need to undo anything we did
1403  * when we created. If this bdev used its own poller, we'd unregister it here.
1404  */
1405 static void
1406 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1407 {
1408 	struct vbdev_compress *comp_bdev = io_device;
1409 
1410 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1411 	comp_bdev->ch_count--;
1412 	if (comp_bdev->ch_count == 0) {
1413 		/* Send this request to the thread where the channel was created. */
1414 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1415 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1416 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1417 		} else {
1418 			_channel_cleanup(comp_bdev);
1419 		}
1420 	}
1421 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1422 }
1423 
1424 /* RPC entry point for compression vbdev creation. */
1425 int
1426 create_compress_bdev(const char *bdev_name, const char *pm_path)
1427 {
1428 	struct spdk_bdev *bdev;
1429 
1430 	bdev = spdk_bdev_get_by_name(bdev_name);
1431 	if (!bdev) {
1432 		return -ENODEV;
1433 	}
1434 
	return vbdev_init_reduce(bdev, pm_path);
1436 }
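
/*
 * Typical creation flow, assuming the companion RPC (vbdev_compress_rpc.c)
 * is wired to this entry point:
 *
 *	./scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /pmem_files
 */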
1437 
1438 /* On init, just init the compress drivers. All metadata is stored on disk. */
1439 static int
1440 vbdev_compress_init(void)
1441 {
1442 	if (vbdev_init_compress_drivers()) {
1443 		SPDK_ERRLOG("Error setting up compression devices\n");
1444 		return -EINVAL;
1445 	}
1446 
1447 	return 0;
1448 }
1449 
1450 /* Called when the entire module is being torn down. */
1451 static void
1452 vbdev_compress_finish(void)
1453 {
1454 	struct comp_device_qp *dev_qp;
1455 	/* TODO: unload vol in a future patch */
1456 
1457 	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
1458 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
1459 		free(dev_qp);
1460 	}
1461 	pthread_mutex_destroy(&g_comp_device_qp_lock);
1462 
1463 	rte_mempool_free(g_comp_op_mp);
1464 	rte_mempool_free(g_mbuf_mp);
1465 }
1466 
1467 /* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
1469  * much context we want per IO.
1470  */
1471 static int
1472 vbdev_compress_get_ctx_size(void)
1473 {
1474 	return sizeof(struct comp_bdev_io);
1475 }
1476 
1477 /* When we register our bdev this is how we specify our entry points. */
1478 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1479 	.destruct		= vbdev_compress_destruct,
1480 	.submit_request		= vbdev_compress_submit_request,
1481 	.io_type_supported	= vbdev_compress_io_type_supported,
1482 	.get_io_channel		= vbdev_compress_get_io_channel,
1483 	.dump_info_json		= vbdev_compress_dump_info_json,
1484 	.write_config_json	= NULL,
1485 };
1486 
1487 static struct spdk_bdev_module compress_if = {
1488 	.name = "compress",
1489 	.module_init = vbdev_compress_init,
1490 	.config_text = NULL,
1491 	.get_ctx_size = vbdev_compress_get_ctx_size,
1492 	.examine_disk = vbdev_compress_examine,
1493 	.module_fini = vbdev_compress_finish,
1494 	.config_json = vbdev_compress_config_json
1495 };
1496 
1497 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1498 
1499 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1500 {
1501 	struct spdk_bdev_alias *aliases;
1502 
1503 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1504 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1505 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
1506 		if (!comp_bdev->comp_bdev.name) {
1507 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1508 			return -ENOMEM;
1509 		}
1510 	} else {
1511 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1512 		if (!comp_bdev->comp_bdev.name) {
1513 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1514 			return -ENOMEM;
1515 		}
1516 	}
1517 	return 0;
1518 }
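
/* Example: a base bdev named "Nvme0n1" with no alias yields "COMP_Nvme0n1". */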
1519 
1520 static void
1521 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1522 {
1523 	int rc;
1524 
1525 	if (_set_compbdev_name(comp_bdev)) {
1526 		goto error_bdev_name;
1527 	}
1528 
1529 	/* Note: some of the fields below will change in the future - for example,
1530 	 * blockcnt specifically will not match (the compressed volume size will
1531 	 * be slightly less than the base bdev size)
1532 	 */
1533 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1534 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1535 
1536 	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
1537 		comp_bdev->comp_bdev.required_alignment =
1538 			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
1539 				 comp_bdev->base_bdev->required_alignment);
1540 		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1541 			       comp_bdev->comp_bdev.required_alignment);
1542 	} else {
1543 		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
1544 	}
1545 	comp_bdev->comp_bdev.optimal_io_boundary =
1546 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
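	/* E.g. a 16KB chunk with 4KB logical blocks gives 16384 / 4096 = 4 blocks. */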
1547 
1548 	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
1549 
1550 	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
1551 	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
1552 	assert(comp_bdev->comp_bdev.blockcnt > 0);
1553 
1554 	/* This is the context that is passed to us when the bdev
1555 	 * layer calls in so we'll save our comp_bdev node here.
1556 	 */
1557 	comp_bdev->comp_bdev.ctxt = comp_bdev;
1558 	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
1559 	comp_bdev->comp_bdev.module = &compress_if;
1560 
1561 	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1562 
1563 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1564 
1565 	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
1566 			    comp_bdev->base_bdev, &comp_bdev->base_desc);
1567 	if (rc) {
1568 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
1569 		goto error_open;
1570 	}
1571 
1572 	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
1573 				sizeof(struct comp_io_channel),
1574 				comp_bdev->comp_bdev.name);
1575 
1576 	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1577 					 comp_bdev->comp_bdev.module);
1578 	if (rc) {
1579 		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
1580 		goto error_claim;
1581 	}
1582 
1583 	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
1584 	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
1586 		goto error_bdev_register;
1587 	}
1588 
1589 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1590 
1591 	return;
1592 	/* Error cleanup paths. */
1593 error_bdev_register:
1594 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1595 error_claim:
1596 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
1597 	spdk_io_device_unregister(comp_bdev, NULL);
1598 error_open:
1599 	free(comp_bdev->comp_bdev.name);
1600 error_bdev_name:
1601 	spdk_put_io_channel(comp_bdev->base_ch);
1602 	spdk_bdev_close(comp_bdev->base_desc);
1603 	free(comp_bdev);
1604 	spdk_bdev_module_examine_done(&compress_if);
1605 }
1606 
1607 void
1608 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1609 {
1610 	struct vbdev_compress *comp_bdev = NULL;
1611 
1612 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1613 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1614 			break;
1615 		}
1616 	}
1617 
1618 	if (comp_bdev == NULL) {
1619 		cb_fn(cb_arg, -ENODEV);
1620 		return;
1621 	}
1622 
1623 	/* Save these for after the vol is destroyed. */
1624 	comp_bdev->delete_cb_fn = cb_fn;
1625 	comp_bdev->delete_cb_arg = cb_arg;
1626 
1627 	/* Tell reducelib that we're done with this volume. */
1628 	if (comp_bdev->orphaned == false) {
1629 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1630 	} else {
1631 		delete_vol_unload_cb(comp_bdev, 0);
1632 	}
1633 }
1634 
/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations on to claim, where it will be further filled
 * out and added to the global list.
1638  */
1639 static void
1640 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1641 {
1642 	struct vbdev_compress *meta_ctx = cb_arg;
1643 	int rc;
1644 
1645 	/* Done with metadata operations */
1646 	spdk_put_io_channel(meta_ctx->base_ch);
1647 	spdk_bdev_close(meta_ctx->base_desc);
1648 	meta_ctx->base_desc = NULL;
1649 
1650 	if (reduce_errno != 0 && reduce_errno != -ENOENT) {
1651 		/* This error means it is not a compress disk. */
1652 		if (reduce_errno != -EILSEQ) {
1653 			SPDK_ERRLOG("for vol %s, error %u\n",
1654 				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1655 		}
1656 		free(meta_ctx);
1657 		spdk_bdev_module_examine_done(&compress_if);
1658 		return;
1659 	}
1660 
1661 	/* this status means that the vol could not be loaded because
1662 	 * the pmem file can't be found.
1663 	 */
1664 	if (reduce_errno == -ENOENT) {
1665 		if (_set_compbdev_name(meta_ctx)) {
1666 			goto err;
1667 		}
1668 
1669 		/* We still want to open and claim the backing device to protect the data until
1670 		 * either the pm metadata file is recovered or the comp bdev is deleted.
1671 		 */
1672 		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
1673 				    meta_ctx->base_bdev, &meta_ctx->base_desc);
1674 		if (rc) {
1675 			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1676 			goto err;
1677 		}
1678 
1679 		meta_ctx->comp_bdev.module = &compress_if;
1680 		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
1681 		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
1682 						 meta_ctx->comp_bdev.module);
1683 		if (rc) {
1684 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1685 			goto err;
1686 		}
1687 
1688 		meta_ctx->orphaned = true;
1689 		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
1690 err:
1691 		spdk_bdev_module_examine_done(&compress_if);
1692 		return;
1693 	}
1694 
1695 	if (_set_pmd(meta_ctx) == false) {
1696 		SPDK_ERRLOG("could not find required pmd\n");
1697 		free(meta_ctx);
1698 		spdk_bdev_module_examine_done(&compress_if);
1699 		return;
1700 	}
1701 
1702 	/* Update information following volume load. */
1703 	meta_ctx->vol = vol;
1704 	memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
1705 	       sizeof(struct spdk_reduce_vol_params));
1706 	vbdev_compress_claim(meta_ctx);
1707 	spdk_bdev_module_examine_done(&compress_if);
1708 }
1709 
1710 /* Examine_disk entry point: will do a metadata load to see if this is ours,
1711  * and if so will go ahead and claim it.
1712  */
1713 static void
1714 vbdev_compress_examine(struct spdk_bdev *bdev)
1715 {
1716 	struct vbdev_compress *meta_ctx;
1717 	int rc;
1718 
1719 	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
1720 		spdk_bdev_module_examine_done(&compress_if);
1721 		return;
1722 	}
1723 
1724 	meta_ctx = _prepare_for_load_init(bdev);
1725 	if (meta_ctx == NULL) {
1726 		spdk_bdev_module_examine_done(&compress_if);
1727 		return;
1728 	}
1729 
1730 	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
1731 			    meta_ctx->base_bdev, &meta_ctx->base_desc);
1732 	if (rc) {
1733 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1734 		free(meta_ctx);
1735 		spdk_bdev_module_examine_done(&compress_if);
1736 		return;
1737 	}
1738 
1739 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1740 	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
1741 }
1742 
1743 int
1744 compress_set_pmd(enum compress_pmd *opts)
1745 {
1746 	g_opts = *opts;
1747 
1748 	return 0;
1749 }
1750 
1751 SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)
1752