/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_reduce",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
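/* Byte offset of the dynamic field within each mbuf, as returned by
 * rte_mbuf_dynfield_register() at init time; the field is accessed below
 * via RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *).
 */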
static int g_mbuf_offset;

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 *  span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has an undocumented limit of 99:
 * beyond that, the internal ring name it generates exceeds a length limit in the generic
 * ring code and qp initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev	*backing_dev;
	struct iovec			*src_iovs;
	int				src_iovcnt;
	struct iovec			*dst_iovs;
	int				dst_iovcnt;
	bool				compress;
	void				*cb_arg;
	TAILQ_ENTRY(vbdev_comp_op)	link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	int				reduce_errno;
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
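/* Sharing one private xform per PMD for all operations is only valid because
 * create_compress_dev() below requires RTE_COMP_FF_SHAREABLE_PRIV_XFORM from
 * every device it initializes.
 */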

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit so choose number of lcores. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
	/* rte_mempool_free() is a no-op on NULL, so this is safe even when the
	 * comp op pool was never created.
	 */
	rte_mempool_free(g_comp_op_mp);
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct spdk_thread *orig_thread;

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
	} else {
		_reduce_rw_blocks_cb(io_ctx);
	}
}

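/* Build an mbuf chain over the supplied iovecs: each buffer is attached as
 * external storage (zero copy) and tagged with reduce_cb_arg via the mbuf
 * dynamic field.  When spdk_vtophys() reports that a buffer stops being
 * physically contiguous (e.g. at a 2MB hugepage boundary), an extra mbuf is
 * allocated for the remainder.  Every mbuf after the first is chained to
 * mbufs[0].
 */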
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
{
	uint64_t updated_length, remainder, phys_addr;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovs[iov_index].iov_len;
		}
		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
		updated_length = iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_base, &updated_length);

		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
					  current_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
		remainder = iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
				return -1;
			}
			(*mbuf_total)++;
			mbuf_index++;
			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
			current_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	return 0;
}

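/* Build and submit one stateless compressdev op covering src_iovs/dst_iovs.
 * There is a 1:1 mapping between a reduce request and a comp_op; if the op
 * cannot be submitted it is saved on queued_comp_ops and resubmitted from
 * the completion poller.
 */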
static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t total_length = 0;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;
	uint8_t op_status = 0;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("failed to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
	 * and associate with our single comp_op.
	 */

	rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* Setup dst mbufs; with the current reduce usage there is only one dst vector. */
	rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, reduce_cb_arg);
	if (rc < 0) {
		goto error_src_dst;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 to have been enqueued; if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* Save the status before the op is freed in the error path;
		 * it is logged after the cleanup below.
		 */
		op_status = comp_op->status;
		/* The mbufs are fully chained at this point, so free them via
		 * their chain heads, as in the NOT_PROCESSED case above.
		 */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		device_error = true;
		goto error_enqueue;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", op_status);
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here however it would be rare that we'd ever have more than 1 at a time
		 * anyways.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct spdk_thread *orig_thread;
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, queueing io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("error on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	orig_thread = spdk_io_channel_get_thread(ch);
	if (orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_get_thread() != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(comp_bdev->base_desc);
	comp_bdev->vol = NULL;
	if (comp_bdev->orphaned == false) {
		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
	} else {
		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
		_device_unregister_cb(comp_bdev);
	}
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	} else {
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread,
					     _vbdev_compress_destruct_cb, comp_bdev);
		} else {
			_vbdev_compress_destruct_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->vol) {
		rc = vbdev_compress_claim(meta_ctx);
		if (rc == 0) {
			return;
		}
	}

	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
	} else {
		SPDK_ERRLOG("for vol %s, error %d\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
	}

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
	} else {
		_vbdev_reduce_init_cb(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

static void
vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_compress *comp_bdev, *tmp;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
static void
vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
				  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_compress_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* TODO: determine which parms we want user configurable, HC for now
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
{
	struct vbdev_compress *meta_ctx;
	struct spdk_bdev *bdev;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->base_desc = bdev_desc;
	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
	meta_ctx->base_bdev = bdev;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	if (lb_size == 0) {
		meta_ctx->params.logical_block_size = bdev->blocklen;
	} else {
		meta_ctx->params.logical_block_size = lb_size;
	}

	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
				NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
		return rc;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		spdk_bdev_close(bdev_desc);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
{
	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
		return -EINVAL;
	}

	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
}
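/* Example invocation (illustrative only; the bdev name and flag spellings here
 * are assumptions, see scripts/rpc.py for the authoritative options):
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /path/to/pmem -l 512
 */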

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
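	/* Advertise an optimal I/O boundary of one compressed chunk (expressed
	 * in logical blocks) and ask the bdev layer to split on it, so that no
	 * single I/O submitted to this vbdev spans a reduce chunk boundary.
	 */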
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("could not register compress bdev %s\n", comp_bdev->comp_bdev.name);
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

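	/* Complete the user's callback on the thread that originally issued the
	 * delete; we may be called back here from a different thread.
	 */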
	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
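		/* -ENOENT means the volume's persistent metadata could not be
		 * found; still claim the base bdev so its data stays protected,
		 * but track the vbdev as orphaned.
		 */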
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
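		/* -EILSEQ just means the base bdev does not carry a reduce
		 * volume signature, which is the common case during examine,
		 * so don't treat it as an error worth logging.
		 */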
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from reduce for when the load is complete. We'll pass the vbdev_compress
 * struct used for the initial metadata operations on to the claim path, where it will
 * be further filled out and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

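	/* The reduce library may complete the load on a different thread than
	 * the one where the base bdev was opened; if so, bounce back to that
	 * thread before doing any further bdev operations.
	 */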
	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}

/* Examine_disk entry point: will do a metadata load to see if this is ours,
 * and if so will go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

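	/* Skip bdevs this module created itself, otherwise examine would
	 * recurse onto our own compress vbdevs.
	 */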
	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

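	/* Grab a channel on this thread so the load can issue metadata reads to
	 * the base bdev; it is released in _vbdev_reduce_load_cb() once the
	 * metadata operations are done.
	 */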
	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

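/* Select the preferred compression PMD (per enum compress_pmd, e.g. auto vs.
 * QAT-only vs. ISA-L-only); typically invoked from the RPC layer before any
 * compress vbdev is created.
 */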
int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)