/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "vbdev_compress.h"

#include "spdk/reduce.h"
#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/conf.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/bdev_module.h"

#include "spdk_internal/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>

#define NUM_MAX_XFORMS 2
#define NUM_MAX_INFLIGHT_OPS 128
#define DEFAULT_WINDOW_SIZE 15
/* We need extra mbufs per operation to accommodate host buffers that
 *  span a 2MB boundary.
 */
#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
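/* Note: the factor of 2 assumes the worst case, where every one of the
 * REDUCE_MAX_IOVECS host buffers straddles a 2MB boundary and so needs a
 * second mbuf for the physically discontiguous remainder (see the extbuf
 * handling in _compress_operation()).
 */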
#define CHUNK_SIZE (1024 * 16)
#define COMP_BDEV_NAME "compress"
#define BACKING_IO_SZ (4 * 1024)

#define ISAL_PMD "compress_isal"
#define QAT_PMD "compress_qat"
#define NUM_MBUFS		8192
#define POOL_CACHE_SIZE		256

static enum compress_pmd g_opts;

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

/* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99 due to
 * the length of the internal ring name that it creates; beyond that it breaks a limit in the
 * generic ring code and qp initialization fails.
 */
#define MAX_NUM_QP 99
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

/* For queueing up compression operations that we can't submit for some reason */
struct vbdev_comp_op {
	struct spdk_reduce_backing_dev	*backing_dev;
	struct iovec			*src_iovs;
	int				src_iovcnt;
	struct iovec			*dst_iovs;
	int				dst_iovcnt;
	bool				compress;
	void				*cb_arg;
	TAILQ_ENTRY(vbdev_comp_op)	link;
};

struct vbdev_comp_delete_ctx {
	spdk_delete_compress_complete	cb_fn;
	void				*cb_arg;
	int				cb_rc;
	struct spdk_thread		*orig_thread;
};

/* List of virtual bdevs and associated info for each. */
struct vbdev_compress {
	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_thread		*reduce_thread;
	pthread_mutex_t			reduce_lock;
	uint32_t			ch_count;
	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
	struct spdk_poller		*poller;	/* completion poller */
	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
	struct spdk_reduce_vol		*vol;		/* the reduce volume */
	struct vbdev_comp_delete_ctx	*delete_ctx;
	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
	TAILQ_ENTRY(vbdev_compress)	link;
	struct spdk_thread		*thread;	/* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);

/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
 */
struct comp_io_channel {
	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
};

/* Per I/O context for the compression vbdev. */
struct comp_bdev_io {
	struct comp_io_channel		*comp_ch;		/* used in completion handling */
	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
	struct spdk_bdev_io		*orig_io;		/* the original IO */
	struct spdk_io_channel		*ch;			/* for resubmission */
	int				status;			/* save for completion on orig thread */
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_isal_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
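
/* Note: window_size above is the base-2 log of the DEFLATE sliding window;
 * DEFAULT_WINDOW_SIZE of 15 therefore selects the standard 32KB (2^15) window.
 */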

static void vbdev_compress_examine(struct spdk_bdev *bdev);
static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);

/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs, we free them ourselves but this callback has to
 * be here.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap the number of queue pairs at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %u "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %u\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->thread = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}
	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
		g_isal_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, vbdev_compress_init() */
static int
vbdev_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* We always init the compress_isal PMD */
	rc = rte_vdev_init(ISAL_PMD, NULL);
	if (rc == 0) {
		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
	} else if (rc == -EEXIST) {
		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
	} else {
		SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
		return -EINVAL;
	}

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}
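
	/* Note: the mbuf pool above is created with a data room size of 0
	 * because payload buffers are attached to the mbufs as external
	 * buffers via rte_pktmbuf_attach_extbuf() in _compress_operation(),
	 * so the mbufs themselves never carry data.
	 */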

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

/* for completing rw requests on the orig IO thread. */
static void
_spdk_reduce_rw_blocks_cb(void *arg)
{
	struct comp_bdev_io *io_ctx = arg;

	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Completion callback for r/w that were issued via reducelib. */
static void
spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);

	/* TODO: need to decide which error codes are bdev_io success vs failure;
	 * example examine calls reading metadata */

	io_ctx->status = reduce_errno;

	/* Send this request to the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
	} else {
		_spdk_reduce_rw_blocks_cb(io_ctx);
	}
}

static int
_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
		    int src_iovcnt, struct iovec *dst_iovs,
		    int dst_iovcnt, bool compress, void *cb_arg)
{
	void *reduce_cb_arg = cb_arg;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
					   backing_dev);
	struct rte_comp_op *comp_op;
	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	uint64_t updated_length, remainder, phys_addr, total_length = 0;
	uint8_t *current_src_base = NULL;
	uint8_t *current_dst_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;
	struct vbdev_comp_op *op_to_queue;
	int i;
	int src_mbuf_total = src_iovcnt;
	int dst_mbuf_total = dst_iovcnt;
	bool device_error = false;

	assert(src_iovcnt < MAX_MBUFS_PER_OP);

#ifdef DEBUG
	memset(src_mbufs, 0, sizeof(src_mbufs));
	memset(dst_mbufs, 0, sizeof(dst_mbufs));
#endif

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("trying to get a comp op!\n");
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		goto error_get_src;
	}

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		goto error_get_dst;
	}

	/* There is a 1:1 mapping between a bdev_io and a compression operation, but
	 * all compression PMDs that SPDK uses support chaining so build our mbuf chain
	 * and associate with our single comp_op.
	 */
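	/*
	 * For example, with src_iovcnt == 2 where the second host buffer
	 * crosses a 2MB boundary, the chain built below would be:
	 *
	 *   comp_op->m_src = src_mbufs[0] -> src_mbufs[1] -> src_mbufs[2]
	 *
	 * with src_mbufs[2] covering the physically discontiguous remainder
	 * of the second buffer.
	 */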

	/* Setup src mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < src_iovcnt) {

		current_src_base = src_iovs[iov_index].iov_base;
		total_length += src_iovs[iov_index].iov_len;
		assert(src_mbufs[mbuf_index] != NULL);
		src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
		updated_length = src_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_src_base, &updated_length);

		rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
					  current_src_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length);
		remainder = src_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			/* allocate an mbuf at the end of the array */
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n");
				goto error_src_dst;
			}
			src_mbuf_total++;
			mbuf_index++;
			src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
			current_src_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_src_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == src_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
						  current_src_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(src_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_src = src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	/* Setup dst mbufs; for the current use of this code there's only one vector. */
	iov_index = mbuf_index = 0;
	while (iov_index < dst_iovcnt) {

		current_dst_base = dst_iovs[iov_index].iov_base;
		updated_length = dst_iovs[iov_index].iov_len;
		phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length);

		rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
					  current_dst_base,
					  phys_addr,
					  updated_length,
					  &g_shinfo);
		rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length);
		remainder = dst_iovs[iov_index].iov_len - updated_length;

		if (mbuf_index > 0) {
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}

		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
		if (remainder > 0) {
			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1);
			if (rc) {
				SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n");
				goto error_src_dst;
			}
			dst_mbuf_total++;
			mbuf_index++;
			current_dst_base += updated_length;
			phys_addr = spdk_vtophys((void *)current_dst_base, &remainder);
			/* assert we don't cross another */
			assert(remainder == dst_iovs[iov_index].iov_len - updated_length);

			rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
						  current_dst_base,
						  phys_addr,
						  remainder,
						  &g_shinfo);
			rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder);
			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
		}
		iov_index++;
		mbuf_index++;
	}

	comp_op->m_dst = dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 op to be enqueued; if 0 then we need to queue it up ourselves. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		/* we free mbufs differently depending on whether they were chained or not */
		rte_pktmbuf_free(comp_op->m_src);
		rte_pktmbuf_free(comp_op->m_dst);
		goto error_enqueue;
	} else {
		/* Log the device status now; comp_op is freed in the error path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
		goto error_src_dst;
	}

	/* Error cleanup paths. */
error_src_dst:
	for (i = 0; i < dst_mbuf_total; i++) {
		rte_pktmbuf_free(dst_mbufs[i]);
	}
error_get_dst:
	for (i = 0; i < src_mbuf_total; i++) {
		rte_pktmbuf_free(src_mbufs[i]);
	}
error_get_src:
error_enqueue:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters; the status was logged above.
		 */
		return -EINVAL;
	}

	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
	if (op_to_queue == NULL) {
		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
		return -ENOMEM;
	}
	op_to_queue->backing_dev = backing_dev;
	op_to_queue->src_iovs = src_iovs;
	op_to_queue->src_iovcnt = src_iovcnt;
	op_to_queue->dst_iovs = dst_iovs;
	op_to_queue->dst_iovcnt = dst_iovcnt;
	op_to_queue->compress = compress;
	op_to_queue->cb_arg = cb_arg;
	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
			  op_to_queue,
			  link);
	return 0;
}

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct vbdev_compress *comp_bdev = args;
	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_reduce_vol_cb_args *reduce_args;
	struct vbdev_comp_op *op_to_resubmit;
	int rc, i;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {
		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;

		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {

			/* tell reduce this is done and what the bytecount was */
			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
		} else {
			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
				       deq_ops[i]->status);

			/* Reduce will simply store uncompressed on neg errno value. */
			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
		}

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a
		 * time here; however, it would be rare that we'd ever have more than 1
		 * at a time anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
			rc = _compress_operation(op_to_resubmit->backing_dev,
						 op_to_resubmit->src_iovs,
						 op_to_resubmit->src_iovcnt,
						 op_to_resubmit->dst_iovs,
						 op_to_resubmit->dst_iovcnt,
						 op_to_resubmit->compress,
						 op_to_resubmit->cb_arg);
			if (rc == 0) {
				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
				free(op_to_resubmit);
			}
		}
	}
	return 0;
}
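
/* Note: comp_dev_poller() is registered with a 0us period in
 * comp_bdev_ch_create_cb(), so it busy-polls its qpair on every iteration of
 * the reduce thread's reactor and drains up to NUM_MAX_INFLIGHT_OPS
 * completions per call.
 */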

/* Entry point for reduce lib to issue a compress operation. */
static void
_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
		      struct iovec *src_iovs, int src_iovcnt,
		      struct iovec *dst_iovs, int dst_iovcnt,
		      struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Entry point for reduce lib to issue a decompress operation. */
static void
_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
			struct iovec *src_iovs, int src_iovcnt,
			struct iovec *dst_iovs, int dst_iovcnt,
			struct spdk_reduce_vol_cb_args *cb_arg)
{
	int rc;

	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
	if (rc) {
		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
		cb_arg->cb_fn(cb_arg->cb_arg, rc);
	}
}

/* Callback for getting a buf from the bdev pool in the event that the caller passed
 * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
 * beneath us before we're done with it.
 */
static void
comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);

	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			      spdk_reduce_rw_blocks_cb, bdev_io);
}

/* scheduled for completion on IO thread */
static void
_complete_other_io(void *arg)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
	if (io_ctx->status == 0) {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
	} else {
		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* scheduled for submission on reduce thread */
static void
_comp_bdev_io_submit(void *arg)
{
	struct spdk_bdev_io *bdev_io = arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	int rc = 0;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		return;
	case SPDK_BDEV_IO_TYPE_WRITE:
		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
				       spdk_reduce_rw_blocks_cb, bdev_io);
		return;
	/* TODO in future patch in the series */
	case SPDK_BDEV_IO_TYPE_RESET:
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		rc = -EINVAL;
	}

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
			io_ctx->ch = ch;
			vbdev_compress_queue_io(bdev_io);
			return;
		} else {
			SPDK_ERRLOG("on bdev_io submission!\n");
			io_ctx->status = rc;
		}
	}

	/* Complete this on the orig IO thread. */
	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
	} else {
		_complete_other_io(io_ctx);
	}
}

/* Called when someone above submits IO to this vbdev. */
static void
vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
					   comp_bdev);
	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);

	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
	io_ctx->comp_bdev = comp_bdev;
	io_ctx->comp_ch = comp_ch;
	io_ctx->orig_io = bdev_io;

	/* Send this request to the reduce_thread if that's not what we're on. */
	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
	} else {
		_comp_bdev_io_submit(bdev_io);
	}
}

static bool
vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	switch (io_type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
	case SPDK_BDEV_IO_TYPE_UNMAP:
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		return false;
	}
}

/* Resubmission function used by the bdev layer when a queued IO is ready to be
 * submitted.
 */
static void
vbdev_compress_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;

	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
}

/* Used to queue an IO in the event of resource issues. */
static void
vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
	if (rc) {
		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
		assert(false);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_compress *comp_bdev = io_device;

	/* Done with this comp_bdev. */
	pthread_mutex_destroy(&comp_bdev->reduce_lock);
	free(comp_bdev->comp_bdev.name);
	free(comp_bdev);
}

static void
_vbdev_compress_destruct_cb(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

static void
vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	} else {
		TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
		spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
		/* Close the underlying bdev on its same opened thread. */
		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->thread, _vbdev_compress_destruct_cb, comp_bdev->base_desc);
		} else {
			spdk_bdev_close(comp_bdev->base_desc);
		}
		comp_bdev->vol = NULL;
		if (comp_bdev->orphaned == false) {
			spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
		} else {
			vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
			_device_unregister_cb(comp_bdev);
		}
	}
}

static void
_reduce_destroy_cb(void *ctx, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_put_io_channel(comp_bdev->base_ch);
	if (comp_bdev->orphaned == false) {
		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
				     comp_bdev->delete_ctx);
	} else {
		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
	}
}

static void
_delete_vol_unload_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev = ctx;

	/* FIXME: Assert if these conditions are not satisfied for now. */
	assert(!comp_bdev->reduce_thread ||
	       comp_bdev->reduce_thread == spdk_get_thread());

	/* reducelib needs a channel to comm with the backing device */
	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);

	/* Clean the device before we free our resources. */
	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
}

/* Called by reduceLib after performing unload vol actions */
static void
delete_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
		/* FIXME: callback should be executed. */
		return;
	}

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
		spdk_thread_send_msg(comp_bdev->reduce_thread,
				     _delete_vol_unload_cb, comp_bdev);
		pthread_mutex_unlock(&comp_bdev->reduce_lock);
	} else {
		pthread_mutex_unlock(&comp_bdev->reduce_lock);

		_delete_vol_unload_cb(comp_bdev);
	}
}

const char *
compress_get_name(const struct vbdev_compress *comp_bdev)
{
	return comp_bdev->comp_bdev.name;
}

struct vbdev_compress *
compress_bdev_first(void)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);

	return comp_bdev;
}

struct vbdev_compress *
compress_bdev_next(struct vbdev_compress *prev)
{
	struct vbdev_compress *comp_bdev;

	comp_bdev = TAILQ_NEXT(prev, link);

	return comp_bdev;
}

bool
compress_has_orphan(const char *name)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			return true;
		}
	}
	return false;
}

/* Called after we've unregistered following a hot remove callback.
 * Our finish entry point will be called next.
 */
static int
vbdev_compress_destruct(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	if (comp_bdev->vol != NULL) {
		/* Tell reducelib that we're done with this volume. */
		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
	} else {
		vbdev_compress_destruct_cb(comp_bdev, 0);
	}

	return 0;
}

/* We supplied this as an entry point for upper layers who want to communicate to this
 * bdev.  This is how they get a channel.
 */
static struct spdk_io_channel *
vbdev_compress_get_io_channel(void *ctx)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	/* The IO channel code will allocate a channel for us which consists of
	 * the SPDK channel structure plus the size of our comp_io_channel struct
	 * that we passed in when we registered our IO device. It will then call
	 * our channel create callback to populate any elements that we need to
	 * update.
	 */
	return spdk_get_io_channel(comp_bdev);
}

/* This is the output for bdev_get_bdevs() for this vbdev */
static int
vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;

	spdk_json_write_name(w, "compress");
	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_compress_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_compress *comp_bdev;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_compress_create");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

static void
_vbdev_reduce_init_cb(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
 * used for initial metadata operations to claim where it will be further filled out
 * and added to the global list.
 */
static void
vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	/* We're done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	/* Close the underlying bdev on its same opened thread. */
	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx->base_desc);
	} else {
		spdk_bdev_close(meta_ctx->base_desc);
	}
	meta_ctx->base_desc = NULL;

	if (reduce_errno == 0) {
		meta_ctx->vol = vol;
		vbdev_compress_claim(meta_ctx);
	} else {
		SPDK_ERRLOG("for vol %s, error %u\n",
			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		free(meta_ctx);
	}
}

/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
 * call the callback provided by reduceLib when it called the read/write/unmap function and
 * free the bdev_io.
 */
static void
comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct spdk_reduce_vol_cb_args *cb_args = arg;
	int reduce_errno;

	if (success) {
		reduce_errno = 0;
	} else {
		reduce_errno = -EIO;
	}
	spdk_bdev_free_io(bdev_io);
	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
}

/* This is the function provided to the reduceLib for sending reads directly to
 * the backing device.
 */
static void
_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    iov, iovcnt, lba, lba_count,
				    comp_reduce_io_cb,
				    args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("submitting readv request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending writes directly to
 * the backing device.
 */
static void
_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				     iov, iovcnt, lba, lba_count,
				     comp_reduce_io_cb,
				     args);
	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("error submitting writev request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* This is the function provided to the reduceLib for sending unmaps directly to
 * the backing device.
 */
static void
_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
{
	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
					   backing_dev);
	int rc;

	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
				    lba, lba_count,
				    comp_reduce_io_cb,
				    args);

	if (rc) {
		if (rc == -ENOMEM) {
			SPDK_ERRLOG("No memory, start to queue io.\n");
			/* TODO: there's no bdev_io to queue */
		} else {
			SPDK_ERRLOG("submitting unmap request\n");
		}
		args->cb_fn(args->cb_arg, rc);
	}
}

/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
static void
bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
{
	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;

	if (reduce_errno) {
		SPDK_ERRLOG("number %d\n", reduce_errno);
	}

	comp_bdev->vol = NULL;
	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
}

/* Called when the underlying base bdev goes away. */
static void
vbdev_compress_base_bdev_hotremove_cb(void *ctx)
{
	struct vbdev_compress *comp_bdev, *tmp;
	struct spdk_bdev *bdev_find = ctx;

	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
		if (bdev_find == comp_bdev->base_bdev) {
			/* Tell reduceLib that we're done with this volume. */
			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
		}
	}
}

/* TODO: determine which params we want user configurable; hard-coded for now:
 * params.vol_size
 * params.chunk_size
 * compression PMD, algorithm, window size, comp level, etc.
 * DEV_MD_PATH
 */

/* Common function for init and load to allocate and populate the minimal
 * information for reducelib to init or load.
 */
struct vbdev_compress *
_prepare_for_load_init(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;

	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
	if (meta_ctx == NULL) {
		SPDK_ERRLOG("failed to alloc init contexts\n");
		return NULL;
	}

	meta_ctx->drv_name = "None";
	meta_ctx->base_bdev = bdev;
	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
	meta_ctx->backing_dev.readv = _comp_reduce_readv;
	meta_ctx->backing_dev.writev = _comp_reduce_writev;
	meta_ctx->backing_dev.compress = _comp_reduce_compress;
	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;

	meta_ctx->backing_dev.blocklen = bdev->blocklen;
	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;

	meta_ctx->params.chunk_size = CHUNK_SIZE;
	meta_ctx->params.logical_block_size = bdev->blocklen;
	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
	return meta_ctx;
}
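
/* Illustrative arithmetic: with the hard-coded CHUNK_SIZE of 16KB and a base
 * bdev using 512-byte blocks, each reduce chunk holds 16384 / 512 = 32
 * logical blocks and maps to 16384 / 4096 = 4 backing io units
 * (BACKING_IO_SZ); vbdev_compress_claim() uses the same chunk/block ratio for
 * optimal_io_boundary.
 */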

static bool
_set_pmd(struct vbdev_compress *comp_dev)
{
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			comp_dev->drv_name = QAT_PMD;
		} else {
			comp_dev->drv_name = ISAL_PMD;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		comp_dev->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
		comp_dev->drv_name = ISAL_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
	return true;
}

/* Call reducelib to initialize a new volume */
static int
vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		return -EINVAL;
	}

	if (_set_pmd(meta_ctx) == false) {
		SPDK_ERRLOG("could not find required pmd\n");
		free(meta_ctx);
		return -EINVAL;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		return -EINVAL;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);

	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
			     pm_path,
			     vbdev_reduce_init_cb,
			     meta_ctx);
	return 0;
}

/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per channel basis.  If we needed
 * our own poller for this vbdev, we'd register it here.
 */
static int
comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;
	struct comp_device_qp *device_qp;

	/* Now set the reduce channel if it's not already set. */
	pthread_mutex_lock(&comp_bdev->reduce_lock);
	if (comp_bdev->ch_count == 0) {
		/* We use this queue to track outstanding IO in our layer. */
		TAILQ_INIT(&comp_bdev->pending_comp_ios);

		/* We use this to queue up compression operations as needed. */
		TAILQ_INIT(&comp_bdev->queued_comp_ops);

		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
		comp_bdev->reduce_thread = spdk_get_thread();
		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
		/* Now assign a q pair */
		pthread_mutex_lock(&g_comp_device_qp_lock);
		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
				if (device_qp->thread == spdk_get_thread()) {
					comp_bdev->device_qp = device_qp;
					break;
				}
				if (device_qp->thread == NULL) {
					comp_bdev->device_qp = device_qp;
					device_qp->thread = spdk_get_thread();
					break;
				}
			}
		}
		pthread_mutex_unlock(&g_comp_device_qp_lock);
	}
	comp_bdev->ch_count++;
	pthread_mutex_unlock(&comp_bdev->reduce_lock);

	if (comp_bdev->device_qp != NULL) {
		return 0;
	} else {
		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
		assert(false);
		return -ENOMEM;
	}
}

static void
_channel_cleanup(struct vbdev_compress *comp_bdev)
{
	/* Note: comp_bdevs can share a device_qp if they are
	 * on the same thread so we leave the device_qp element
	 * alone for this comp_bdev and just clear the reduce thread.
	 */
	spdk_put_io_channel(comp_bdev->base_ch);
	comp_bdev->reduce_thread = NULL;
	spdk_poller_unregister(&comp_bdev->poller);
}

/* Used to reroute destroy_ch to the correct thread */
static void
_comp_bdev_ch_destroy_cb(void *arg)
{
	struct vbdev_compress *comp_bdev = arg;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	_channel_cleanup(comp_bdev);
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created. If this bdev used its own poller, we'd unregister it here.
 */
static void
comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_compress *comp_bdev = io_device;

	pthread_mutex_lock(&comp_bdev->reduce_lock);
	comp_bdev->ch_count--;
	if (comp_bdev->ch_count == 0) {
		/* Send this request to the thread where the channel was created. */
		if (comp_bdev->reduce_thread != spdk_get_thread()) {
			spdk_thread_send_msg(comp_bdev->reduce_thread,
					     _comp_bdev_ch_destroy_cb, comp_bdev);
		} else {
			_channel_cleanup(comp_bdev);
		}
	}
	pthread_mutex_unlock(&comp_bdev->reduce_lock);
}

/* RPC entry point for compression vbdev creation. */
int
create_compress_bdev(const char *bdev_name, const char *pm_path)
{
	struct spdk_bdev *bdev;

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (!bdev) {
		return -ENODEV;
	}

	return vbdev_init_reduce(bdev, pm_path);
}
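
/* Typical usage is via the "bdev_compress_create" RPC, which ends up calling
 * create_compress_bdev() above. For example (bdev name, pmem path, and flag
 * spellings here are assumptions for illustration):
 *
 *   scripts/rpc.py bdev_compress_create -b Nvme0n1 -p /tmp/pmem_files
 */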

/* On init, just init the compress drivers. All metadata is stored on disk. */
static int
vbdev_compress_init(void)
{
	if (vbdev_init_compress_drivers()) {
		SPDK_ERRLOG("Error setting up compression devices\n");
		return -EINVAL;
	}

	return 0;
}

/* Called when the entire module is being torn down. */
static void
vbdev_compress_finish(void)
{
	struct comp_device_qp *dev_qp;
	/* TODO: unload vol in a future patch */

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);
}

/* During init we'll be asked how much memory we'd like passed to us
 * in bdev_io structures as context. Here's where we specify how
 * much context we want per IO.
 */
1530 static int
1531 vbdev_compress_get_ctx_size(void)
1532 {
1533 	return sizeof(struct comp_bdev_io);
1534 }
1535 
1536 /* When we register our bdev this is how we specify our entry points. */
1537 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1538 	.destruct		= vbdev_compress_destruct,
1539 	.submit_request		= vbdev_compress_submit_request,
1540 	.io_type_supported	= vbdev_compress_io_type_supported,
1541 	.get_io_channel		= vbdev_compress_get_io_channel,
1542 	.dump_info_json		= vbdev_compress_dump_info_json,
1543 	.write_config_json	= NULL,
1544 };
1545 
1546 static struct spdk_bdev_module compress_if = {
1547 	.name = "compress",
1548 	.module_init = vbdev_compress_init,
1549 	.config_text = NULL,
1550 	.get_ctx_size = vbdev_compress_get_ctx_size,
1551 	.examine_disk = vbdev_compress_examine,
1552 	.module_fini = vbdev_compress_finish,
1553 	.config_json = vbdev_compress_config_json
1554 };
1555 
1556 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1557 
1558 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1559 {
1560 	struct spdk_bdev_alias *aliases;
1561 
1562 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1563 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1564 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
1565 		if (!comp_bdev->comp_bdev.name) {
1566 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1567 			return -ENOMEM;
1568 		}
1569 	} else {
1570 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1571 		if (!comp_bdev->comp_bdev.name) {
1572 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1573 			return -ENOMEM;
1574 		}
1575 	}
1576 	return 0;
1577 }
1578 
1579 static void
1580 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1581 {
1582 	int rc;
1583 
1584 	if (_set_compbdev_name(comp_bdev)) {
1585 		goto error_bdev_name;
1586 	}
1587 
1588 	/* Note: some of the fields below will change in the future - for example,
1589 	 * blockcnt specifically will not match (the compressed volume size will
1590 	 * be slightly less than the base bdev size)
1591 	 */
1592 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1593 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1594 
1595 	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
1596 		comp_bdev->comp_bdev.required_alignment =
1597 			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
1598 				 comp_bdev->base_bdev->required_alignment);
1599 		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1600 			       comp_bdev->comp_bdev.required_alignment);
1601 	} else {
1602 		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
1603 	}
1604 	comp_bdev->comp_bdev.optimal_io_boundary =
1605 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
1606 
	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context passed to us when the bdev layer calls in,
	 * so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

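	/* From here the bring-up order is: open the base bdev, register our
	 * io_device, claim the base bdev for this module, then register the
	 * virtual bdev. Each error label below unwinds exactly the steps that
	 * had succeeded, in reverse order.
	 */
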
	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
			    comp_bdev->base_bdev, &comp_bdev->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_open;
	}

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("could not register bdev %s\n", comp_bdev->comp_bdev.name);
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return;
	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	spdk_bdev_close(comp_bdev->base_desc);
error_open:
	free(comp_bdev->comp_bdev.name);
error_bdev_name:
	free(comp_bdev);
}

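/* Runs on the thread that originally requested the delete: invoke the
 * caller's completion callback and release the delete context.
 */
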
static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

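/* Public delete entry point: look up the named compress vbdev, stash the
 * caller's callback, then unload the reduce volume (or go straight to
 * teardown if the volume is already orphaned). Typically reached via the
 * bdev_compress_delete RPC, e.g. (invocation shown for illustration):
 *
 *	scripts/rpc.py bdev_compress_delete COMP_Nvme0n1
 */
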
void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

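/* Close the base bdev descriptor on the thread where it was originally
 * opened; bdev descriptors must be closed on their opening thread.
 */
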
static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

/* Callback from reduce when the volume load is complete. On success we pass
 * the vbdev_comp struct used for the initial metadata operations on to claim,
 * where it will be further filled out and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;
	int rc;

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);
	/* Close the underlying bdev on the same thread that opened it. */
	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx->base_desc);
	} else {
		spdk_bdev_close(meta_ctx->base_desc);
	}
	meta_ctx->base_desc = NULL;

	if (reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
		vbdev_compress_claim(meta_ctx);
	} else if (reduce_errno == -ENOENT) {
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* We still want to open and claim the backing device to protect the data until
		 * either the pm metadata file is recovered or the comp bdev is deleted.
		 */
		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
				    meta_ctx->base_bdev, &meta_ctx->base_desc);
		if (rc) {
			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			spdk_bdev_close(meta_ctx->base_desc);
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
		if (reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Examine_disk entry point: do a metadata load to see if this bdev is one of
 * ours, and if so go ahead and claim it.
 */
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev);
	if (meta_ctx == NULL) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
			    meta_ctx->base_bdev, &meta_ctx->base_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
		free(meta_ctx);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

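/* Select which compression PMD subsequently created or loaded volumes will
 * use (e.g. QAT only, ISA-L only, or auto-select); the choice is consulted
 * the next time a volume is claimed.
 */
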
int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

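/* Register the "vbdev_compress" log component; trace output can then be
 * enabled at runtime, e.g. by starting the app with "-L vbdev_compress"
 * (flag usage shown as a typical example).
 */
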
SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)