/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/conf.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk_internal/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 *  span a 2MB boundary.
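 *  spdk_vtophys() may split such a buffer into two physically discontiguous
 *  pieces, each needing its own chained mbuf, hence the 2x factor.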
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
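/* Sizing for the shared rte mempools below; POOL_CACHE_SIZE is the per-lcore
 * object cache each mempool keeps in front of its ring.
 */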
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
/* Although the ISAL PMD reports 'unlimited' qpairs, it hits an undocumented
 * limit of 99: beyond that, the name it generates for its internal ring
 * exceeds a length limit in the generic ring code and qp initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev	*backing_dev;
	struct iovec			*src_iovs;
	int				src_iovcnt;
	struct iovec			*dst_iovs;
	int				dst_iovcnt;
	bool				compress;
	void				*cb_arg;
	TAILQ_ENTRY(vbdev_comp_op)	link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
/* The comp vbdev channel struct. It is allocated and freed on our behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
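/* Note: in the rte_comp API, window_size is a base-2 log, so the
 * DEFAULT_WINDOW_SIZE of 15 above requests a 32 KB deflate history window.
 */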

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers on mbufs; we free
 * the buffers ourselves, but DPDK requires this callback to be present.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		rc = -ENOTSUP;
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("failed to create virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
error_create_mbuf:
	/* rte_mempool_free() ignores NULL, so both calls are safe on every path. */
	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* For completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w operations issued via the reduce library. */
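/* Reduce may invoke this on its own thread; the bdev_io must be completed on
 * the thread that originally submitted it, so we hop threads below as needed.
 */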
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
	} else {
		_reduce_rw_blocks_cb(io_ctx);
	}
}

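/* Build an mbuf chain for one side of an operation: each iovec is attached as
 * an external buffer and chained onto mbufs[0]. If spdk_vtophys() splits an
 * iovec at a 2MB hugepage boundary, one extra mbuf is allocated for the
 * remainder. For example (hypothetical sizes), an 8KB iovec straddling a
 * boundary becomes two chained mbufs of 5KB and 3KB.
 */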
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -ENOMEM;
			}
			(*mbuf_total)++;
			mbuf_index++;
			mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another boundary */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}

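/* Core submission path shared by compress and decompress: build src/dst mbuf
 * chains, fill in a single rte_comp_op, and enqueue it on this bdev's qpair.
 * If the device cannot accept the op right now (status NOT_PROCESSED), the
 * request is saved on queued_comp_ops and retried from comp_dev_poller().
 */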
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

	/* Zero these unconditionally so the error paths below can tell which
	 * array slots actually hold allocated mbufs.
	 */
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
	 * and associate with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* setup dst mbufs; in the current use case there is only a single dst vector */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We expect the op to get enqueued; if 0 were enqueued, queue it up for retry. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* Log the status now; comp_op is freed on the error path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		if (dst_mbufs[i] != NULL) {
			/* Break any chain link so each mbuf is freed exactly once. */
			dst_mbufs[i]->next = NULL;
			rte_pktmbuf_free(dst_mbufs[i]);
		}
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		if (src_mbufs[i] != NULL) {
			src_mbufs[i]->next = NULL;
			rte_pktmbuf_free(src_mbufs[i]);
		}
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops, so free them one at a time here;
		 * it would be rare to have more than one completion at a time anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return 0;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("compress operation failed with code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("decompress operation failed with code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, queueing io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d on vol unload\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d on vol destroy\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d on vol unload\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
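/* Each entry emitted below has the following shape (values illustrative):
 *   {"method": "bdev_compress_create", "params": {"base_bdev_name": "Nvme0n1",
 *    "name": "COMP_Nvme0n1", "compression_pmd": "compress_isal"}}
 */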
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (meta_ctx->vol) {
		vbdev_compress_claim(meta_ctx);
	} else {
		free(meta_ctx);
	}
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, unable to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, unable to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, unable to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d on vol unload after hot remove\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_base_bdev_hotremove_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev, *tmp;
	struct spdk_bdev *bdev_find = ctx;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* TODO: determine which params we want user configurable; hardcoded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->base_bdev = bdev;
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	meta_ctx->params.logical_block_size = bdev->blocklen;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		return -EINVAL;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
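/* Example invocation via the matching RPC (assumed rpc.py syntax; the method
 * name itself comes from vbdev_compress_config_json() above):
 *   rpc.py bdev_compress_create -b Nvme0n1 -p /mnt/pmem
 */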
int
create_compress_bdev(const char *bdev_name, const char *pm_path)
{
	struct spdk_bdev *bdev;

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (!bdev) {
		return -ENODEV;
	}

	return vbdev_init_reduce(bdev, pm_path);
}

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.config_text = NULL,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

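/* Register this module with the bdev layer so its init/fini, examine, and
 * config callbacks above are invoked by the framework.
 */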
SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static void
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		goto error_bdev_name;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
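	/* Each reduce chunk covers a fixed run of logical blocks; with the 16KB
	 * CHUNK_SIZE above and 512-byte blocks, for example, the optimal IO
	 * boundary works out to 32 blocks.
	 */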
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    comp_bdev->base_bdev, &comp_bdev->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_open;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
1634 		SPDK_ERRLOG("trying to register bdev\n");
1635 		goto error_bdev_register;
1636 	}
1637 
1638 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1639 
1640 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1641 
1642 	return;
1643 	/* Error cleanup paths. */
1644 error_bdev_register:
1645 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1646 error_claim:
1647 	spdk_io_device_unregister(comp_bdev, NULL);
1648 	spdk_bdev_close(comp_bdev->base_desc);
1649 error_open:
1650 	free(comp_bdev->comp_bdev.name);
1651 error_bdev_name:
1652 	free(comp_bdev);
1653 }
1654 
static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

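/* The delete completion can arrive on a different thread than the one that
 * issued the delete, so hop back to the originating thread before invoking
 * the caller's callback.
 */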
static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}
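
/* A minimal caller sketch (hypothetical callback and bdev name; the RPC
 * layer is the expected consumer of this entry point):
 *
 *	static void
 *	delete_cb(void *cb_arg, int bdeverrno)
 *	{
 *		if (bdeverrno != 0) {
 *			SPDK_ERRLOG("compress bdev delete failed: %d\n", bdeverrno);
 *		}
 *	}
 *
 *	bdev_compress_delete("COMP_Nvme0n1", delete_cb, NULL);
 */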
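/* Runs on the thread that opened the base bdev. Three outcomes are handled:
 *   0       -> a valid compressed volume was found; claim it.
 *   -ENOENT -> the volume's pm metadata file is missing; claim the backing
 *              bdev as "orphaned" so its data stays protected.
 *   other   -> not a compressed volume (-EILSEQ, not logged) or a real error.
 */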
static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	meta_ctx->base_desc = NULL;

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		vbdev_compress_claim(meta_ctx);
	} else if (meta_ctx->reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* We still want to open and claim the backing device to protect the data until
		 * either the pm metadata file is recovered or the comp bdev is deleted.
		 */
		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
				    meta_ctx->base_bdev, &meta_ctx->base_desc);
		if (rc) {
			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			spdk_bdev_close(meta_ctx->base_desc);
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce when the load is complete. We pass the vbdev_compress
 * struct used for the initial metadata operations on to claim, where it is
 * filled out further and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}

/* The examine_disk entry point: do a metadata load to see whether this bdev
 * is ours and, if so, claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

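	/* Open read-only for the metadata probe; nothing is written here, and
	 * if the volume turns out to be ours it is re-opened for writing in
	 * vbdev_compress_claim().
	 */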
	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

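/* Restricts which compression PMD may be used; intended to be called (the
 * RPC layer is the expected caller, though that is not visible in this file)
 * before volumes are loaded, since g_opts is later read when a PMD is chosen
 * for a volume (see the _set_pmd() usage above).
 */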
int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}
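
/* A minimal usage sketch, assuming the compress_pmd enum values declared in
 * vbdev_compress.h (an assumption here, e.g. COMPRESS_PMD_AUTO to let the
 * driver pick):
 *
 *	enum compress_pmd pmd = COMPRESS_PMD_AUTO;
 *	compress_set_pmd(&pmd);
 */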

SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)