xref: /spdk/module/bdev/compress/vbdev_compress.c (revision 08f9b40113d864c06ab043418b830938bc32face)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "vbdev_compress.h"
36 
37 #include "spdk/reduce.h"
38 #include "spdk/stdinc.h"
39 #include "spdk/rpc.h"
40 #include "spdk/env.h"
41 #include "spdk/endian.h"
42 #include "spdk/string.h"
43 #include "spdk/thread.h"
44 #include "spdk/util.h"
45 #include "spdk/bdev_module.h"
46 
47 #include "spdk/log.h"
48 
49 #include <rte_config.h>
50 #include <rte_bus_vdev.h>
51 #include <rte_compressdev.h>
52 #include <rte_comp.h>
53 #include <rte_mbuf_dyn.h>
54 
55 /* Used to store IO context in mbuf */
56 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
57 	.name = "context_reduce",
58 	.size = sizeof(uint64_t),
59 	.align = __alignof__(uint64_t),
60 	.flags = 0,
61 };
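/* Byte offset of the dynamic field within each mbuf, as returned by
 * rte_mbuf_dynfield_register() at init time.
 */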
62 static int g_mbuf_offset;
63 
64 #define NUM_MAX_XFORMS 2
65 #define NUM_MAX_INFLIGHT_OPS 128
66 #define DEFAULT_WINDOW_SIZE 15
67 /* We need extra mbufs per operation to accommodate host buffers that
68  *  span a 2MB boundary.
69  */
70 #define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
71 #define CHUNK_SIZE (1024 * 16)
72 #define COMP_BDEV_NAME "compress"
73 #define BACKING_IO_SZ (4 * 1024)
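/* CHUNK_SIZE is the reduce volume chunk size and BACKING_IO_SZ its backing I/O
 * unit size; both are hard-coded for now (see the TODO above
 * _prepare_for_load_init()).
 */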
74 
75 #define ISAL_PMD "compress_isal"
76 #define QAT_PMD "compress_qat"
77 #define MLX5_PMD "mlx5_pci"
78 #define NUM_MBUFS		8192
79 #define POOL_CACHE_SIZE		256
80 
81 static enum compress_pmd g_opts;
82 
83 /* Global list of available compression devices. */
84 struct compress_dev {
85 	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
86 	uint8_t				cdev_id;	/* identifier for the device */
87 	void				*comp_xform;	/* shared private xform for comp on this PMD */
88 	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
89 	TAILQ_ENTRY(compress_dev)	link;
90 };
91 static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
92 
93 /* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99: the
94  * internal ring name that it creates exceeds a length limit in the generic ring code,
95  * which fails the qp initialization beyond that point.
96  * FIXME: Reduce number of qpairs to 48, due to issue #2338
97  */
98 #define MAX_NUM_QP 48
99 /* Global list and lock for unique device/queue pair combos */
100 struct comp_device_qp {
101 	struct compress_dev		*device;	/* ptr to compression device */
102 	uint8_t				qp;		/* queue pair for this node */
103 	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
104 	TAILQ_ENTRY(comp_device_qp)	link;
105 };
106 static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
107 static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
108 
109 /* For queueing up compression operations that we can't submit for some reason */
110 struct vbdev_comp_op {
111 	struct spdk_reduce_backing_dev	*backing_dev;
112 	struct iovec			*src_iovs;
113 	int				src_iovcnt;
114 	struct iovec			*dst_iovs;
115 	int				dst_iovcnt;
116 	bool				compress;
117 	void				*cb_arg;
118 	TAILQ_ENTRY(vbdev_comp_op)	link;
119 };
120 
121 struct vbdev_comp_delete_ctx {
122 	spdk_delete_compress_complete	cb_fn;
123 	void				*cb_arg;
124 	int				cb_rc;
125 	struct spdk_thread		*orig_thread;
126 };
127 
128 /* List of virtual bdevs and associated info for each. */
129 struct vbdev_compress {
130 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
131 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
132 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
133 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
134 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
135 	char				*drv_name;	/* name of the compression device driver */
136 	struct comp_device_qp		*device_qp;
137 	struct spdk_thread		*reduce_thread;
138 	pthread_mutex_t			reduce_lock;
139 	uint32_t			ch_count;
140 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
141 	struct spdk_poller		*poller;	/* completion poller */
142 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
143 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
144 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
145 	struct vbdev_comp_delete_ctx	*delete_ctx;
146 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
147 	int				reduce_errno;
148 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
149 	TAILQ_ENTRY(vbdev_compress)	link;
150 	struct spdk_thread		*thread;	/* thread where base device is opened */
151 };
152 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
153 
154 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
155  */
156 struct comp_io_channel {
157 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
158 };
159 
160 /* Per I/O context for the compression vbdev. */
161 struct comp_bdev_io {
162 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
163 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
164 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
165 	struct spdk_bdev_io		*orig_io;		/* the original IO */
166 	struct spdk_io_channel		*ch;			/* for resubmission */
167 	int				status;			/* save for completion on orig thread */
168 };
169 
170 /* Shared mempools between all devices on this system */
171 static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
172 static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
173 static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
174 static bool g_qat_available = false;
175 static bool g_isal_available = false;
176 static bool g_mlx5_pci_available = false;
177 
178 /* Create shared (between all ops per PMD) compress xforms. */
179 static struct rte_comp_xform g_comp_xform = {
180 	.type = RTE_COMP_COMPRESS,
181 	.compress = {
182 		.algo = RTE_COMP_ALGO_DEFLATE,
183 		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
184 		.level = RTE_COMP_LEVEL_MAX,
185 		.window_size = DEFAULT_WINDOW_SIZE,
186 		.chksum = RTE_COMP_CHECKSUM_NONE,
187 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
188 	}
189 };
190 /* Create shared (between all ops per PMD) decompress xforms. */
191 static struct rte_comp_xform g_decomp_xform = {
192 	.type = RTE_COMP_DECOMPRESS,
193 	.decompress = {
194 		.algo = RTE_COMP_ALGO_DEFLATE,
195 		.chksum = RTE_COMP_CHECKSUM_NONE,
196 		.window_size = DEFAULT_WINDOW_SIZE,
197 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
198 	}
199 };
200 
201 static void vbdev_compress_examine(struct spdk_bdev *bdev);
202 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
203 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
204 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
205 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
206 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
207 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
208 
209 /* Dummy function used by DPDK to free ext attached buffers
210  * to mbufs, we free them ourselves but this callback has to
211  * be here.
212  */
213 static void
214 shinfo_free_cb(void *arg1, void *arg2)
215 {
216 }
217 
218 /* Called by vbdev_init_compress_drivers() to init each discovered compression device */
219 static int
220 create_compress_dev(uint8_t index)
221 {
222 	struct compress_dev *device;
223 	uint16_t q_pairs;
224 	uint8_t cdev_id;
225 	int rc, i;
226 	struct comp_device_qp *dev_qp;
227 	struct comp_device_qp *tmp_qp;
228 
229 	device = calloc(1, sizeof(struct compress_dev));
230 	if (!device) {
231 		return -ENOMEM;
232 	}
233 
234 	/* Get details about this device. */
235 	rte_compressdev_info_get(index, &device->cdev_info);
236 
237 	cdev_id = device->cdev_id = index;
238 
239 	/* Zero means no limit, so cap the count at MAX_NUM_QP. */
240 	if (device->cdev_info.max_nb_queue_pairs == 0) {
241 		q_pairs = MAX_NUM_QP;
242 	} else {
243 		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
244 	}
245 
246 	/* Configure the compression device. */
247 	struct rte_compressdev_config config = {
248 		.socket_id = rte_socket_id(),
249 		.nb_queue_pairs = q_pairs,
250 		.max_nb_priv_xforms = NUM_MAX_XFORMS,
251 		.max_nb_streams = 0
252 	};
253 	rc = rte_compressdev_configure(cdev_id, &config);
254 	if (rc < 0) {
255 		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
256 		goto err;
257 	}
258 
259 	/* Pre-setup all potential qpairs now and assign them in the channel
260 	 * callback.
261 	 */
262 	for (i = 0; i < q_pairs; i++) {
263 		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
264 						      NUM_MAX_INFLIGHT_OPS,
265 						      rte_socket_id());
266 		if (rc) {
267 			if (i > 0) {
268 				q_pairs = i;
269 				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
270 					       "compressdev %u with error %d "
271 					       "so limiting to %u qpairs\n",
272 					       cdev_id, rc, q_pairs);
273 				break;
274 			} else {
275 				SPDK_ERRLOG("Failed to setup queue pair on "
276 					    "compressdev %u with error %d\n", cdev_id, rc);
277 				rc = -EINVAL;
278 				goto err;
279 			}
280 		}
281 	}
282 
283 	rc = rte_compressdev_start(cdev_id);
284 	if (rc < 0) {
285 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
286 			    cdev_id, rc);
287 		goto err;
288 	}
289 
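	/* A shareable private xform lets one comp xform and one decomp xform serve
	 * every op on the device (presumably why NUM_MAX_XFORMS is 2); PMDs that
	 * cannot share transforms are rejected below.
	 */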
290 	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
291 		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
292 				&device->comp_xform);
293 		if (rc < 0) {
294 			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
295 				    cdev_id, rc);
296 			goto err;
297 		}
298 
299 		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
300 				&device->decomp_xform);
301 		if (rc) {
302 			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
303 				    cdev_id, rc);
304 			goto err;
305 		}
306 	} else {
307 		SPDK_ERRLOG("PMD does not support shared transforms\n");
308 		goto err;
309 	}
310 
311 	/* Build up list of device/qp combinations */
312 	for (i = 0; i < q_pairs; i++) {
313 		dev_qp = calloc(1, sizeof(struct comp_device_qp));
314 		if (!dev_qp) {
315 			rc = -ENOMEM;
316 			goto err;
317 		}
318 		dev_qp->device = device;
319 		dev_qp->qp = i;
320 		dev_qp->thread = NULL;
321 		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
322 	}
323 
324 	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
325 
326 	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
327 		g_qat_available = true;
328 	}
329 	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
330 		g_isal_available = true;
331 	}
332 	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
333 		g_mlx5_pci_available = true;
334 	}
335 
336 	return 0;
337 
338 err:
339 	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
340 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
341 		free(dev_qp);
342 	}
343 	free(device);
344 	return rc;
345 }
346 
347 /* Called from driver init entry point, vbdev_compress_init() */
348 static int
349 vbdev_init_compress_drivers(void)
350 {
351 	uint8_t cdev_count, i;
352 	struct compress_dev *tmp_dev;
353 	struct compress_dev *device;
354 	int rc;
355 
356 	/* We always init the compress_isal PMD */
357 	rc = rte_vdev_init(ISAL_PMD, NULL);
358 	if (rc == 0) {
359 		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
360 	} else if (rc == -EEXIST) {
361 		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
362 	} else {
363 		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
364 		return -EINVAL;
365 	}
366 
367 	/* If we have no compression devices, there's no reason to continue. */
368 	cdev_count = rte_compressdev_count();
369 	if (cdev_count == 0) {
370 		return 0;
371 	}
372 	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
373 		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
374 		return -EINVAL;
375 	}
376 
377 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
378 	if (g_mbuf_offset < 0) {
379 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
380 		return -EINVAL;
381 	}
382 
383 	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
384 					    sizeof(struct rte_mbuf), 0, rte_socket_id());
385 	if (g_mbuf_mp == NULL) {
386 		SPDK_ERRLOG("Cannot create mbuf pool\n");
387 		rc = -ENOMEM;
388 		goto error_create_mbuf;
389 	}
390 
391 	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
392 					       0, rte_socket_id());
393 	if (g_comp_op_mp == NULL) {
394 		SPDK_ERRLOG("Cannot create comp op pool\n");
395 		rc = -ENOMEM;
396 		goto error_create_op;
397 	}
398 
399 	/* Init all devices */
400 	for (i = 0; i < cdev_count; i++) {
401 		rc = create_compress_dev(i);
402 		if (rc != 0) {
403 			goto error_create_compress_devs;
404 		}
405 	}
406 
407 	if (g_qat_available == true) {
408 		SPDK_NOTICELOG("initialized QAT PMD\n");
409 	}
410 
411 	g_shinfo.free_cb = shinfo_free_cb;
412 
413 	return 0;
414 
415 	/* Error cleanup paths. */
416 error_create_compress_devs:
417 	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
418 		TAILQ_REMOVE(&g_compress_devs, device, link);
419 		free(device);
420 	}
	/* The comp op pool was created before the devices, so free it on this path too. */
	rte_mempool_free(g_comp_op_mp);
421 error_create_op:
422 error_create_mbuf:
423 	rte_mempool_free(g_mbuf_mp);
424 
425 	return rc;
426 }
427 
428 /* for completing rw requests on the orig IO thread. */
429 static void
430 _reduce_rw_blocks_cb(void *arg)
431 {
432 	struct comp_bdev_io *io_ctx = arg;
433 
434 	if (io_ctx->status == 0) {
435 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
436 	} else {
437 		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
438 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
439 	}
440 }
441 
442 /* Completion callback for r/w that were issued via reducelib. */
443 static void
444 reduce_rw_blocks_cb(void *arg, int reduce_errno)
445 {
446 	struct spdk_bdev_io *bdev_io = arg;
447 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
448 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
449 	struct spdk_thread *orig_thread;
450 
451 	/* TODO: need to decide which error codes are bdev_io success vs failure;
452 	 * for example, examine calls that read metadata */
453 
454 	io_ctx->status = reduce_errno;
455 
456 	/* Send this request to the orig IO thread. */
457 	orig_thread = spdk_io_channel_get_thread(ch);
458 	if (orig_thread != spdk_get_thread()) {
459 		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
460 	} else {
461 		_reduce_rw_blocks_cb(io_ctx);
462 	}
463 }
464 
465 static int
466 _setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
467 		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
468 {
469 	uint64_t updated_length, remainder, phys_addr;
470 	uint8_t *current_base = NULL;
471 	int iov_index, mbuf_index;
472 	int rc = 0;
473 
474 	/* Setup mbufs */
475 	iov_index = mbuf_index = 0;
476 	while (iov_index < iovcnt) {
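		/* Attach each iov to an mbuf as an external buffer. If the iov's
		 * virtual-to-physical mapping is split by a 2MB hugepage boundary,
		 * spdk_vtophys() shortens updated_length and the remainder is
		 * handled below with an extra mbuf.
		 */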
477 
478 		current_base = iovs[iov_index].iov_base;
479 		if (total_length) {
480 			*total_length += iovs[iov_index].iov_len;
481 		}
482 		assert(mbufs[mbuf_index] != NULL);
483 		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
484 		updated_length = iovs[iov_index].iov_len;
485 		phys_addr = spdk_vtophys((void *)current_base, &updated_length);
486 
487 		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
488 					  current_base,
489 					  phys_addr,
490 					  updated_length,
491 					  &g_shinfo);
492 		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
493 		remainder = iovs[iov_index].iov_len - updated_length;
494 
495 		if (mbuf_index > 0) {
496 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
497 		}
498 
499 		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
500 		if (remainder > 0) {
501 			/* allocate an mbuf at the end of the array */
502 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
503 						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
504 			if (rc) {
505 				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
506 				return -1;
507 			}
508 			(*mbuf_total)++;
509 			mbuf_index++;
510 			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
511 			current_base += updated_length;
512 			phys_addr = spdk_vtophys((void *)current_base, &remainder);
513 			/* assert we don't cross another */
514 			assert(remainder == iovs[iov_index].iov_len - updated_length);
515 
516 			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
517 						  current_base,
518 						  phys_addr,
519 						  remainder,
520 						  &g_shinfo);
521 			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
522 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
523 		}
524 		iov_index++;
525 		mbuf_index++;
526 	}
527 
528 	return 0;
529 }
530 
531 static int
532 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
533 		    int src_iovcnt, struct iovec *dst_iovs,
534 		    int dst_iovcnt, bool compress, void *cb_arg)
535 {
536 	void *reduce_cb_arg = cb_arg;
537 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
538 					   backing_dev);
539 	struct rte_comp_op *comp_op;
540 	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
541 	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
542 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
543 	uint64_t total_length = 0;
544 	int rc = 0;
545 	struct vbdev_comp_op *op_to_queue;
546 	int i;
547 	int src_mbuf_total = src_iovcnt;
548 	int dst_mbuf_total = dst_iovcnt;
549 	bool device_error = false;
	int op_status = 0;	/* comp_op->status, saved before the op is freed on error */
550 
551 	assert(src_iovcnt < MAX_MBUFS_PER_OP);
552 
553 #ifdef DEBUG
554 	memset(src_mbufs, 0, sizeof(src_mbufs));
555 	memset(dst_mbufs, 0, sizeof(dst_mbufs));
556 #endif
557 
558 	comp_op = rte_comp_op_alloc(g_comp_op_mp);
559 	if (!comp_op) {
560 		SPDK_ERRLOG("failed to get a comp op!\n");
561 		goto error_get_op;
562 	}
563 
564 	/* get an mbuf per iov, src and dst */
565 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
566 	if (rc) {
567 		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
568 		goto error_get_src;
569 	}
570 
571 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
572 	if (rc) {
573 		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
574 		goto error_get_dst;
575 	}
576 
577 	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
578 	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
579 	 * and associate with our single comp_op.
580 	 */
581 
582 	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
583 				  src_iovs, src_iovcnt, reduce_cb_arg);
584 	if (rc < 0) {
585 		goto error_src_dst;
586 	}
587 
588 	comp_op->m_src = src_mbufs[0];
589 	comp_op->src.offset = 0;
590 	comp_op->src.length = total_length;
591 
592 	/* Set up dst mbufs; for the current usage of this code there is only one dst vector. */
593 	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
594 				  dst_iovs, dst_iovcnt, reduce_cb_arg);
595 	if (rc < 0) {
596 		goto error_src_dst;
597 	}
598 
599 	comp_op->m_dst = dst_mbufs[0];
600 	comp_op->dst.offset = 0;
601 
602 	if (compress == true) {
603 		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
604 	} else {
605 		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
606 	}
607 
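	/* One bdev_io maps to one stateless op (see above), so each op carries a
	 * complete deflate stream and is flushed as such.
	 */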
608 	comp_op->op_type = RTE_COMP_OP_STATELESS;
609 	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
610 
611 	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
612 	assert(rc <= 1);
613 
614 	/* We always expect 1 to be enqueued; if 0 then we need to queue it up ourselves. */
615 	if (rc == 1) {
616 		return 0;
617 	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
618 		/* we free mbufs differently depending on whether they were chained or not */
619 		rte_pktmbuf_free(comp_op->m_src);
620 		rte_pktmbuf_free(comp_op->m_dst);
621 		goto error_enqueue;
622 	} else {
623 		device_error = true;
		/* Save the status and free the fully built mbuf chains via their
		 * heads; comp_op itself is freed below, before the error is reported.
		 */
		op_status = comp_op->status;
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
624 		goto error_enqueue;
625 	}
626 
627 	/* Error cleanup paths. */
628 error_src_dst:
629 	for (i = 0; i < dst_mbuf_total; i++) {
630 		rte_pktmbuf_free(dst_mbufs[i]);
631 	}
632 error_get_dst:
633 	for (i = 0; i < src_mbuf_total; i++) {
634 		rte_pktmbuf_free(src_mbufs[i]);
635 	}
636 error_get_src:
637 error_enqueue:
638 	rte_comp_op_free(comp_op);
639 error_get_op:
640 
641 	if (device_error == true) {
642 		/* There was an error sending the op to the device, most
643 		 * likely with the parameters.
644 		 */
645 		SPDK_ERRLOG("Compression API returned 0x%x\n", op_status);
646 		return -EINVAL;
647 	}
648 
649 	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
650 	if (op_to_queue == NULL) {
651 		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
652 		return -ENOMEM;
653 	}
654 	op_to_queue->backing_dev = backing_dev;
655 	op_to_queue->src_iovs = src_iovs;
656 	op_to_queue->src_iovcnt = src_iovcnt;
657 	op_to_queue->dst_iovs = dst_iovs;
658 	op_to_queue->dst_iovcnt = dst_iovcnt;
659 	op_to_queue->compress = compress;
660 	op_to_queue->cb_arg = cb_arg;
661 	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
662 			  op_to_queue,
663 			  link);
664 	return 0;
665 }
666 
667 /* Poller for the DPDK compression driver. */
668 static int
669 comp_dev_poller(void *args)
670 {
671 	struct vbdev_compress *comp_bdev = args;
672 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
673 	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
674 	uint16_t num_deq;
675 	struct spdk_reduce_vol_cb_args *reduce_args;
676 	struct vbdev_comp_op *op_to_resubmit;
677 	int rc, i;
678 
679 	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
680 						NUM_MAX_INFLIGHT_OPS);
681 	for (i = 0; i < num_deq; i++) {
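		/* The reduce callback args were stashed in the mbuf dynamic field at
		 * submission time; recover them from the head of the src chain.
		 */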
682 		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
683 				uint64_t *);
684 		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
685 
686 			/* tell reduce this is done and what the bytecount was */
687 			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
688 		} else {
689 			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
690 				       deq_ops[i]->status);
691 
692 			/* Reduce will simply store uncompressed on neg errno value. */
693 			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
694 		}
695 
696 		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
697 		 * call takes care of freeing all of the mbufs in the chain back to their
698 		 * original pool.
699 		 */
700 		rte_pktmbuf_free(deq_ops[i]->m_src);
701 		rte_pktmbuf_free(deq_ops[i]->m_dst);
702 
703 		/* There is no bulk free for comp ops so we have to free them one at a
704 		 * time here; however, it would be rare to ever have more than 1 at a
705 		 * time anyway.
706 		 */
707 		rte_comp_op_free(deq_ops[i]);
708 
709 		/* Check if there are any pending comp ops to process, only pull one
710 		 * at a time off as _compress_operation() may re-queue the op.
711 		 */
712 		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
713 			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
714 			rc = _compress_operation(op_to_resubmit->backing_dev,
715 						 op_to_resubmit->src_iovs,
716 						 op_to_resubmit->src_iovcnt,
717 						 op_to_resubmit->dst_iovs,
718 						 op_to_resubmit->dst_iovcnt,
719 						 op_to_resubmit->compress,
720 						 op_to_resubmit->cb_arg);
721 			if (rc == 0) {
722 				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
723 				free(op_to_resubmit);
724 			}
725 		}
726 	}
727 	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
728 }
729 
730 /* Entry point for reduce lib to issue a compress operation. */
731 static void
732 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
733 		      struct iovec *src_iovs, int src_iovcnt,
734 		      struct iovec *dst_iovs, int dst_iovcnt,
735 		      struct spdk_reduce_vol_cb_args *cb_arg)
736 {
737 	int rc;
738 
739 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
740 	if (rc) {
741 		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
742 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
743 	}
744 }
745 
746 /* Entry point for reduce lib to issue a decompress operation. */
747 static void
748 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
749 			struct iovec *src_iovs, int src_iovcnt,
750 			struct iovec *dst_iovs, int dst_iovcnt,
751 			struct spdk_reduce_vol_cb_args *cb_arg)
752 {
753 	int rc;
754 
755 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
756 	if (rc) {
757 		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
758 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
759 	}
760 }
761 
762 /* Callback for getting a buf from the bdev pool in the event that the caller passed
763  * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
764  * beneath us before we're done with it.
765  */
766 static void
767 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
768 {
769 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
770 					   comp_bdev);
771 
772 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
773 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
774 			      reduce_rw_blocks_cb, bdev_io);
775 }
776 
777 /* scheduled for completion on IO thread */
778 static void
779 _complete_other_io(void *arg)
780 {
781 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
782 	if (io_ctx->status == 0) {
783 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
784 	} else {
785 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
786 	}
787 }
788 
789 /* scheduled for submission on reduce thread */
790 static void
791 _comp_bdev_io_submit(void *arg)
792 {
793 	struct spdk_bdev_io *bdev_io = arg;
794 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
795 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
796 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
797 					   comp_bdev);
798 	struct spdk_thread *orig_thread;
799 	int rc = 0;
800 
801 	switch (bdev_io->type) {
802 	case SPDK_BDEV_IO_TYPE_READ:
803 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
804 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
805 		return;
806 	case SPDK_BDEV_IO_TYPE_WRITE:
807 		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
808 				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
809 				       reduce_rw_blocks_cb, bdev_io);
810 		return;
811 	/* TODO in future patch in the series */
812 	case SPDK_BDEV_IO_TYPE_RESET:
813 		break;
814 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
815 	case SPDK_BDEV_IO_TYPE_UNMAP:
816 	case SPDK_BDEV_IO_TYPE_FLUSH:
817 	default:
818 		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
819 		rc = -EINVAL;
820 	}
821 
822 	if (rc) {
823 		if (rc == -ENOMEM) {
824 			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
825 			io_ctx->ch = ch;
826 			vbdev_compress_queue_io(bdev_io);
827 			return;
828 		} else {
829 			SPDK_ERRLOG("error on bdev_io submission!\n");
830 			io_ctx->status = rc;
831 		}
832 	}
833 
834 	/* Complete this on the orig IO thread. */
835 	orig_thread = spdk_io_channel_get_thread(ch);
836 	if (orig_thread != spdk_get_thread()) {
837 		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
838 	} else {
839 		_complete_other_io(io_ctx);
840 	}
841 }
842 
843 /* Called when someone above submits IO to this vbdev. */
844 static void
845 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
846 {
847 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
848 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
849 					   comp_bdev);
850 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
851 
852 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
853 	io_ctx->comp_bdev = comp_bdev;
854 	io_ctx->comp_ch = comp_ch;
855 	io_ctx->orig_io = bdev_io;
856 
857 	/* Send this request to the reduce_thread if that's not what we're on. */
858 	if (spdk_get_thread() != comp_bdev->reduce_thread) {
859 		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
860 	} else {
861 		_comp_bdev_io_submit(bdev_io);
862 	}
863 }
864 
865 static bool
866 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
867 {
868 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
869 
870 	switch (io_type) {
871 	case SPDK_BDEV_IO_TYPE_READ:
872 	case SPDK_BDEV_IO_TYPE_WRITE:
873 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
874 	case SPDK_BDEV_IO_TYPE_UNMAP:
875 	case SPDK_BDEV_IO_TYPE_RESET:
876 	case SPDK_BDEV_IO_TYPE_FLUSH:
877 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
878 	default:
879 		return false;
880 	}
881 }
882 
883 /* Resubmission function used by the bdev layer when a queued IO is ready to be
884  * submitted.
885  */
886 static void
887 vbdev_compress_resubmit_io(void *arg)
888 {
889 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
890 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
891 
892 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
893 }
894 
895 /* Used to queue an IO in the event of resource issues. */
896 static void
897 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
898 {
899 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
900 	int rc;
901 
902 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
903 	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
904 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
905 
906 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
907 	if (rc) {
908 		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
909 		assert(false);
910 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
911 	}
912 }
913 
914 /* Callback for unregistering the IO device. */
915 static void
916 _device_unregister_cb(void *io_device)
917 {
918 	struct vbdev_compress *comp_bdev = io_device;
919 
920 	/* Done with this comp_bdev. */
921 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
922 	free(comp_bdev->comp_bdev.name);
923 	free(comp_bdev);
924 }
925 
926 static void
927 _vbdev_compress_destruct_cb(void *ctx)
928 {
929 	struct vbdev_compress *comp_bdev = ctx;
930 
931 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
932 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
933 	/* Close the underlying bdev on its same opened thread. */
934 	spdk_bdev_close(comp_bdev->base_desc);
935 	comp_bdev->vol = NULL;
936 	if (comp_bdev->orphaned == false) {
937 		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
938 	} else {
939 		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
940 		_device_unregister_cb(comp_bdev);
941 	}
942 }
943 
944 static void
945 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
946 {
947 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
948 
949 	if (reduce_errno) {
950 		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
951 	} else {
952 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
953 			spdk_thread_send_msg(comp_bdev->thread,
954 					     _vbdev_compress_destruct_cb, comp_bdev);
955 		} else {
956 			_vbdev_compress_destruct_cb(comp_bdev);
957 		}
958 	}
959 }
960 
961 static void
962 _reduce_destroy_cb(void *ctx, int reduce_errno)
963 {
964 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
965 
966 	if (reduce_errno) {
967 		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
968 	}
969 
970 	comp_bdev->vol = NULL;
971 	spdk_put_io_channel(comp_bdev->base_ch);
972 	if (comp_bdev->orphaned == false) {
973 		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
974 				     comp_bdev->delete_ctx);
975 	} else {
976 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
977 	}
978 
979 }
980 
981 static void
982 _delete_vol_unload_cb(void *ctx)
983 {
984 	struct vbdev_compress *comp_bdev = ctx;
985 
986 	/* FIXME: Assert if these conditions are not satisfied for now. */
987 	assert(!comp_bdev->reduce_thread ||
988 	       comp_bdev->reduce_thread == spdk_get_thread());
989 
990 	/* reducelib needs a channel to comm with the backing device */
991 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
992 
993 	/* Clean the device before we free our resources. */
994 	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
995 }
996 
997 /* Called by reduceLib after performing unload vol actions */
998 static void
999 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
1000 {
1001 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1002 
1003 	if (reduce_errno) {
1004 		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
1005 		/* FIXME: callback should be executed. */
1006 		return;
1007 	}
1008 
1009 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1010 	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
1011 		spdk_thread_send_msg(comp_bdev->reduce_thread,
1012 				     _delete_vol_unload_cb, comp_bdev);
1013 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
1014 	} else {
1015 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
1016 
1017 		_delete_vol_unload_cb(comp_bdev);
1018 	}
1019 }
1020 
1021 const char *
1022 compress_get_name(const struct vbdev_compress *comp_bdev)
1023 {
1024 	return comp_bdev->comp_bdev.name;
1025 }
1026 
1027 struct vbdev_compress *
1028 compress_bdev_first(void)
1029 {
1030 	struct vbdev_compress *comp_bdev;
1031 
1032 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
1033 
1034 	return comp_bdev;
1035 }
1036 
1037 struct vbdev_compress *
1038 compress_bdev_next(struct vbdev_compress *prev)
1039 {
1040 	struct vbdev_compress *comp_bdev;
1041 
1042 	comp_bdev = TAILQ_NEXT(prev, link);
1043 
1044 	return comp_bdev;
1045 }
1046 
1047 bool
1048 compress_has_orphan(const char *name)
1049 {
1050 	struct vbdev_compress *comp_bdev;
1051 
1052 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1053 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1054 			return true;
1055 		}
1056 	}
1057 	return false;
1058 }
1059 
1060 /* Called after we've unregistered following a hot remove callback.
1061  * Our finish entry point will be called next.
1062  */
1063 static int
1064 vbdev_compress_destruct(void *ctx)
1065 {
1066 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1067 
1068 	if (comp_bdev->vol != NULL) {
1069 		/* Tell reducelib that we're done with this volume. */
1070 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
1071 	} else {
1072 		vbdev_compress_destruct_cb(comp_bdev, 0);
1073 	}
1074 
1075 	return 0;
1076 }
1077 
1078 /* We supplied this as an entry point for upper layers who want to communicate to this
1079  * bdev.  This is how they get a channel.
1080  */
1081 static struct spdk_io_channel *
1082 vbdev_compress_get_io_channel(void *ctx)
1083 {
1084 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1085 
1086 	/* The IO channel code will allocate a channel for us which consists of
1087 	 * the SPDK channel structure plus the size of our comp_io_channel struct
1088 	 * that we passed in when we registered our IO device. It will then call
1089 	 * our channel create callback to populate any elements that we need to
1090 	 * update.
1091 	 */
1092 	return spdk_get_io_channel(comp_bdev);
1093 }
1094 
1095 /* This is the output for bdev_get_bdevs() for this vbdev */
1096 static int
1097 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1098 {
1099 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1100 
1101 	spdk_json_write_name(w, "compress");
1102 	spdk_json_write_object_begin(w);
1103 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1104 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1105 	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1106 	spdk_json_write_object_end(w);
1107 
1108 	return 0;
1109 }
1110 
1111 /* This is used to generate JSON that can configure this module to its current state. */
1112 static int
1113 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
1114 {
1115 	struct vbdev_compress *comp_bdev;
1116 
1117 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1118 		spdk_json_write_object_begin(w);
1119 		spdk_json_write_named_string(w, "method", "bdev_compress_create");
1120 		spdk_json_write_named_object_begin(w, "params");
1121 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1122 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1123 		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1124 		spdk_json_write_object_end(w);
1125 		spdk_json_write_object_end(w);
1126 	}
1127 	return 0;
1128 }
1129 
1130 static void
1131 _vbdev_reduce_init_cb(void *ctx)
1132 {
1133 	struct vbdev_compress *meta_ctx = ctx;
1134 	int rc;
1135 
1136 	assert(meta_ctx->base_desc != NULL);
1137 
1138 	/* We're done with metadata operations */
1139 	spdk_put_io_channel(meta_ctx->base_ch);
1140 
1141 	if (meta_ctx->vol) {
1142 		rc = vbdev_compress_claim(meta_ctx);
1143 		if (rc == 0) {
1144 			return;
1145 		}
1146 	}
1147 
1148 	/* Close the underlying bdev on its same opened thread. */
1149 	spdk_bdev_close(meta_ctx->base_desc);
1150 	free(meta_ctx);
1151 }
1152 
1153 /* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
1154  * used for initial metadata operations to claim where it will be further filled out
1155  * and added to the global list.
1156  */
1157 static void
1158 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1159 {
1160 	struct vbdev_compress *meta_ctx = cb_arg;
1161 
1162 	if (reduce_errno == 0) {
1163 		meta_ctx->vol = vol;
1164 	} else {
1165 		SPDK_ERRLOG("for vol %s, error %d\n",
1166 			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1167 	}
1168 
1169 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1170 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
1171 	} else {
1172 		_vbdev_reduce_init_cb(meta_ctx);
1173 	}
1174 }
1175 
1176 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
1177  * call the callback provided by reduceLib when it called the read/write/unmap function and
1178  * free the bdev_io.
1179  */
1180 static void
1181 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1182 {
1183 	struct spdk_reduce_vol_cb_args *cb_args = arg;
1184 	int reduce_errno;
1185 
1186 	if (success) {
1187 		reduce_errno = 0;
1188 	} else {
1189 		reduce_errno = -EIO;
1190 	}
1191 	spdk_bdev_free_io(bdev_io);
1192 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
1193 }
1194 
1195 /* This is the function provided to the reduceLib for sending reads directly to
1196  * the backing device.
1197  */
1198 static void
1199 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1200 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1201 {
1202 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1203 					   backing_dev);
1204 	int rc;
1205 
1206 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1207 				    iov, iovcnt, lba, lba_count,
1208 				    comp_reduce_io_cb,
1209 				    args);
1210 	if (rc) {
1211 		if (rc == -ENOMEM) {
1212 			SPDK_ERRLOG("No memory, start to queue io.\n");
1213 			/* TODO: there's no bdev_io to queue */
1214 		} else {
1215 			SPDK_ERRLOG("error submitting readv request\n");
1216 		}
1217 		args->cb_fn(args->cb_arg, rc);
1218 	}
1219 }
1220 
1221 /* This is the function provided to the reduceLib for sending writes directly to
1222  * the backing device.
1223  */
1224 static void
1225 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1226 		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1227 {
1228 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1229 					   backing_dev);
1230 	int rc;
1231 
1232 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1233 				     iov, iovcnt, lba, lba_count,
1234 				     comp_reduce_io_cb,
1235 				     args);
1236 	if (rc) {
1237 		if (rc == -ENOMEM) {
1238 			SPDK_ERRLOG("No memory, start to queue io.\n");
1239 			/* TODO: there's no bdev_io to queue */
1240 		} else {
1241 			SPDK_ERRLOG("error submitting writev request\n");
1242 		}
1243 		args->cb_fn(args->cb_arg, rc);
1244 	}
1245 }
1246 
1247 /* This is the function provided to the reduceLib for sending unmaps directly to
1248  * the backing device.
1249  */
1250 static void
1251 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
1252 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1253 {
1254 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1255 					   backing_dev);
1256 	int rc;
1257 
1258 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1259 				    lba, lba_count,
1260 				    comp_reduce_io_cb,
1261 				    args);
1262 
1263 	if (rc) {
1264 		if (rc == -ENOMEM) {
1265 			SPDK_ERRLOG("No memory, start to queue io.\n");
1266 			/* TODO: there's no bdev_io to queue */
1267 		} else {
1268 			SPDK_ERRLOG("error submitting unmap request\n");
1269 		}
1270 		args->cb_fn(args->cb_arg, rc);
1271 	}
1272 }
1273 
1274 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
1275 static void
1276 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
1277 {
1278 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1279 
1280 	if (reduce_errno) {
1281 		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
1282 	}
1283 
1284 	comp_bdev->vol = NULL;
1285 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
1286 }
1287 
1288 static void
1289 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
1290 {
1291 	struct vbdev_compress *comp_bdev, *tmp;
1292 
1293 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
1294 		if (bdev_find == comp_bdev->base_bdev) {
1295 			/* Tell reduceLib that we're done with this volume. */
1296 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
1297 		}
1298 	}
1299 }
1300 
1301 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
1302 static void
1303 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1304 				  void *event_ctx)
1305 {
1306 	switch (type) {
1307 	case SPDK_BDEV_EVENT_REMOVE:
1308 		vbdev_compress_base_bdev_hotremove_cb(bdev);
1309 		break;
1310 	default:
1311 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1312 		break;
1313 	}
1314 }
1315 
1316 /* TODO: determine which params we want user configurable; hard-coded (HC) for now:
1317  * params.vol_size
1318  * params.chunk_size
1319  * compression PMD, algorithm, window size, comp level, etc.
1320  * DEV_MD_PATH
1321  */
1322 
1323 /* Common function for init and load to allocate and populate the minimal
1324  * information for reducelib to init or load.
1325  */
1326 struct vbdev_compress *
1327 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
1328 {
1329 	struct vbdev_compress *meta_ctx;
1330 	struct spdk_bdev *bdev;
1331 
1332 	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
1333 	if (meta_ctx == NULL) {
1334 		SPDK_ERRLOG("failed to alloc init contexts\n");
1335 		return NULL;
1336 	}
1337 
1338 	meta_ctx->drv_name = "None";
1339 	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
1340 	meta_ctx->backing_dev.readv = _comp_reduce_readv;
1341 	meta_ctx->backing_dev.writev = _comp_reduce_writev;
1342 	meta_ctx->backing_dev.compress = _comp_reduce_compress;
1343 	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;
1344 
1345 	meta_ctx->base_desc = bdev_desc;
1346 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
1347 	meta_ctx->base_bdev = bdev;
1348 
1349 	meta_ctx->backing_dev.blocklen = bdev->blocklen;
1350 	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;
1351 
1352 	meta_ctx->params.chunk_size = CHUNK_SIZE;
1353 	if (lb_size == 0) {
1354 		meta_ctx->params.logical_block_size = bdev->blocklen;
1355 	} else {
1356 		meta_ctx->params.logical_block_size = lb_size;
1357 	}
1358 
1359 	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
1360 	return meta_ctx;
1361 }
1362 
1363 static bool
1364 _set_pmd(struct vbdev_compress *comp_dev)
1365 {
1366 	if (g_opts == COMPRESS_PMD_AUTO) {
1367 		if (g_qat_available) {
1368 			comp_dev->drv_name = QAT_PMD;
1369 		} else if (g_mlx5_pci_available) {
1370 			comp_dev->drv_name = MLX5_PMD;
1371 		} else {
1372 			comp_dev->drv_name = ISAL_PMD;
1373 		}
1374 	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
1375 		comp_dev->drv_name = QAT_PMD;
1376 	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
1377 		comp_dev->drv_name = ISAL_PMD;
1378 	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
1379 		comp_dev->drv_name = MLX5_PMD;
1380 	} else {
1381 		SPDK_ERRLOG("Requested PMD is not available.\n");
1382 		return false;
1383 	}
1384 	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
1385 	return true;
1386 }
1387 
1388 /* Call reducelib to initialize a new volume */
1389 static int
1390 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1391 {
1392 	struct spdk_bdev_desc *bdev_desc = NULL;
1393 	struct vbdev_compress *meta_ctx;
1394 	int rc;
1395 
1396 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
1397 				NULL, &bdev_desc);
1398 	if (rc) {
1399 		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
1400 		return rc;
1401 	}
1402 
1403 	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
1404 	if (meta_ctx == NULL) {
1405 		spdk_bdev_close(bdev_desc);
1406 		return -EINVAL;
1407 	}
1408 
1409 	if (_set_pmd(meta_ctx) == false) {
1410 		SPDK_ERRLOG("could not find required pmd\n");
1411 		free(meta_ctx);
1412 		spdk_bdev_close(bdev_desc);
1413 		return -EINVAL;
1414 	}
1415 
1416 	/* Save the thread where the base device is opened */
1417 	meta_ctx->thread = spdk_get_thread();
1418 
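	/* reducelib issues metadata I/O to the base bdev during init, so get it a
	 * channel on this thread; it is released in _vbdev_reduce_init_cb().
	 */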
1419 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1420 
1421 	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
1422 			     pm_path,
1423 			     vbdev_reduce_init_cb,
1424 			     meta_ctx);
1425 	return 0;
1426 }
1427 
1428 /* We provide this callback for the SPDK channel code to create a channel using
1429  * the channel struct we provided in our module get_io_channel() entry point. Here
1430  * we get and save off an underlying base channel of the device below us so that
1431  * we can communicate with the base bdev on a per channel basis.  If we needed
1432  * our own poller for this vbdev, we'd register it here.
1433  */
1434 static int
1435 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1436 {
1437 	struct vbdev_compress *comp_bdev = io_device;
1438 	struct comp_device_qp *device_qp;
1439 
1440 	/* Now set the reduce channel if it's not already set. */
1441 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1442 	if (comp_bdev->ch_count == 0) {
1443 		/* We use this queue to track outstanding IO in our layer. */
1444 		TAILQ_INIT(&comp_bdev->pending_comp_ios);
1445 
1446 		/* We use this to queue up compression operations as needed. */
1447 		TAILQ_INIT(&comp_bdev->queued_comp_ops);
1448 
1449 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1450 		comp_bdev->reduce_thread = spdk_get_thread();
1451 		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
1452 		/* Now assign a q pair */
1453 		pthread_mutex_lock(&g_comp_device_qp_lock);
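		/* Prefer a qp already bound to this thread (qps may be shared by
		 * comp_bdevs on the same thread); otherwise claim the first
		 * unassigned qp for the matching driver.
		 */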
1454 		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
1455 			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
1456 				if (device_qp->thread == spdk_get_thread()) {
1457 					comp_bdev->device_qp = device_qp;
1458 					break;
1459 				}
1460 				if (device_qp->thread == NULL) {
1461 					comp_bdev->device_qp = device_qp;
1462 					device_qp->thread = spdk_get_thread();
1463 					break;
1464 				}
1465 			}
1466 		}
1467 		pthread_mutex_unlock(&g_comp_device_qp_lock);
1468 	}
1469 	comp_bdev->ch_count++;
1470 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1471 
1472 	if (comp_bdev->device_qp != NULL) {
1473 		return 0;
1474 	} else {
1475 		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
1476 		assert(false);
1477 		return -ENOMEM;
1478 	}
1479 }
1480 
1481 static void
1482 _channel_cleanup(struct vbdev_compress *comp_bdev)
1483 {
1484 	/* Note: comp_bdevs can share a device_qp if they are
1485 	 * on the same thread so we leave the device_qp element
1486 	 * alone for this comp_bdev and just clear the reduce thread.
1487 	 */
1488 	spdk_put_io_channel(comp_bdev->base_ch);
1489 	comp_bdev->reduce_thread = NULL;
1490 	spdk_poller_unregister(&comp_bdev->poller);
1491 }
1492 
1493 /* Used to reroute destroy_ch to the correct thread */
1494 static void
1495 _comp_bdev_ch_destroy_cb(void *arg)
1496 {
1497 	struct vbdev_compress *comp_bdev = arg;
1498 
1499 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1500 	_channel_cleanup(comp_bdev);
1501 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1502 }
1503 
1504 /* We provide this callback for the SPDK channel code to destroy a channel
1505  * created with our create callback. We just need to undo anything we did
1506  * when we created. If this bdev used its own poller, we'd unregister it here.
1507  */
1508 static void
1509 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1510 {
1511 	struct vbdev_compress *comp_bdev = io_device;
1512 
1513 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1514 	comp_bdev->ch_count--;
1515 	if (comp_bdev->ch_count == 0) {
1516 		/* Send this request to the thread where the channel was created. */
1517 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1518 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1519 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1520 		} else {
1521 			_channel_cleanup(comp_bdev);
1522 		}
1523 	}
1524 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1525 }
1526 
1527 /* RPC entry point for compression vbdev creation. */
1528 int
1529 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1530 {
1531 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
1532 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
1533 		return -EINVAL;
1534 	}
1535 
1536 	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
1537 }
1538 
1539 /* On init, just init the compress drivers. All metadata is stored on disk. */
1540 static int
1541 vbdev_compress_init(void)
1542 {
1543 	if (vbdev_init_compress_drivers()) {
1544 		SPDK_ERRLOG("Error setting up compression devices\n");
1545 		return -EINVAL;
1546 	}
1547 
1548 	return 0;
1549 }
1550 
1551 /* Called when the entire module is being torn down. */
1552 static void
1553 vbdev_compress_finish(void)
1554 {
1555 	struct comp_device_qp *dev_qp;
1556 	/* TODO: unload vol in a future patch */
1557 
1558 	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
1559 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
1560 		free(dev_qp);
1561 	}
1562 	pthread_mutex_destroy(&g_comp_device_qp_lock);
1563 
1564 	rte_mempool_free(g_comp_op_mp);
1565 	rte_mempool_free(g_mbuf_mp);
1566 }
1567 
1568 /* During init we'll be asked how much memory we'd like passed to us
1569  * in bdev_io structures as context. Here's where we specify how
1570  * much context we want per IO.
1571  */
1572 static int
1573 vbdev_compress_get_ctx_size(void)
1574 {
1575 	return sizeof(struct comp_bdev_io);
1576 }
1577 
1578 /* When we register our bdev this is how we specify our entry points. */
1579 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1580 	.destruct		= vbdev_compress_destruct,
1581 	.submit_request		= vbdev_compress_submit_request,
1582 	.io_type_supported	= vbdev_compress_io_type_supported,
1583 	.get_io_channel		= vbdev_compress_get_io_channel,
1584 	.dump_info_json		= vbdev_compress_dump_info_json,
1585 	.write_config_json	= NULL,
1586 };
1587 
1588 static struct spdk_bdev_module compress_if = {
1589 	.name = "compress",
1590 	.module_init = vbdev_compress_init,
1591 	.get_ctx_size = vbdev_compress_get_ctx_size,
1592 	.examine_disk = vbdev_compress_examine,
1593 	.module_fini = vbdev_compress_finish,
1594 	.config_json = vbdev_compress_config_json
1595 };
1596 
1597 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1598 
1599 static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
1600 {
1601 	struct spdk_bdev_alias *aliases;
1602 
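	/* Base the compress bdev name on the base bdev's first alias when one
	 * exists, otherwise on its unique name.
	 */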
1603 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1604 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1605 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
1606 		if (!comp_bdev->comp_bdev.name) {
1607 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1608 			return -ENOMEM;
1609 		}
1610 	} else {
1611 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1612 		if (!comp_bdev->comp_bdev.name) {
1613 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1614 			return -ENOMEM;
1615 		}
1616 	}
1617 	return 0;
1618 }
1619 
1620 static int
1621 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1622 {
1623 	int rc;
1624 
1625 	if (_set_compbdev_name(comp_bdev)) {
1626 		return -EINVAL;
1627 	}
1628 
1629 	/* Note: some of the fields below will change in the future - for example,
1630 	 * blockcnt specifically will not match (the compressed volume size will
1631 	 * be slightly less than the base bdev size)
1632 	 */
1633 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1634 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1635 
1636 	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
1637 		comp_bdev->comp_bdev.required_alignment =
1638 			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
1639 				 comp_bdev->base_bdev->required_alignment);
1640 		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1641 			       comp_bdev->comp_bdev.required_alignment);
1642 	} else {
1643 		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
1644 	}
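	/* Split IOs on reduce chunk boundaries: e.g. a 16 KiB chunk with 4 KiB
	 * logical blocks gives an optimal_io_boundary of 4 blocks, so the bdev
	 * layer never hands us an IO that straddles two chunks.
	 */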
1645 	comp_bdev->comp_bdev.optimal_io_boundary =
1646 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
1647 
1648 	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
1649 
1650 	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
1651 	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
1652 	assert(comp_bdev->comp_bdev.blockcnt > 0);
1653 
	/* This is the context that is passed to us when the bdev
	 * layer calls in, so we'll save our comp_bdev node here.
	 */
1657 	comp_bdev->comp_bdev.ctxt = comp_bdev;
1658 	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
1659 	comp_bdev->comp_bdev.module = &compress_if;
1660 
1661 	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1662 
1663 	/* Save the thread where the base device is opened */
1664 	comp_bdev->thread = spdk_get_thread();
1665 
1666 	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
1667 				sizeof(struct comp_io_channel),
1668 				comp_bdev->comp_bdev.name);
1669 
1670 	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1671 					 comp_bdev->comp_bdev.module);
1672 	if (rc) {
1673 		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
1674 		goto error_claim;
1675 	}
1676 
1677 	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
1678 	if (rc < 0) {
		SPDK_ERRLOG("could not register compress bdev %s\n", comp_bdev->comp_bdev.name);
1680 		goto error_bdev_register;
1681 	}
1682 
1683 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1684 
1685 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1686 
1687 	return 0;
1688 
1689 	/* Error cleanup paths. */
1690 error_bdev_register:
1691 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1692 error_claim:
1693 	spdk_io_device_unregister(comp_bdev, NULL);
1694 	free(comp_bdev->comp_bdev.name);
1695 	return rc;
1696 }
1697 
1698 static void
1699 _vbdev_compress_delete_done(void *_ctx)
1700 {
1701 	struct vbdev_comp_delete_ctx *ctx = _ctx;
1702 
1703 	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
1704 
1705 	free(ctx);
1706 }
1707 
1708 static void
1709 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1710 {
1711 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1712 
1713 	ctx->cb_rc = bdeverrno;
1714 
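	/* Reduce may complete the unload on a different thread than the one that
	 * issued the delete, so bounce the user callback back to the original
	 * thread (saved in bdev_compress_delete() below).
	 */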
1715 	if (ctx->orig_thread != spdk_get_thread()) {
1716 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1717 	} else {
1718 		_vbdev_compress_delete_done(ctx);
1719 	}
1720 }
1721 
1722 void
1723 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1724 {
1725 	struct vbdev_compress *comp_bdev = NULL;
1726 	struct vbdev_comp_delete_ctx *ctx;
1727 
1728 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1729 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1730 			break;
1731 		}
1732 	}
1733 
1734 	if (comp_bdev == NULL) {
1735 		cb_fn(cb_arg, -ENODEV);
1736 		return;
1737 	}
1738 
1739 	ctx = calloc(1, sizeof(*ctx));
1740 	if (ctx == NULL) {
1741 		SPDK_ERRLOG("Failed to allocate delete context\n");
1742 		cb_fn(cb_arg, -ENOMEM);
1743 		return;
1744 	}
1745 
1746 	/* Save these for after the vol is destroyed. */
1747 	ctx->cb_fn = cb_fn;
1748 	ctx->cb_arg = cb_arg;
1749 	ctx->orig_thread = spdk_get_thread();
1750 
1751 	comp_bdev->delete_ctx = ctx;
1752 
1753 	/* Tell reducelib that we're done with this volume. */
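	/* Orphaned bdevs never had a vol successfully loaded, so there is
	 * nothing to unload; jump straight to the unload callback.
	 */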
1754 	if (comp_bdev->orphaned == false) {
1755 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1756 	} else {
1757 		delete_vol_unload_cb(comp_bdev, 0);
1758 	}
1759 }
1760 
1761 static void
1762 _vbdev_reduce_load_cb(void *ctx)
1763 {
1764 	struct vbdev_compress *meta_ctx = ctx;
1765 	int rc;
1766 
1767 	assert(meta_ctx->base_desc != NULL);
1768 
1769 	/* Done with metadata operations */
1770 	spdk_put_io_channel(meta_ctx->base_ch);
1771 
1772 	if (meta_ctx->reduce_errno == 0) {
1773 		if (_set_pmd(meta_ctx) == false) {
1774 			SPDK_ERRLOG("could not find required pmd\n");
1775 			goto err;
1776 		}
1777 
1778 		rc = vbdev_compress_claim(meta_ctx);
1779 		if (rc != 0) {
1780 			goto err;
1781 		}
1782 	} else if (meta_ctx->reduce_errno == -ENOENT) {
1783 		if (_set_compbdev_name(meta_ctx)) {
1784 			goto err;
1785 		}
1786 
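		/* -ENOENT likely means the persistent-memory metadata file is
		 * missing. Still claim the base bdev, as an orphan, so its data
		 * stays protected until the metadata is recovered or the user
		 * deletes the compress bdev.
		 */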
1787 		/* Save the thread where the base device is opened */
1788 		meta_ctx->thread = spdk_get_thread();
1789 
1790 		meta_ctx->comp_bdev.module = &compress_if;
1791 		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
1792 		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
1793 						 meta_ctx->comp_bdev.module);
1794 		if (rc) {
1795 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1796 			free(meta_ctx->comp_bdev.name);
1797 			goto err;
1798 		}
1799 
1800 		meta_ctx->orphaned = true;
1801 		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
1802 	} else {
1803 		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("reduce load failed for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
1806 		}
1807 		goto err;
1808 	}
1809 
1810 	spdk_bdev_module_examine_done(&compress_if);
1811 	return;
1812 
1813 err:
1814 	/* Close the underlying bdev on its same opened thread. */
1815 	spdk_bdev_close(meta_ctx->base_desc);
1816 	free(meta_ctx);
1817 	spdk_bdev_module_examine_done(&compress_if);
1818 }
1819 
/* Callback from reduce when the load is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to the claim path, where it will be further
 * filled out and added to the global list.
 */
1824 static void
1825 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1826 {
1827 	struct vbdev_compress *meta_ctx = cb_arg;
1828 
1829 	if (reduce_errno == 0) {
1830 		/* Update information following volume load. */
1831 		meta_ctx->vol = vol;
1832 		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
1833 		       sizeof(struct spdk_reduce_vol_params));
1834 	}
1835 
1836 	meta_ctx->reduce_errno = reduce_errno;
1837 
1838 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1839 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
1840 	} else {
1841 		_vbdev_reduce_load_cb(meta_ctx);
1842 	}
1844 }
1845 
1846 /* Examine_disk entry point: will do a metadata load to see if this is ours,
1847  * and if so will go ahead and claim it.
1848  */
1849 static void
1850 vbdev_compress_examine(struct spdk_bdev *bdev)
1851 {
1852 	struct spdk_bdev_desc *bdev_desc = NULL;
1853 	struct vbdev_compress *meta_ctx;
1854 	int rc;
1855 
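	/* Skip bdevs this module created itself; compress bdevs are not stacked. */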
1856 	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
1857 		spdk_bdev_module_examine_done(&compress_if);
1858 		return;
1859 	}
1860 
1861 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
1862 				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
1863 	if (rc) {
1864 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
1865 		spdk_bdev_module_examine_done(&compress_if);
1866 		return;
1867 	}
1868 
1869 	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
1870 	if (meta_ctx == NULL) {
1871 		spdk_bdev_close(bdev_desc);
1872 		spdk_bdev_module_examine_done(&compress_if);
1873 		return;
1874 	}
1875 
1876 	/* Save the thread where the base device is opened */
1877 	meta_ctx->thread = spdk_get_thread();
1878 
1879 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1880 	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
1881 }
1882 
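/* Record the user's preferred PMD; it is consulted by _set_pmd() when a
 * volume is created or loaded (see _vbdev_reduce_load_cb() above).
 */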
1883 int
1884 compress_set_pmd(enum compress_pmd *opts)
1885 {
1886 	g_opts = *opts;
1887 
1888 	return 0;
1889 }
1890 
1891 SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1892