xref: /spdk/module/bdev/compress/vbdev_compress.c (revision cc6920a4763d4b9a43aa40583c8397d8f14fa100)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "vbdev_compress.h"
36 
37 #include "spdk/reduce.h"
38 #include "spdk/stdinc.h"
39 #include "spdk/rpc.h"
40 #include "spdk/env.h"
41 #include "spdk/endian.h"
42 #include "spdk/string.h"
43 #include "spdk/thread.h"
44 #include "spdk/util.h"
45 #include "spdk/bdev_module.h"
46 
47 #include "spdk/log.h"
48 
49 #include <rte_config.h>
50 #include <rte_bus_vdev.h>
51 #include <rte_compressdev.h>
52 #include <rte_comp.h>
53 #include <rte_mbuf_dyn.h>
54 
55 /* Used to store IO context in mbuf */
56 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
57 	.name = "context_reduce",
58 	.size = sizeof(uint64_t),
59 	.align = __alignof__(uint64_t),
60 	.flags = 0,
61 };
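/* The offset of this dynamic field (g_mbuf_offset, registered at module init) is where
 * each source mbuf stores the spdk_reduce_vol_cb_args pointer, so the completion poller
 * can recover the reduce callback context from m_src when an op is dequeued.
 */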
62 static int g_mbuf_offset;
63 
64 #define NUM_MAX_XFORMS 2
65 #define NUM_MAX_INFLIGHT_OPS 128
66 #define DEFAULT_WINDOW_SIZE 15
67 /* We need extra mbufs per operation to accommodate host buffers that
68  *  span a 2MB boundary.
69  */
70 #define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
71 #define CHUNK_SIZE (1024 * 16)
72 #define COMP_BDEV_NAME "compress"
73 #define BACKING_IO_SZ (4 * 1024)
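/* CHUNK_SIZE and BACKING_IO_SZ feed the spdk_reduce_vol_params built in
 * _prepare_for_load_init(): each reduce chunk holds 16KiB of logical data and the
 * backing device is accessed in 4KiB units.
 */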
74 
75 #define ISAL_PMD "compress_isal"
76 #define QAT_PMD "compress_qat"
77 #define MLX5_PMD "mlx5_pci"
78 #define NUM_MBUFS		8192
79 #define POOL_CACHE_SIZE		256
80 
81 static enum compress_pmd g_opts;
82 
83 /* Global list of available compression devices. */
84 struct compress_dev {
85 	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
86 	uint8_t				cdev_id;	/* identifier for the device */
87 	void				*comp_xform;	/* shared private xform for comp on this PMD */
88 	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
89 	TAILQ_ENTRY(compress_dev)	link;
90 };
91 static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
92 
93 /* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99 due to
94  * the length of the internal ring name that it creates; longer names exceed a limit in the
95  * generic ring code and the qpair initialization fails.
96  */
97 #define MAX_NUM_QP 99
98 /* Global list and lock for unique device/queue pair combos */
99 struct comp_device_qp {
100 	struct compress_dev		*device;	/* ptr to compression device */
101 	uint8_t				qp;		/* queue pair for this node */
102 	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
103 	TAILQ_ENTRY(comp_device_qp)	link;
104 };
105 static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
106 static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
107 
108 /* For queueing up compression operations that we can't submit for some reason */
109 struct vbdev_comp_op {
110 	struct spdk_reduce_backing_dev	*backing_dev;
111 	struct iovec			*src_iovs;
112 	int				src_iovcnt;
113 	struct iovec			*dst_iovs;
114 	int				dst_iovcnt;
115 	bool				compress;
116 	void				*cb_arg;
117 	TAILQ_ENTRY(vbdev_comp_op)	link;
118 };
119 
120 struct vbdev_comp_delete_ctx {
121 	spdk_delete_compress_complete	cb_fn;
122 	void				*cb_arg;
123 	int				cb_rc;
124 	struct spdk_thread		*orig_thread;
125 };
126 
127 /* List of virtual bdevs and associated info for each. */
128 struct vbdev_compress {
129 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
130 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
131 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
132 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
133 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
134 	char				*drv_name;	/* name of the compression device driver */
135 	struct comp_device_qp		*device_qp;
136 	struct spdk_thread		*reduce_thread;
137 	pthread_mutex_t			reduce_lock;
138 	uint32_t			ch_count;
139 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
140 	struct spdk_poller		*poller;	/* completion poller */
141 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
142 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
143 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
144 	struct vbdev_comp_delete_ctx	*delete_ctx;
145 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
146 	int				reduce_errno;
147 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
148 	TAILQ_ENTRY(vbdev_compress)	link;
149 	struct spdk_thread		*thread;	/* thread where base device is opened */
150 };
151 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
152 
153 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
154  */
155 struct comp_io_channel {
156 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
157 };
158 
159 /* Per I/O context for the compression vbdev. */
160 struct comp_bdev_io {
161 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
162 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
163 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
164 	struct spdk_bdev_io		*orig_io;		/* the original IO */
165 	struct spdk_io_channel		*ch;			/* for resubmission */
166 	int				status;			/* save for completion on orig thread */
167 };
168 
169 /* Shared mempools between all devices on this system */
170 static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
171 static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
172 static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
173 static bool g_qat_available = false;
174 static bool g_isal_available = false;
175 static bool g_mlx5_pci_available = false;
176 
177 /* Create shared (between all ops per PMD) compress xforms. */
178 static struct rte_comp_xform g_comp_xform = {
179 	.type = RTE_COMP_COMPRESS,
180 	.compress = {
181 		.algo = RTE_COMP_ALGO_DEFLATE,
182 		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
183 		.level = RTE_COMP_LEVEL_MAX,
184 		.window_size = DEFAULT_WINDOW_SIZE,
185 		.chksum = RTE_COMP_CHECKSUM_NONE,
186 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
187 	}
188 };
189 /* Create shared (between all ops per PMD) decompress xforms. */
190 static struct rte_comp_xform g_decomp_xform = {
191 	.type = RTE_COMP_DECOMPRESS,
192 	.decompress = {
193 		.algo = RTE_COMP_ALGO_DEFLATE,
194 		.chksum = RTE_COMP_CHECKSUM_NONE,
195 		.window_size = DEFAULT_WINDOW_SIZE,
196 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
197 	}
198 };
199 
200 static void vbdev_compress_examine(struct spdk_bdev *bdev);
201 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
202 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
203 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
204 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
205 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
206 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
207 
208 /* Dummy function used by DPDK to free externally attached buffers on
209  * mbufs; we free them ourselves, but this callback still has to be
210  * provided to DPDK.
211  */
212 static void
213 shinfo_free_cb(void *arg1, void *arg2)
214 {
215 }
216 
217 /* Called by vbdev_init_compress_drivers() to init each discovered compression device */
218 static int
219 create_compress_dev(uint8_t index)
220 {
221 	struct compress_dev *device;
222 	uint16_t q_pairs;
223 	uint8_t cdev_id;
224 	int rc, i;
225 	struct comp_device_qp *dev_qp;
226 	struct comp_device_qp *tmp_qp;
227 
228 	device = calloc(1, sizeof(struct compress_dev));
229 	if (!device) {
230 		return -ENOMEM;
231 	}
232 
233 	/* Get details about this device. */
234 	rte_compressdev_info_get(index, &device->cdev_info);
235 
236 	cdev_id = device->cdev_id = index;
237 
238 	/* Zero means no limit, so cap at MAX_NUM_QP. */
239 	if (device->cdev_info.max_nb_queue_pairs == 0) {
240 		q_pairs = MAX_NUM_QP;
241 	} else {
242 		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
243 	}
244 
245 	/* Configure the compression device. */
246 	struct rte_compressdev_config config = {
247 		.socket_id = rte_socket_id(),
248 		.nb_queue_pairs = q_pairs,
249 		.max_nb_priv_xforms = NUM_MAX_XFORMS,
250 		.max_nb_streams = 0
251 	};
252 	rc = rte_compressdev_configure(cdev_id, &config);
253 	if (rc < 0) {
254 		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
255 		goto err;
256 	}
257 
258 	/* Pre-setup all potential qpairs now and assign them in the channel
259 	 * callback.
260 	 */
261 	for (i = 0; i < q_pairs; i++) {
262 		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
263 						      NUM_MAX_INFLIGHT_OPS,
264 						      rte_socket_id());
265 		if (rc) {
266 			if (i > 0) {
267 				q_pairs = i;
268 				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
269 					       "compressdev %u with error %u "
270 					       "so limiting to %u qpairs\n",
271 					       cdev_id, rc, q_pairs);
272 				break;
273 			} else {
274 				SPDK_ERRLOG("Failed to setup queue pair on "
275 					    "compressdev %u with error %u\n", cdev_id, rc);
276 				rc = -EINVAL;
277 				goto err;
278 			}
279 		}
280 	}
281 
282 	rc = rte_compressdev_start(cdev_id);
283 	if (rc < 0) {
284 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
285 			    cdev_id, rc);
286 		goto err;
287 	}
288 
289 	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
290 		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
291 				&device->comp_xform);
292 		if (rc < 0) {
293 			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
294 				    cdev_id, rc);
295 			goto err;
296 		}
297 
298 		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
299 				&device->decomp_xform);
300 		if (rc) {
301 			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
302 				    cdev_id, rc);
303 			goto err;
304 		}
305 	} else {
306 		SPDK_ERRLOG("PMD does not support shared transforms\n");
		rc = -EINVAL;
307 		goto err;
308 	}
309 
310 	/* Build up list of device/qp combinations */
311 	for (i = 0; i < q_pairs; i++) {
312 		dev_qp = calloc(1, sizeof(struct comp_device_qp));
313 		if (!dev_qp) {
314 			rc = -ENOMEM;
315 			goto err;
316 		}
317 		dev_qp->device = device;
318 		dev_qp->qp = i;
319 		dev_qp->thread = NULL;
320 		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
321 	}
322 
323 	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
324 
325 	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
326 		g_qat_available = true;
327 	}
328 	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
329 		g_isal_available = true;
330 	}
331 	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
332 		g_mlx5_pci_available = true;
333 	}
334 
335 	return 0;
336 
337 err:
338 	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
339 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
340 		free(dev_qp);
341 	}
342 	free(device);
343 	return rc;
344 }
345 
346 /* Called from driver init entry point, vbdev_compress_init() */
347 static int
348 vbdev_init_compress_drivers(void)
349 {
350 	uint8_t cdev_count, i;
351 	struct compress_dev *tmp_dev;
352 	struct compress_dev *device;
353 	int rc;
354 
355 	/* We always init the compress_isal PMD */
356 	rc = rte_vdev_init(ISAL_PMD, NULL);
357 	if (rc == 0) {
358 		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
359 	} else if (rc == -EEXIST) {
360 		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
361 	} else {
362 		SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
363 		return -EINVAL;
364 	}
365 
366 	/* If we have no compression devices, there's no reason to continue. */
367 	cdev_count = rte_compressdev_count();
368 	if (cdev_count == 0) {
369 		return 0;
370 	}
371 	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
372 		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
373 		return -EINVAL;
374 	}
375 
376 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
377 	if (g_mbuf_offset < 0) {
378 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
379 		return -EINVAL;
380 	}
381 
382 	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
383 					    sizeof(struct rte_mbuf), 0, rte_socket_id());
384 	if (g_mbuf_mp == NULL) {
385 		SPDK_ERRLOG("Cannot create mbuf pool\n");
386 		rc = -ENOMEM;
387 		goto error_create_mbuf;
388 	}
389 
390 	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
391 					       0, rte_socket_id());
392 	if (g_comp_op_mp == NULL) {
393 		SPDK_ERRLOG("Cannot create comp op pool\n");
394 		rc = -ENOMEM;
395 		goto error_create_op;
396 	}
397 
398 	/* Init all devices */
399 	for (i = 0; i < cdev_count; i++) {
400 		rc = create_compress_dev(i);
401 		if (rc != 0) {
402 			goto error_create_compress_devs;
403 		}
404 	}
405 
406 	if (g_qat_available == true) {
407 		SPDK_NOTICELOG("initialized QAT PMD\n");
408 	}
409 
410 	g_shinfo.free_cb = shinfo_free_cb;
411 
412 	return 0;
413 
414 	/* Error cleanup paths. */
415 error_create_compress_devs:
416 	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
417 		TAILQ_REMOVE(&g_compress_devs, device, link);
418 		free(device);
419 	}
420 error_create_op:
421 error_create_mbuf:
	/* rte_mempool_free() is a no-op on NULL, so this is safe on every error path. */
	rte_mempool_free(g_comp_op_mp);
422 	rte_mempool_free(g_mbuf_mp);
423 
424 	return rc;
425 }
426 
427 /* for completing rw requests on the orig IO thread. */
428 static void
429 _reduce_rw_blocks_cb(void *arg)
430 {
431 	struct comp_bdev_io *io_ctx = arg;
432 
433 	if (io_ctx->status == 0) {
434 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
435 	} else {
436 		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
437 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
438 	}
439 }
440 
441 /* Completion callback for r/w that were issued via reducelib. */
442 static void
443 reduce_rw_blocks_cb(void *arg, int reduce_errno)
444 {
445 	struct spdk_bdev_io *bdev_io = arg;
446 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
447 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
448 	struct spdk_thread *orig_thread;
449 
450 	/* TODO: need to decide which error codes are bdev_io success vs failure;
451 	 * example examine calls reading metadata */
452 
453 	io_ctx->status = reduce_errno;
454 
455 	/* Send this request to the orig IO thread. */
456 	orig_thread = spdk_io_channel_get_thread(ch);
457 	if (orig_thread != spdk_get_thread()) {
458 		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
459 	} else {
460 		_reduce_rw_blocks_cb(io_ctx);
461 	}
462 }
463 
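/* Wrap each iovec in an mbuf as an externally attached buffer and chain everything onto
 * mbufs[0]. Every iov_base is translated with spdk_vtophys(); if the translation covers
 * less than iov_len (the buffer crosses a 2MB boundary), one extra mbuf is pulled from
 * the pool for the remainder. The reduce callback context is stored in each mbuf's
 * dynamic field so the poller can find it on completion.
 */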
464 static int
465 _setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
466 		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
467 {
468 	uint64_t updated_length, remainder, phys_addr;
469 	uint8_t *current_base = NULL;
470 	int iov_index, mbuf_index;
471 	int rc = 0;
472 
473 	/* Setup mbufs */
474 	iov_index = mbuf_index = 0;
475 	while (iov_index < iovcnt) {
476 
477 		current_base = iovs[iov_index].iov_base;
478 		if (total_length) {
479 			*total_length += iovs[iov_index].iov_len;
480 		}
481 		assert(mbufs[mbuf_index] != NULL);
482 		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
483 		updated_length = iovs[iov_index].iov_len;
484 		phys_addr = spdk_vtophys((void *)current_base, &updated_length);
485 
486 		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
487 					  current_base,
488 					  phys_addr,
489 					  updated_length,
490 					  &g_shinfo);
491 		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
492 		remainder = iovs[iov_index].iov_len - updated_length;
493 
494 		if (mbuf_index > 0) {
495 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
496 		}
497 
498 		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
499 		if (remainder > 0) {
500 			/* allocate an mbuf at the end of the array */
501 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
502 						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
503 			if (rc) {
504 				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
505 				return -1;
506 			}
507 			(*mbuf_total)++;
508 			mbuf_index++;
509 			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
510 			current_base += updated_length;
511 			phys_addr = spdk_vtophys((void *)current_base, &remainder);
512 			/* assert we don't cross another */
513 			assert(remainder == iovs[iov_index].iov_len - updated_length);
514 
515 			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
516 						  current_base,
517 						  phys_addr,
518 						  remainder,
519 						  &g_shinfo);
520 			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
521 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
522 		}
523 		iov_index++;
524 		mbuf_index++;
525 	}
526 
527 	return 0;
528 }
529 
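/* Build and submit one compression or decompression operation: allocate a comp op, wrap
 * the caller's src/dst iovecs in chained mbufs (see _setup_compress_mbuf()), attach the
 * device's shared private xform and enqueue the op on this bdev's assigned qpair. If the
 * device cannot accept the op right now, it is saved on queued_comp_ops and resubmitted
 * from the poller.
 */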
530 static int
531 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
532 		    int src_iovcnt, struct iovec *dst_iovs,
533 		    int dst_iovcnt, bool compress, void *cb_arg)
534 {
535 	void *reduce_cb_arg = cb_arg;
536 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
537 					   backing_dev);
538 	struct rte_comp_op *comp_op;
539 	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
540 	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
541 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
542 	uint64_t total_length = 0;
543 	int rc = 0;
544 	struct vbdev_comp_op *op_to_queue;
545 	int i;
546 	int src_mbuf_total = src_iovcnt;
547 	int dst_mbuf_total = dst_iovcnt;
548 	bool device_error = false;
549 
550 	assert(src_iovcnt < MAX_MBUFS_PER_OP);
551 
552 #ifdef DEBUG
553 	memset(src_mbufs, 0, sizeof(src_mbufs));
554 	memset(dst_mbufs, 0, sizeof(dst_mbufs));
555 #endif
556 
557 	comp_op = rte_comp_op_alloc(g_comp_op_mp);
558 	if (!comp_op) {
559 		SPDK_ERRLOG("trying to get a comp op!\n");
560 		goto error_get_op;
561 	}
562 
563 	/* get an mbuf per iov, src and dst */
564 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
565 	if (rc) {
566 		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
567 		goto error_get_src;
568 	}
569 
570 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
571 	if (rc) {
572 		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
573 		goto error_get_dst;
574 	}
575 
576 	/* There is a 1:1 mapping between a bdev_io and a compression operation; all
577 	 * compression PMDs that SPDK uses support chaining, so build our mbuf chain
578 	 * and associate it with our single comp_op.
579 	 */
580 
581 	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
582 				  src_iovs, src_iovcnt, reduce_cb_arg);
583 	if (rc < 0) {
584 		goto error_src_dst;
585 	}
586 
587 	comp_op->m_src = src_mbufs[0];
588 	comp_op->src.offset = 0;
589 	comp_op->src.length = total_length;
590 
591 	/* setup dst mbufs, for the current test being used with this code there's only one vector */
592 	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
593 				  dst_iovs, dst_iovcnt, reduce_cb_arg);
594 	if (rc < 0) {
595 		goto error_src_dst;
596 	}
597 
598 	comp_op->m_dst = dst_mbufs[0];
599 	comp_op->dst.offset = 0;
600 
601 	if (compress == true) {
602 		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
603 	} else {
604 		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
605 	}
606 
607 	comp_op->op_type = RTE_COMP_OP_STATELESS;
608 	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
609 
610 	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
611 	assert(rc <= 1);
612 
613 	/* We always expect 1 to get queued; if 0 then we need to queue it up ourselves. */
614 	if (rc == 1) {
615 		return 0;
616 	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
617 		/* we free mbufs differently depending on whether they were chained or not */
618 		rte_pktmbuf_free(comp_op->m_src);
619 		rte_pktmbuf_free(comp_op->m_dst);
620 		goto error_enqueue;
621 	} else {
622 		device_error = true;
		/* Log the status now; the op itself is freed in the cleanup path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
623 		goto error_src_dst;
624 	}
625 
626 	/* Error cleanup paths. */
627 error_src_dst:
628 	for (i = 0; i < dst_mbuf_total; i++) {
629 		rte_pktmbuf_free((struct rte_mbuf *)&dst_mbufs[i]);
630 	}
631 error_get_dst:
632 	for (i = 0; i < src_mbuf_total; i++) {
633 		rte_pktmbuf_free((struct rte_mbuf *)&src_mbufs[i]);
634 	}
635 error_get_src:
636 error_enqueue:
637 	rte_comp_op_free(comp_op);
638 error_get_op:
639 
640 	if (device_error == true) {
641 		/* There was an error sending the op to the device, most
642 		 * likely with the parameters. The status was logged above,
643 		 * before the op was freed in the cleanup path.
644 		 */
645 		return -EINVAL;
646 	}
647 
648 	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
649 	if (op_to_queue == NULL) {
650 		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
651 		return -ENOMEM;
652 	}
653 	op_to_queue->backing_dev = backing_dev;
654 	op_to_queue->src_iovs = src_iovs;
655 	op_to_queue->src_iovcnt = src_iovcnt;
656 	op_to_queue->dst_iovs = dst_iovs;
657 	op_to_queue->dst_iovcnt = dst_iovcnt;
658 	op_to_queue->compress = compress;
659 	op_to_queue->cb_arg = cb_arg;
660 	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
661 			  op_to_queue,
662 			  link);
663 	return 0;
664 }
665 
666 /* Poller for the DPDK compression driver. */
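/* Dequeue completed ops from this bdev's qpair. On success the produced byte count is
 * reported to reduce through the stored callback args; on a device error -EINVAL is
 * reported so reduce stores the chunk uncompressed. After each completion, one pending
 * op from queued_comp_ops (if any) is retried.
 */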
667 static int
668 comp_dev_poller(void *args)
669 {
670 	struct vbdev_compress *comp_bdev = args;
671 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
672 	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
673 	uint16_t num_deq;
674 	struct spdk_reduce_vol_cb_args *reduce_args;
675 	struct vbdev_comp_op *op_to_resubmit;
676 	int rc, i;
677 
678 	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
679 						NUM_MAX_INFLIGHT_OPS);
680 	for (i = 0; i < num_deq; i++) {
681 		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
682 				uint64_t *);
683 		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
684 
685 			/* tell reduce this is done and what the bytecount was */
686 			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
687 		} else {
688 			SPDK_NOTICELOG("FYI storing data uncompressed due to deque status %u\n",
689 				       deq_ops[i]->status);
690 
691 			/* Reduce will simply store uncompressed on neg errno value. */
692 			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
693 		}
694 
695 		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
696 		 * call takes care of freeing all of the mbufs in the chain back to their
697 		 * original pool.
698 		 */
699 		rte_pktmbuf_free(deq_ops[i]->m_src);
700 		rte_pktmbuf_free(deq_ops[i]->m_dst);
701 
702 		/* There is no bulk free for comp ops so we have to free them one at a
703 		 * time here; it would be rare to ever have more than 1 at a time anyway.
704 		 */
706 		rte_comp_op_free(deq_ops[i]);
707 
708 		/* Check if there are any pending comp ops to process, only pull one
709 		 * at a time off as _compress_operation() may re-queue the op.
710 		 */
711 		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
712 			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
713 			rc = _compress_operation(op_to_resubmit->backing_dev,
714 						 op_to_resubmit->src_iovs,
715 						 op_to_resubmit->src_iovcnt,
716 						 op_to_resubmit->dst_iovs,
717 						 op_to_resubmit->dst_iovcnt,
718 						 op_to_resubmit->compress,
719 						 op_to_resubmit->cb_arg);
720 			if (rc == 0) {
721 				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
722 				free(op_to_resubmit);
723 			}
724 		}
725 	}
726 	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
727 }
728 
729 /* Entry point for reduce lib to issue a compress operation. */
730 static void
731 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
732 		      struct iovec *src_iovs, int src_iovcnt,
733 		      struct iovec *dst_iovs, int dst_iovcnt,
734 		      struct spdk_reduce_vol_cb_args *cb_arg)
735 {
736 	int rc;
737 
738 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
739 	if (rc) {
740 		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
741 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
742 	}
743 }
744 
745 /* Entry point for reduce lib to issue a decompress operation. */
746 static void
747 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
748 			struct iovec *src_iovs, int src_iovcnt,
749 			struct iovec *dst_iovs, int dst_iovcnt,
750 			struct spdk_reduce_vol_cb_args *cb_arg)
751 {
752 	int rc;
753 
754 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
755 	if (rc) {
756 		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
757 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
758 	}
759 }
760 
761 /* Callback for getting a buf from the bdev pool in the event that the caller passed
762  * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
763  * beneath us before we're done with it.
764  */
765 static void
766 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
767 {
768 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
769 					   comp_bdev);
770 
771 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
772 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
773 			      reduce_rw_blocks_cb, bdev_io);
774 }
775 
776 /* scheduled for completion on IO thread */
777 static void
778 _complete_other_io(void *arg)
779 {
780 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
781 	if (io_ctx->status == 0) {
782 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
783 	} else {
784 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
785 	}
786 }
787 
788 /* scheduled for submission on reduce thread */
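/* Reads get a buffer via spdk_bdev_io_get_buf() and then go through
 * spdk_reduce_vol_readv(); writes go straight to spdk_reduce_vol_writev(). Resets are
 * currently completed as no-ops (see the TODO below) and unsupported types are failed
 * back on the original IO thread.
 */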
789 static void
790 _comp_bdev_io_submit(void *arg)
791 {
792 	struct spdk_bdev_io *bdev_io = arg;
793 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
794 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
795 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
796 					   comp_bdev);
797 	struct spdk_thread *orig_thread;
798 	int rc = 0;
799 
800 	switch (bdev_io->type) {
801 	case SPDK_BDEV_IO_TYPE_READ:
802 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
803 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
804 		return;
805 	case SPDK_BDEV_IO_TYPE_WRITE:
806 		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
807 				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
808 				       reduce_rw_blocks_cb, bdev_io);
809 		return;
810 	/* TODO in future patch in the series */
811 	case SPDK_BDEV_IO_TYPE_RESET:
812 		break;
813 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
814 	case SPDK_BDEV_IO_TYPE_UNMAP:
815 	case SPDK_BDEV_IO_TYPE_FLUSH:
816 	default:
817 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
818 		rc = -EINVAL;
819 	}
820 
821 	if (rc) {
822 		if (rc == -ENOMEM) {
823 			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
824 			io_ctx->ch = ch;
825 			vbdev_compress_queue_io(bdev_io);
826 			return;
827 		} else {
828 			SPDK_ERRLOG("on bdev_io submission!\n");
829 			io_ctx->status = rc;
830 		}
831 	}
832 
833 	/* Complete this on the orig IO thread. */
834 	orig_thread = spdk_io_channel_get_thread(ch);
835 	if (orig_thread != spdk_get_thread()) {
836 		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
837 	} else {
838 		_complete_other_io(io_ctx);
839 	}
840 }
841 
842 /* Called when someone above submits IO to this vbdev. */
843 static void
844 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
845 {
846 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
847 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
848 					   comp_bdev);
849 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
850 
851 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
852 	io_ctx->comp_bdev = comp_bdev;
853 	io_ctx->comp_ch = comp_ch;
854 	io_ctx->orig_io = bdev_io;
855 
856 	/* Send this request to the reduce_thread if that's not what we're on. */
857 	if (spdk_get_thread() != comp_bdev->reduce_thread) {
858 		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
859 	} else {
860 		_comp_bdev_io_submit(bdev_io);
861 	}
862 }
863 
864 static bool
865 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
866 {
867 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
868 
869 	switch (io_type) {
870 	case SPDK_BDEV_IO_TYPE_READ:
871 	case SPDK_BDEV_IO_TYPE_WRITE:
872 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
873 	case SPDK_BDEV_IO_TYPE_UNMAP:
874 	case SPDK_BDEV_IO_TYPE_RESET:
875 	case SPDK_BDEV_IO_TYPE_FLUSH:
876 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
877 	default:
878 		return false;
879 	}
880 }
881 
882 /* Resubmission function used by the bdev layer when a queued IO is ready to be
883  * submitted.
884  */
885 static void
886 vbdev_compress_resubmit_io(void *arg)
887 {
888 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
889 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
890 
891 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
892 }
893 
894 /* Used to queue an IO in the event of resource issues. */
895 static void
896 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
897 {
898 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
899 	int rc;
900 
901 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
902 	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
903 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
904 
905 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
906 	if (rc) {
907 		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
908 		assert(false);
909 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
910 	}
911 }
912 
913 /* Callback for unregistering the IO device. */
914 static void
915 _device_unregister_cb(void *io_device)
916 {
917 	struct vbdev_compress *comp_bdev = io_device;
918 
919 	/* Done with this comp_bdev. */
920 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
921 	free(comp_bdev->comp_bdev.name);
922 	free(comp_bdev);
923 }
924 
925 static void
926 _vbdev_compress_destruct_cb(void *ctx)
927 {
928 	struct vbdev_compress *comp_bdev = ctx;
929 
930 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
931 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
932 	/* Close the underlying bdev on its same opened thread. */
933 	spdk_bdev_close(comp_bdev->base_desc);
934 	comp_bdev->vol = NULL;
935 	if (comp_bdev->orphaned == false) {
936 		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
937 	} else {
938 		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
939 		_device_unregister_cb(comp_bdev);
940 	}
941 }
942 
943 static void
944 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
945 {
946 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
947 
948 	if (reduce_errno) {
949 		SPDK_ERRLOG("number %d\n", reduce_errno);
950 	} else {
951 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
952 			spdk_thread_send_msg(comp_bdev->thread,
953 					     _vbdev_compress_destruct_cb, comp_bdev);
954 		} else {
955 			_vbdev_compress_destruct_cb(comp_bdev);
956 		}
957 	}
958 }
959 
960 static void
961 _reduce_destroy_cb(void *ctx, int reduce_errno)
962 {
963 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
964 
965 	if (reduce_errno) {
966 		SPDK_ERRLOG("number %d\n", reduce_errno);
967 	}
968 
969 	comp_bdev->vol = NULL;
970 	spdk_put_io_channel(comp_bdev->base_ch);
971 	if (comp_bdev->orphaned == false) {
972 		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
973 				     comp_bdev->delete_ctx);
974 	} else {
975 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
976 	}
977 
978 }
979 
980 static void
981 _delete_vol_unload_cb(void *ctx)
982 {
983 	struct vbdev_compress *comp_bdev = ctx;
984 
985 	/* FIXME: Assert if these conditions are not satisfied for now. */
986 	assert(!comp_bdev->reduce_thread ||
987 	       comp_bdev->reduce_thread == spdk_get_thread());
988 
989 	/* reducelib needs a channel to comm with the backing device */
990 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
991 
992 	/* Clean the device before we free our resources. */
993 	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
994 }
995 
996 /* Called by reduceLib after performing unload vol actions */
997 static void
998 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
999 {
1000 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1001 
1002 	if (reduce_errno) {
1003 		SPDK_ERRLOG("number %d\n", reduce_errno);
1004 		/* FIXME: callback should be executed. */
1005 		return;
1006 	}
1007 
1008 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1009 	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
1010 		spdk_thread_send_msg(comp_bdev->reduce_thread,
1011 				     _delete_vol_unload_cb, comp_bdev);
1012 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
1013 	} else {
1014 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
1015 
1016 		_delete_vol_unload_cb(comp_bdev);
1017 	}
1018 }
1019 
1020 const char *
1021 compress_get_name(const struct vbdev_compress *comp_bdev)
1022 {
1023 	return comp_bdev->comp_bdev.name;
1024 }
1025 
1026 struct vbdev_compress *
1027 compress_bdev_first(void)
1028 {
1029 	struct vbdev_compress *comp_bdev;
1030 
1031 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
1032 
1033 	return comp_bdev;
1034 }
1035 
1036 struct vbdev_compress *
1037 compress_bdev_next(struct vbdev_compress *prev)
1038 {
1039 	struct vbdev_compress *comp_bdev;
1040 
1041 	comp_bdev = TAILQ_NEXT(prev, link);
1042 
1043 	return comp_bdev;
1044 }
1045 
1046 bool
1047 compress_has_orphan(const char *name)
1048 {
1049 	struct vbdev_compress *comp_bdev;
1050 
1051 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1052 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1053 			return true;
1054 		}
1055 	}
1056 	return false;
1057 }
1058 
1059 /* Called after we've unregistered following a hot remove callback.
1060  * Our finish entry point will be called next.
1061  */
1062 static int
1063 vbdev_compress_destruct(void *ctx)
1064 {
1065 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1066 
1067 	if (comp_bdev->vol != NULL) {
1068 		/* Tell reducelib that we're done with this volume. */
1069 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
1070 	} else {
1071 		vbdev_compress_destruct_cb(comp_bdev, 0);
1072 	}
1073 
1074 	return 0;
1075 }
1076 
1077 /* We supplied this as an entry point for upper layers that want to communicate with this
1078  * bdev.  This is how they get a channel.
1079  */
1080 static struct spdk_io_channel *
1081 vbdev_compress_get_io_channel(void *ctx)
1082 {
1083 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1084 
1085 	/* The IO channel code will allocate a channel for us which consists of
1086 	 * the SPDK channel structure plus the size of our comp_io_channel struct
1087 	 * that we passed in when we registered our IO device. It will then call
1088 	 * our channel create callback to populate any elements that we need to
1089 	 * update.
1090 	 */
1091 	return spdk_get_io_channel(comp_bdev);
1092 }
1093 
1094 /* This is the output for bdev_get_bdevs() for this vbdev */
1095 static int
1096 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1097 {
1098 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1099 
1100 	spdk_json_write_name(w, "compress");
1101 	spdk_json_write_object_begin(w);
1102 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1103 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1104 	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1105 	spdk_json_write_object_end(w);
1106 
1107 	return 0;
1108 }
1109 
1110 /* This is used to generate JSON that can configure this module to its current state. */
1111 static int
1112 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
1113 {
1114 	struct vbdev_compress *comp_bdev;
1115 
1116 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1117 		spdk_json_write_object_begin(w);
1118 		spdk_json_write_named_string(w, "method", "bdev_compress_create");
1119 		spdk_json_write_named_object_begin(w, "params");
1120 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1121 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1122 		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1123 		spdk_json_write_object_end(w);
1124 		spdk_json_write_object_end(w);
1125 	}
1126 	return 0;
1127 }
1128 
1129 static void
1130 _vbdev_reduce_init_cb(void *ctx)
1131 {
1132 	struct vbdev_compress *meta_ctx = ctx;
1133 	int rc;
1134 
1135 	assert(meta_ctx->base_desc != NULL);
1136 
1137 	/* We're done with metadata operations */
1138 	spdk_put_io_channel(meta_ctx->base_ch);
1139 
1140 	if (meta_ctx->vol) {
1141 		rc = vbdev_compress_claim(meta_ctx);
1142 		if (rc == 0) {
1143 			return;
1144 		}
1145 	}
1146 
1147 	/* Close the underlying bdev on its same opened thread. */
1148 	spdk_bdev_close(meta_ctx->base_desc);
1149 	free(meta_ctx);
1150 }
1151 
1152 /* Callback from reduce for when init is complete. We'll pass the vbdev_compress struct
1153  * used for initial metadata operations to the claim path, where it will be further
1154  * filled out and added to the global list.
1155  */
1156 static void
1157 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1158 {
1159 	struct vbdev_compress *meta_ctx = cb_arg;
1160 
1161 	if (reduce_errno == 0) {
1162 		meta_ctx->vol = vol;
1163 	} else {
1164 		SPDK_ERRLOG("for vol %s, error %u\n",
1165 			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1166 	}
1167 
1168 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1169 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
1170 	} else {
1171 		_vbdev_reduce_init_cb(meta_ctx);
1172 	}
1173 }
1174 
1175 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
1176  * call the callback provided by reduceLib when it called the read/write/unmap function and
1177  * free the bdev_io.
1178  */
1179 static void
1180 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1181 {
1182 	struct spdk_reduce_vol_cb_args *cb_args = arg;
1183 	int reduce_errno;
1184 
1185 	if (success) {
1186 		reduce_errno = 0;
1187 	} else {
1188 		reduce_errno = -EIO;
1189 	}
1190 	spdk_bdev_free_io(bdev_io);
1191 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
1192 }
1193 
1194 /* This is the function provided to the reduceLib for sending reads directly to
1195  * the backing device.
1196  */
1197 static void
1198 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1199 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1200 {
1201 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1202 					   backing_dev);
1203 	int rc;
1204 
1205 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1206 				    iov, iovcnt, lba, lba_count,
1207 				    comp_reduce_io_cb,
1208 				    args);
1209 	if (rc) {
1210 		if (rc == -ENOMEM) {
1211 			SPDK_ERRLOG("No memory, start to queue io.\n");
1212 			/* TODO: there's no bdev_io to queue */
1213 		} else {
1214 			SPDK_ERRLOG("submitting readv request\n");
1215 		}
1216 		args->cb_fn(args->cb_arg, rc);
1217 	}
1218 }
1219 
1220 /* This is the function provided to the reduceLib for sending writes directly to
1221  * the backing device.
1222  */
1223 static void
1224 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1225 		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1226 {
1227 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1228 					   backing_dev);
1229 	int rc;
1230 
1231 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1232 				     iov, iovcnt, lba, lba_count,
1233 				     comp_reduce_io_cb,
1234 				     args);
1235 	if (rc) {
1236 		if (rc == -ENOMEM) {
1237 			SPDK_ERRLOG("No memory, start to queue io.\n");
1238 			/* TODO: there's no bdev_io to queue */
1239 		} else {
1240 			SPDK_ERRLOG("error submitting writev request\n");
1241 		}
1242 		args->cb_fn(args->cb_arg, rc);
1243 	}
1244 }
1245 
1246 /* This is the function provided to the reduceLib for sending unmaps directly to
1247  * the backing device.
1248  */
1249 static void
1250 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
1251 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1252 {
1253 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1254 					   backing_dev);
1255 	int rc;
1256 
1257 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1258 				    lba, lba_count,
1259 				    comp_reduce_io_cb,
1260 				    args);
1261 
1262 	if (rc) {
1263 		if (rc == -ENOMEM) {
1264 			SPDK_ERRLOG("No memory, start to queue io.\n");
1265 			/* TODO: there's no bdev_io to queue */
1266 		} else {
1267 			SPDK_ERRLOG("submitting unmap request\n");
1268 		}
1269 		args->cb_fn(args->cb_arg, rc);
1270 	}
1271 }
1272 
1273 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
1274 static void
1275 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
1276 {
1277 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1278 
1279 	if (reduce_errno) {
1280 		SPDK_ERRLOG("number %d\n", reduce_errno);
1281 	}
1282 
1283 	comp_bdev->vol = NULL;
1284 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
1285 }
1286 
1287 static void
1288 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
1289 {
1290 	struct vbdev_compress *comp_bdev, *tmp;
1291 
1292 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
1293 		if (bdev_find == comp_bdev->base_bdev) {
1294 			/* Tell reduceLib that we're done with this volume. */
1295 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
1296 		}
1297 	}
1298 }
1299 
1300 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
1301 static void
1302 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1303 				  void *event_ctx)
1304 {
1305 	switch (type) {
1306 	case SPDK_BDEV_EVENT_REMOVE:
1307 		vbdev_compress_base_bdev_hotremove_cb(bdev);
1308 		break;
1309 	default:
1310 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1311 		break;
1312 	}
1313 }
1314 
1315 /* TODO: determine which params we want user configurable; hard-coded for now:
1316  * params.vol_size
1317  * params.chunk_size
1318  * compression PMD, algorithm, window size, comp level, etc.
1319  * DEV_MD_PATH
1320  */
1321 
1322 /* Common function for init and load to allocate and populate the minimal
1323  * information for reducelib to init or load.
1324  */
1325 struct vbdev_compress *
1326 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
1327 {
1328 	struct vbdev_compress *meta_ctx;
1329 	struct spdk_bdev *bdev;
1330 
1331 	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
1332 	if (meta_ctx == NULL) {
1333 		SPDK_ERRLOG("failed to alloc init contexts\n");
1334 		return NULL;
1335 	}
1336 
1337 	meta_ctx->drv_name = "None";
1338 	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
1339 	meta_ctx->backing_dev.readv = _comp_reduce_readv;
1340 	meta_ctx->backing_dev.writev = _comp_reduce_writev;
1341 	meta_ctx->backing_dev.compress = _comp_reduce_compress;
1342 	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;
1343 
1344 	meta_ctx->base_desc = bdev_desc;
1345 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
1346 	meta_ctx->base_bdev = bdev;
1347 
1348 	meta_ctx->backing_dev.blocklen = bdev->blocklen;
1349 	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;
1350 
1351 	meta_ctx->params.chunk_size = CHUNK_SIZE;
1352 	if (lb_size == 0) {
1353 		meta_ctx->params.logical_block_size = bdev->blocklen;
1354 	} else {
1355 		meta_ctx->params.logical_block_size = lb_size;
1356 	}
1357 
1358 	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
1359 	return meta_ctx;
1360 }
1361 
1362 static bool
1363 _set_pmd(struct vbdev_compress *comp_dev)
1364 {
1365 	if (g_opts == COMPRESS_PMD_AUTO) {
1366 		if (g_qat_available) {
1367 			comp_dev->drv_name = QAT_PMD;
1368 		} else if (g_mlx5_pci_available) {
1369 			comp_dev->drv_name = MLX5_PMD;
1370 		} else {
1371 			comp_dev->drv_name = ISAL_PMD;
1372 		}
1373 	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
1374 		comp_dev->drv_name = QAT_PMD;
1375 	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
1376 		comp_dev->drv_name = ISAL_PMD;
1377 	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
1378 		comp_dev->drv_name = MLX5_PMD;
1379 	} else {
1380 		SPDK_ERRLOG("Requested PMD is not available.\n");
1381 		return false;
1382 	}
1383 	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
1384 	return true;
1385 }
1386 
1387 /* Call reducelib to initialize a new volume */
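/* Opens the base bdev, builds the load/init context, selects the compression PMD and
 * takes a metadata IO channel, then calls spdk_reduce_vol_init() with the pm_path.
 * vbdev_reduce_init_cb() finishes the claim (or cleans up) once reduce completes.
 */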
1388 static int
1389 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1390 {
1391 	struct spdk_bdev_desc *bdev_desc = NULL;
1392 	struct vbdev_compress *meta_ctx;
1393 	int rc;
1394 
1395 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
1396 				NULL, &bdev_desc);
1397 	if (rc) {
1398 		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
1399 		return rc;
1400 	}
1401 
1402 	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
1403 	if (meta_ctx == NULL) {
1404 		spdk_bdev_close(bdev_desc);
1405 		return -EINVAL;
1406 	}
1407 
1408 	if (_set_pmd(meta_ctx) == false) {
1409 		SPDK_ERRLOG("could not find required pmd\n");
1410 		free(meta_ctx);
1411 		spdk_bdev_close(bdev_desc);
1412 		return -EINVAL;
1413 	}
1414 
1415 	/* Save the thread where the base device is opened */
1416 	meta_ctx->thread = spdk_get_thread();
1417 
1418 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1419 
1420 	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
1421 			     pm_path,
1422 			     vbdev_reduce_init_cb,
1423 			     meta_ctx);
1424 	return 0;
1425 }
1426 
1427 /* We provide this callback for the SPDK channel code to create a channel using
1428  * the channel struct we provided in our module get_io_channel() entry point. Here
1429  * we get and save off an underlying base channel of the device below us so that
1430  * we can communicate with the base bdev on a per channel basis. The completion
1431  * poller for this vbdev is also registered here, on first channel creation.
1432  */
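/* Queue pair assignment: on the first channel created for this comp_bdev we scan the
 * global device/qp list for an entry whose driver matches drv_name, preferring a qp
 * already bound to this thread (qps can be shared by comp_bdevs on the same thread)
 * and otherwise claiming an unassigned one.
 */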
1433 static int
1434 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1435 {
1436 	struct vbdev_compress *comp_bdev = io_device;
1437 	struct comp_device_qp *device_qp;
1438 
1439 	/* Now set the reduce channel if it's not already set. */
1440 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1441 	if (comp_bdev->ch_count == 0) {
1442 		/* We use this queue to track outstanding IO in our layer. */
1443 		TAILQ_INIT(&comp_bdev->pending_comp_ios);
1444 
1445 		/* We use this to queue up compression operations as needed. */
1446 		TAILQ_INIT(&comp_bdev->queued_comp_ops);
1447 
1448 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1449 		comp_bdev->reduce_thread = spdk_get_thread();
1450 		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
1451 		/* Now assign a q pair */
1452 		pthread_mutex_lock(&g_comp_device_qp_lock);
1453 		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
1454 			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
1455 				if (device_qp->thread == spdk_get_thread()) {
1456 					comp_bdev->device_qp = device_qp;
1457 					break;
1458 				}
1459 				if (device_qp->thread == NULL) {
1460 					comp_bdev->device_qp = device_qp;
1461 					device_qp->thread = spdk_get_thread();
1462 					break;
1463 				}
1464 			}
1465 		}
1466 		pthread_mutex_unlock(&g_comp_device_qp_lock);
1467 	}
1468 	comp_bdev->ch_count++;
1469 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1470 
1471 	if (comp_bdev->device_qp != NULL) {
1472 		return 0;
1473 	} else {
1474 		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
1475 		assert(false);
1476 		return -ENOMEM;
1477 	}
1478 }
1479 
1480 static void
1481 _channel_cleanup(struct vbdev_compress *comp_bdev)
1482 {
1483 	/* Note: comp_bdevs can share a device_qp if they are
1484 	 * on the same thread so we leave the device_qp element
1485 	 * alone for this comp_bdev and just clear the reduce thread.
1486 	 */
1487 	spdk_put_io_channel(comp_bdev->base_ch);
1488 	comp_bdev->reduce_thread = NULL;
1489 	spdk_poller_unregister(&comp_bdev->poller);
1490 }
1491 
1492 /* Used to reroute destroy_ch to the correct thread */
1493 static void
1494 _comp_bdev_ch_destroy_cb(void *arg)
1495 {
1496 	struct vbdev_compress *comp_bdev = arg;
1497 
1498 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1499 	_channel_cleanup(comp_bdev);
1500 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1501 }
1502 
1503 /* We provide this callback for the SPDK channel code to destroy a channel
1504  * created with our create callback. We just need to undo anything we did
1505  * when we created the channel; the per-channel poller is unregistered via _channel_cleanup().
1506  */
1507 static void
1508 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1509 {
1510 	struct vbdev_compress *comp_bdev = io_device;
1511 
1512 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1513 	comp_bdev->ch_count--;
1514 	if (comp_bdev->ch_count == 0) {
1515 		/* Send this request to the thread where the channel was created. */
1516 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1517 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1518 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1519 		} else {
1520 			_channel_cleanup(comp_bdev);
1521 		}
1522 	}
1523 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1524 }
1525 
1526 /* RPC entry point for compression vbdev creation. */
1527 int
1528 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1529 {
1530 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
1531 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
1532 		return -EINVAL;
1533 	}
1534 
1535 	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
1536 }
1537 
1538 /* On init, just init the compress drivers. All metadata is stored on disk. */
1539 static int
1540 vbdev_compress_init(void)
1541 {
1542 	if (vbdev_init_compress_drivers()) {
1543 		SPDK_ERRLOG("Error setting up compression devices\n");
1544 		return -EINVAL;
1545 	}
1546 
1547 	return 0;
1548 }
1549 
1550 /* Called when the entire module is being torn down. */
1551 static void
1552 vbdev_compress_finish(void)
1553 {
1554 	struct comp_device_qp *dev_qp;
1555 	/* TODO: unload vol in a future patch */
1556 
1557 	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
1558 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
1559 		free(dev_qp);
1560 	}
1561 	pthread_mutex_destroy(&g_comp_device_qp_lock);
1562 
1563 	rte_mempool_free(g_comp_op_mp);
1564 	rte_mempool_free(g_mbuf_mp);
1565 }
1566 
1567 /* During init we'll be asked how much memory we'd like passed to us
1568  * in bdev_io structures as context. Here's where we specify how
1569  * much context we want per IO.
1570  */
1571 static int
1572 vbdev_compress_get_ctx_size(void)
1573 {
1574 	return sizeof(struct comp_bdev_io);
1575 }
1576 
1577 /* When we register our bdev this is how we specify our entry points. */
1578 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1579 	.destruct		= vbdev_compress_destruct,
1580 	.submit_request		= vbdev_compress_submit_request,
1581 	.io_type_supported	= vbdev_compress_io_type_supported,
1582 	.get_io_channel		= vbdev_compress_get_io_channel,
1583 	.dump_info_json		= vbdev_compress_dump_info_json,
1584 	.write_config_json	= NULL,
1585 };
1586 
1587 static struct spdk_bdev_module compress_if = {
1588 	.name = "compress",
1589 	.module_init = vbdev_compress_init,
1590 	.get_ctx_size = vbdev_compress_get_ctx_size,
1591 	.examine_disk = vbdev_compress_examine,
1592 	.module_fini = vbdev_compress_finish,
1593 	.config_json = vbdev_compress_config_json
1594 };
1595 
1596 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1597 
1598 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1599 {
1600 	struct spdk_bdev_alias *aliases;
1601 
1602 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1603 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1604 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
1605 		if (!comp_bdev->comp_bdev.name) {
1606 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1607 			return -ENOMEM;
1608 		}
1609 	} else {
1610 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1611 		if (!comp_bdev->comp_bdev.name) {
1612 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1613 			return -ENOMEM;
1614 		}
1615 	}
1616 	return 0;
1617 }
1618 
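/* Fill out and register the compression bdev: name it COMP_<base alias or name>, derive
 * its geometry from the reduce vol params (blocklen = logical_block_size, blockcnt =
 * vol_size / blocklen, optimal_io_boundary = chunk_size / logical_block_size), register
 * the io_device, claim the base bdev and finally register the new bdev.
 */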
1619 static int
1620 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1621 {
1622 	int rc;
1623 
1624 	if (_set_compbdev_name(comp_bdev)) {
1625 		return -EINVAL;
1626 	}
1627 
1628 	/* Note: some of the fields below will change in the future - for example,
1629 	 * blockcnt specifically will not match (the compressed volume size will
1630 	 * be slightly less than the base bdev size)
1631 	 */
1632 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1633 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1634 
1635 	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
1636 		comp_bdev->comp_bdev.required_alignment =
1637 			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
1638 				 comp_bdev->base_bdev->required_alignment);
1639 		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1640 			       comp_bdev->comp_bdev.required_alignment);
1641 	} else {
1642 		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
1643 	}
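	/* Advertise the reduce chunk size (in logical blocks) as the optimal I/O boundary and split
	 * on it, so a single bdev_io never spans more than one compressed chunk.
	 */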
1644 	comp_bdev->comp_bdev.optimal_io_boundary =
1645 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
1646 
1647 	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
1648 
1649 	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
1650 	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
1651 	assert(comp_bdev->comp_bdev.blockcnt > 0);
1652 
1653 	/* This is the context that is passed to us when the bdev
1654 	 * layer calls in, so we'll save our comp_bdev node here.
1655 	 */
1656 	comp_bdev->comp_bdev.ctxt = comp_bdev;
1657 	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
1658 	comp_bdev->comp_bdev.module = &compress_if;
1659 
1660 	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1661 
1662 	/* Save the thread where the base device is opened */
1663 	comp_bdev->thread = spdk_get_thread();
1664 
1665 	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
1666 				sizeof(struct comp_io_channel),
1667 				comp_bdev->comp_bdev.name);
1668 
1669 	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1670 					 comp_bdev->comp_bdev.module);
1671 	if (rc) {
1672 		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
1673 		goto error_claim;
1674 	}
1675 
1676 	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
1677 	if (rc < 0) {
1678 		SPDK_ERRLOG("failed to register compress bdev\n");
1679 		goto error_bdev_register;
1680 	}
1681 
1682 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1683 
1684 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1685 
1686 	return 0;
1687 
1688 	/* Error cleanup paths. */
1689 error_bdev_register:
1690 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1691 error_claim:
1692 	spdk_io_device_unregister(comp_bdev, NULL);
1693 	free(comp_bdev->comp_bdev.name);
1694 	return rc;
1695 }
1696 
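/* The delete completion must be reported on the thread that issued the delete, so the helpers
 * below bounce the callback back to ctx->orig_thread when necessary.
 */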
1697 static void
1698 _vbdev_compress_delete_done(void *_ctx)
1699 {
1700 	struct vbdev_comp_delete_ctx *ctx = _ctx;
1701 
1702 	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
1703 
1704 	free(ctx);
1705 }
1706 
1707 static void
1708 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1709 {
1710 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1711 
1712 	ctx->cb_rc = bdeverrno;
1713 
1714 	if (ctx->orig_thread != spdk_get_thread()) {
1715 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1716 	} else {
1717 		_vbdev_compress_delete_done(ctx);
1718 	}
1719 }
1720 
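/* Public entry point (invoked from the RPC layer) to delete a compress vbdev by name. The reduce
 * volume is unloaded first unless the vbdev is orphaned; completion is reported through cb_fn.
 */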
1721 void
1722 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1723 {
1724 	struct vbdev_compress *comp_bdev = NULL;
1725 	struct vbdev_comp_delete_ctx *ctx;
1726 
1727 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1728 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1729 			break;
1730 		}
1731 	}
1732 
1733 	if (comp_bdev == NULL) {
1734 		cb_fn(cb_arg, -ENODEV);
1735 		return;
1736 	}
1737 
1738 	ctx = calloc(1, sizeof(*ctx));
1739 	if (ctx == NULL) {
1740 		SPDK_ERRLOG("Failed to allocate delete context\n");
1741 		cb_fn(cb_arg, -ENOMEM);
1742 		return;
1743 	}
1744 
1745 	/* Save these for after the vol is destroyed. */
1746 	ctx->cb_fn = cb_fn;
1747 	ctx->cb_arg = cb_arg;
1748 	ctx->orig_thread = spdk_get_thread();
1749 
1750 	comp_bdev->delete_ctx = ctx;
1751 
1752 	/* Tell reducelib that we're done with this volume. */
1753 	if (comp_bdev->orphaned == false) {
1754 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1755 	} else {
1756 		delete_vol_unload_cb(comp_bdev, 0);
1757 	}
1758 }
1759 
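/* Runs on the thread where the base bdev was opened: finish the metadata load by claiming the
 * bdev on success, tracking it as orphaned when reduce reports -ENOENT, or cleaning up on any
 * other error.
 */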
1760 static void
1761 _vbdev_reduce_load_cb(void *ctx)
1762 {
1763 	struct vbdev_compress *meta_ctx = ctx;
1764 	int rc;
1765 
1766 	assert(meta_ctx->base_desc != NULL);
1767 
1768 	/* Done with metadata operations */
1769 	spdk_put_io_channel(meta_ctx->base_ch);
1770 
1771 	if (meta_ctx->reduce_errno == 0) {
1772 		if (_set_pmd(meta_ctx) == false) {
1773 			SPDK_ERRLOG("could not find required pmd\n");
1774 			goto err;
1775 		}
1776 
1777 		rc = vbdev_compress_claim(meta_ctx);
1778 		if (rc != 0) {
1779 			goto err;
1780 		}
1781 	} else if (meta_ctx->reduce_errno == -ENOENT) {
1782 		if (_set_compbdev_name(meta_ctx)) {
1783 			goto err;
1784 		}
1785 
1786 		/* Save the thread where the base device is opened */
1787 		meta_ctx->thread = spdk_get_thread();
1788 
1789 		meta_ctx->comp_bdev.module = &compress_if;
1790 		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
1791 		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
1792 						 meta_ctx->comp_bdev.module);
1793 		if (rc) {
1794 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1795 			free(meta_ctx->comp_bdev.name);
1796 			goto err;
1797 		}
1798 
1799 		meta_ctx->orphaned = true;
1800 		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
1801 	} else {
1802 		if (meta_ctx->reduce_errno != -EILSEQ) {
1803 			SPDK_ERRLOG("for vol %s, error %d\n",
1804 				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
1805 		}
1806 		goto err;
1807 	}
1808 
1809 	spdk_bdev_module_examine_done(&compress_if);
1810 	return;
1811 
1812 err:
1813 	/* Close the underlying bdev on its same opened thread. */
1814 	spdk_bdev_close(meta_ctx->base_desc);
1815 	free(meta_ctx);
1816 	spdk_bdev_module_examine_done(&compress_if);
1817 }
1818 
1819 /* Callback from the reduce lib when the load is complete. We'll pass the vbdev_comp struct
1820  * used for initial metadata operations on to the claim path, where it will be further filled
1821  * out and added to the global list.
1822  */
1823 static void
1824 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1825 {
1826 	struct vbdev_compress *meta_ctx = cb_arg;
1827 
1828 	if (reduce_errno == 0) {
1829 		/* Update information following volume load. */
1830 		meta_ctx->vol = vol;
1831 		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
1832 		       sizeof(struct spdk_reduce_vol_params));
1833 	}
1834 
1835 	meta_ctx->reduce_errno = reduce_errno;
1836 
1837 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1838 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
1839 	} else {
1840 		_vbdev_reduce_load_cb(meta_ctx);
1841 	}
1843 }
1844 
1845 /* Examine_disk entry point: will do a metadata load to see if this is ours,
1846  * and if so will go ahead and claim it.
1847  */
1848 static void
1849 vbdev_compress_examine(struct spdk_bdev *bdev)
1850 {
1851 	struct spdk_bdev_desc *bdev_desc = NULL;
1852 	struct vbdev_compress *meta_ctx;
1853 	int rc;
1854 
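	/* Don't try to load reduce metadata from a bdev that is itself a compress vbdev. */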
1855 	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
1856 		spdk_bdev_module_examine_done(&compress_if);
1857 		return;
1858 	}
1859 
1860 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
1861 				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
1862 	if (rc) {
1863 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
1864 		spdk_bdev_module_examine_done(&compress_if);
1865 		return;
1866 	}
1867 
1868 	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
1869 	if (meta_ctx == NULL) {
1870 		spdk_bdev_close(bdev_desc);
1871 		spdk_bdev_module_examine_done(&compress_if);
1872 		return;
1873 	}
1874 
1875 	/* Save the thread where the base device is opened */
1876 	meta_ctx->thread = spdk_get_thread();
1877 
1878 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1879 	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
1880 }
1881 
1882 int
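/* Record the preferred compression PMD; consulted later when a compression device is selected
 * for a volume.
 */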
1883 compress_set_pmd(enum compress_pmd *opts)
1884 {
1885 	g_opts = *opts;
1886 
1887 	return 0;
1888 }
1889 
1890 SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1891