/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/conf.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk_internal/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 *  span a 2MB boundary; worst case, each iovec needs two mbufs.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has an implicit limit of 99:
 * beyond that, the internal ring name it generates exceeds a length limit in the
 * generic ring code and qpair initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev	*backing_dev;
	struct iovec			*src_iovs;
	int				src_iovcnt;
	struct iovec			*dst_iovs;
	int				dst_iovcnt;
	bool				compress;
	void				*cb_arg;
	TAILQ_ENTRY(vbdev_comp_op)	link;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	spdk_delete_compress_complete	delete_cb_fn;
	void				*delete_cb_arg;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Shared (between all ops per PMD) compress xform template. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Shared (between all ops per PMD) decompress xform template. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);

/* Dummy function used by DPDK to free externally attached buffers from
 * mbufs; we free those buffers ourselves, but DPDK requires this callback
 * to be present.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}
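
/* Illustrative sketch (hypothetical helper, not part of the original driver):
 * how an external data buffer gets attached to an mbuf using the shared
 * g_shinfo above. Because the caller owns the buffer, shinfo_free_cb() has
 * nothing to do when the mbuf is freed.
 */
static int
example_attach_ext_buf(struct rte_mbuf *m, void *buf, uint16_t len)
{
	uint64_t mapped_len = len;
	uint64_t phys_addr;

	/* Translate to a physical address; mapped_len is clamped to the
	 * physically contiguous run starting at buf.
	 */
	phys_addr = spdk_vtophys(buf, &mapped_len);
	if (phys_addr == SPDK_VTOPHYS_ERROR || mapped_len < len) {
		return -EINVAL;
	}

	/* The mbuf now points at buf without a copy; freeing the mbuf
	 * later invokes shinfo_free_cb(), a no-op by design.
	 */
	rte_pktmbuf_attach_extbuf(m, buf, phys_addr, len, &g_shinfo);
	rte_pktmbuf_append(m, len);
	return 0;
}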

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so use our own maximum. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform on device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform on device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		rc = -EINVAL;
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
	rte_mempool_free(g_comp_op_mp);
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}
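
/* Illustrative sketch (hypothetical helper, not part of the original driver):
 * the shared-xform capability test that create_compress_dev() relies on, shown
 * in isolation. A PMD that cannot share private xforms would need one xform
 * per inflight operation, which this module does not implement.
 */
static bool
example_pmd_supports_shared_xforms(uint8_t cdev_id)
{
	struct rte_compressdev_info info;

	rte_compressdev_info_get(cdev_id, &info);
	return (info.capabilities->comp_feature_flags &
		RTE_COMP_FF_SHAREABLE_PRIV_XFORM) != 0;
}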

/* For completing rw requests on the orig IO thread. */
static void
_spdk_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w operations issued via reducelib. */
static void
spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
	} else {
		_spdk_reduce_rw_blocks_cb(io_ctx);
	}
}
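
/* Illustrative sketch (hypothetical names, not part of the original driver):
 * the generic thread-hop pattern used above and in several completion paths
 * below. Work that must run on a specific SPDK thread is either invoked
 * directly (if we are already there) or posted with spdk_thread_send_msg().
 */
static void
example_do_completion(void *ctx)
{
	/* ...complete the operation using ctx, now on the correct thread... */
}

static void
example_run_on_thread(struct spdk_thread *target, void *ctx)
{
	if (target == spdk_get_thread()) {
		example_do_completion(ctx);
	} else {
		spdk_thread_send_msg(target, example_do_completion, ctx);
	}
}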

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t updated_length, remainder, phys_addr, total_length = 0;
	uint8_t *current_src_base = NULL;
	uint8_t *current_dst_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining, so build our mbuf chain
	 * and associate it with our single comp_op.
	 */

	/* Setup src mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < src_iovcnt) {

		current_src_base = src_iovs[iov_index].iov_base;
		total_length += src_iovs[iov_index].iov_len;
		assert(src_mbufs[mbuf_index] != NULL);
		src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = src_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_src_base, &updated_length);

		rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
					  current_src_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length);
		remainder = src_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n");
				goto error_src_dst;
			}
			src_mbuf_total++;
			mbuf_index++;
			src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_src_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_src_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == src_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
						  current_src_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(src_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs; with the current usage of this code there's only one vector */
	iov_index = mbuf_index = 0;
	while (iov_index < dst_iovcnt) {

		current_dst_base = dst_iovs[iov_index].iov_base;
		updated_length = dst_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length);

		rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
					  current_dst_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length);
		remainder = dst_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n");
				goto error_src_dst;
			}
			dst_mbuf_total++;
			mbuf_index++;
			current_dst_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_dst_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == dst_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
						  current_dst_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 to get queued; if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:
	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}
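
/* Illustrative sketch (hypothetical helper, not part of the original driver):
 * how spdk_vtophys() drives the mbuf-splitting logic above. On input, *len
 * holds the buffer length; on return it is clamped to the physically
 * contiguous run starting at vaddr, so a shortfall means the buffer spans a
 * 2MB hugepage boundary and needs a second mbuf. This is why
 * MAX_MBUFS_PER_OP reserves two mbufs per iovec.
 */
static int
example_mbufs_needed_for_buf(void *vaddr, uint64_t len)
{
	uint64_t mapped_len = len;

	if (spdk_vtophys(vaddr, &mapped_len) == SPDK_VTOPHYS_ERROR) {
		return -EINVAL;
	}

	/* One mbuf for the first contiguous run, one more for the remainder. */
	return (mapped_len < len) ? 2 : 1;
}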

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the byte count was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here; however, it would be rare that we'd ever have more than 1 at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process */
		while (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
			break;
		}
	}
	return 0;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      spdk_reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       spdk_reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d on unload\n", reduce_errno);
	} else {
		TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
		spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
		spdk_bdev_close(comp_bdev->base_desc);
		comp_bdev->vol = NULL;
		if (comp_bdev->orphaned == false) {
			spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
		} else {
			comp_bdev->delete_cb_fn(comp_bdev->delete_cb_arg, 0);
			_device_unregister_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d on destroy\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, comp_bdev->delete_cb_fn,
				     comp_bdev->delete_cb_arg);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d on unload\n", reduce_errno);
	} else {
		/* reducelib needs a channel to comm with the backing device */
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

		/* Clean the device before we free our resources. */
		spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	meta_ctx->base_ch = NULL;
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
		vbdev_compress_claim(meta_ctx);
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		free(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce errno %d on unload\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_base_bdev_hotremove_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev, *tmp;
	struct spdk_bdev *bdev_find = ctx;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* TODO: determine which params we want to be user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->base_bdev = bdev;
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	meta_ctx->params.logical_block_size = bdev->blocklen;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}
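
/* Illustrative sketch (hypothetical values, not part of the original driver):
 * the geometry that falls out of the hard-coded constants above for a base
 * bdev with 512-byte blocks. Each 16KiB reduce chunk maps to 32 logical
 * blocks and is written to the backing device in 4KiB backing IO units.
 */
static void
example_reduce_geometry(void)
{
	uint32_t logical_block_size = 512;
	uint32_t blocks_per_chunk = CHUNK_SIZE / logical_block_size;	/* 32 */
	uint32_t io_units_per_chunk = CHUNK_SIZE / BACKING_IO_SZ;	/* 4 */

	/* vbdev_compress_claim() uses the same ratio for the bdev's
	 * optimal_io_boundary so the bdev layer splits IO on chunk edges.
	 */
	printf("blocks/chunk: %u, backing io units/chunk: %u\n",
	       blocks_per_chunk, io_units_per_chunk);
}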

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		return -EINVAL;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		return -EINVAL;
	}
	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis. If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* We use this queue to track outstanding IO in our layer. */
	TAILQ_INIT(&comp_bdev->pending_comp_ios);

	/* We use this to queue up compression operations as needed. */
	TAILQ_INIT(&comp_bdev->queued_comp_ops);

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if ((strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0)) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread, so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		_channel_cleanup(comp_bdev);
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path)
{
	struct spdk_bdev *bdev;

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (!bdev) {
		return -ENODEV;
	}

	return vbdev_init_reduce(bdev, pm_path);
}
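
/* Illustrative usage sketch (the base bdev name and pm_path are hypothetical):
 * selecting a PMD and creating a compress vbdev from C, for example from an
 * application's start-up callback. The same operation is normally driven over
 * JSON-RPC via the bdev_compress_create method.
 */
static int
example_create_compress_bdev(void)
{
	enum compress_pmd pmd = COMPRESS_PMD_AUTO;
	int rc;

	/* Optional: restrict PMD selection before creating the vbdev. */
	set_compress_pmd(&pmd);

	/* Base bdev "Nvme0n1" must already exist; /mnt/pmem must be a
	 * directory on a pmem-capable filesystem for the metadata file.
	 */
	rc = create_compress_bdev("Nvme0n1", "/mnt/pmem");
	if (rc) {
		SPDK_ERRLOG("bdev_compress create failed: %s\n", spdk_strerror(-rc));
	}
	return rc;
}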

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.config_text = NULL,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static void
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		goto error_bdev_name;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    comp_bdev->base_bdev, &comp_bdev->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_open;
	}

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return;
	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
error_open:
	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	free(comp_bdev->comp_bdev.name);
error_bdev_name:
	if (comp_bdev->base_ch != NULL) {
		spdk_put_io_channel(comp_bdev->base_ch);
	}
	if (comp_bdev->base_desc != NULL) {
		spdk_bdev_close(comp_bdev->base_desc);
	}
	free(comp_bdev);
	spdk_bdev_module_examine_done(&compress_if);
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	/* Save these for after the vol is destroyed. */
	comp_bdev->delete_cb_fn = cb_fn;
	comp_bdev->delete_cb_arg = cb_arg;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

/* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;
	int rc;

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	meta_ctx->base_ch = NULL;
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (reduce_errno != 0 && reduce_errno != -ENOENT) {
		/* This error means it is not a compress disk. */
		if (reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		}
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* this status means that the vol could not be loaded because
	 * the pmem file can't be found.
	 */
	if (reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* We still want to open and claim the backing device to protect the data until
		 * either the pm metadata file is recovered or the comp bdev is deleted.
		 */
		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
				    meta_ctx->base_bdev, &meta_ctx->base_desc);
		if (rc) {
			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			goto err;
		}

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
err:
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Update information following volume load. */
	meta_ctx->vol = vol;
	memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
	       sizeof(struct spdk_reduce_vol_params));
	vbdev_compress_claim(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

int
set_compress_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)