xref: /spdk/module/bdev/compress/vbdev_compress.c (revision 1914de09202410b6881d8bcff916ce78fb749ad6)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "vbdev_compress.h"
35 
36 #include "spdk/reduce.h"
37 #include "spdk/stdinc.h"
38 #include "spdk/rpc.h"
39 #include "spdk/env.h"
40 #include "spdk/conf.h"
41 #include "spdk/endian.h"
42 #include "spdk/string.h"
43 #include "spdk/thread.h"
44 #include "spdk/util.h"
45 #include "spdk/bdev_module.h"
46 
47 #include "spdk_internal/log.h"
48 
49 #include <rte_config.h>
50 #include <rte_bus_vdev.h>
51 #include <rte_compressdev.h>
52 #include <rte_comp.h>
53 
54 #define NUM_MAX_XFORMS 2
55 #define NUM_MAX_INFLIGHT_OPS 128
56 #define DEFAULT_WINDOW_SIZE 15
57 /* We need extra mbufs per operation to accommodate host buffers that
58  *  span a 2MB boundary.
59  */
60 #define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
61 #define CHUNK_SIZE (1024 * 16)
62 #define COMP_BDEV_NAME "compress"
63 #define BACKING_IO_SZ (4 * 1024)
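/* With the values above, reduce compresses data in 16 KiB chunks and does backing
 * device IO in 4 KiB units, i.e. up to CHUNK_SIZE / BACKING_IO_SZ = 4 backing IO
 * units per chunk (fewer when a chunk compresses well). These feed the
 * spdk_reduce_vol_params filled out in _prepare_for_load_init().
 */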
64 
65 #define ISAL_PMD "compress_isal"
66 #define QAT_PMD "compress_qat"
67 #define NUM_MBUFS		8192
68 #define POOL_CACHE_SIZE		256
69 
70 static enum compress_pmd g_opts;
71 
72 /* Global list of available compression devices. */
73 struct compress_dev {
74 	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
75 	uint8_t				cdev_id;	/* identifier for the device */
76 	void				*comp_xform;	/* shared private xform for comp on this PMD */
77 	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
78 	TAILQ_ENTRY(compress_dev)	link;
79 };
80 static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
81 
82 /* Although the ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99:
83  * the length of the internal ring name that it creates breaks a limit in the generic
84  * ring code and fails qp initialization.
85  */
86 #define MAX_NUM_QP 99
87 /* Global list and lock for unique device/queue pair combos */
88 struct comp_device_qp {
89 	struct compress_dev		*device;	/* ptr to compression device */
90 	uint8_t				qp;		/* queue pair for this node */
91 	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
92 	TAILQ_ENTRY(comp_device_qp)	link;
93 };
94 static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
95 static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
96 
97 /* For queueing up compression operations that we can't submit for some reason */
98 struct vbdev_comp_op {
99 	struct spdk_reduce_backing_dev	*backing_dev;
100 	struct iovec			*src_iovs;
101 	int				src_iovcnt;
102 	struct iovec			*dst_iovs;
103 	int				dst_iovcnt;
104 	bool				compress;
105 	void				*cb_arg;
106 	TAILQ_ENTRY(vbdev_comp_op)	link;
107 };
108 
109 /* List of virtual bdevs and associated info for each. */
110 struct vbdev_compress {
111 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
112 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
113 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
114 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
115 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
116 	char				*drv_name;	/* name of the compression device driver */
117 	struct comp_device_qp		*device_qp;
118 	struct spdk_thread		*reduce_thread;
119 	pthread_mutex_t			reduce_lock;
120 	uint32_t			ch_count;
121 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
122 	struct spdk_poller		*poller;	/* completion poller */
123 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
124 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
125 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
126 	spdk_delete_compress_complete	delete_cb_fn;
127 	void				*delete_cb_arg;
128 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
129 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
130 	TAILQ_ENTRY(vbdev_compress)	link;
131 	struct spdk_thread		*thread;	/* thread where base device is opened */
132 };
133 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
134 
135 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
136  */
137 struct comp_io_channel {
138 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
139 };
140 
141 /* Per I/O context for the compression vbdev. */
142 struct comp_bdev_io {
143 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
144 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
145 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
146 	struct spdk_bdev_io		*orig_io;		/* the original IO */
147 	struct spdk_io_channel		*ch;			/* for resubmission */
148 	int				status;			/* save for completion on orig thread */
149 };
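/* For orientation, the per-IO flow is: vbdev_compress_submit_request() fills out this
 * context and forwards the bdev_io to the comp_bdev's reduce_thread, where
 * _comp_bdev_io_submit() hands it to the reduce library; the completion is then sent
 * back to the originating thread via spdk_thread_send_msg().
 */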
150 
151 /* Shared mempools between all devices on this system */
152 static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
153 static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
154 static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
155 static bool g_qat_available = false;
156 static bool g_isal_available = false;
157 
158 /* Create shared (between all ops per PMD) compress xforms. */
159 static struct rte_comp_xform g_comp_xform = {
160 	.type = RTE_COMP_COMPRESS,
161 	.compress = {
162 		.algo = RTE_COMP_ALGO_DEFLATE,
163 		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
164 		.level = RTE_COMP_LEVEL_MAX,
165 		.window_size = DEFAULT_WINDOW_SIZE,
166 		.chksum = RTE_COMP_CHECKSUM_NONE,
167 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
168 	}
169 };
170 /* Create shared (between all ops per PMD) decompress xforms. */
171 static struct rte_comp_xform g_decomp_xform = {
172 	.type = RTE_COMP_DECOMPRESS,
173 	.decompress = {
174 		.algo = RTE_COMP_ALGO_DEFLATE,
175 		.chksum = RTE_COMP_CHECKSUM_NONE,
176 		.window_size = DEFAULT_WINDOW_SIZE,
177 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
178 	}
179 };
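/* Note: window_size in these xforms is a base-2 log value, so DEFAULT_WINDOW_SIZE of 15
 * requests a 2^15 = 32 KiB deflate history window, and RTE_COMP_LEVEL_MAX asks the PMD
 * for its best compression ratio at the cost of speed.
 */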
180 
181 static void vbdev_compress_examine(struct spdk_bdev *bdev);
182 static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
183 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
184 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev);
185 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
186 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
187 
188 /* Dummy function used by DPDK to free ext attached buffers
189  * to mbufs; we free them ourselves but this callback has to
190  * be here.
191  */
192 static void
193 shinfo_free_cb(void *arg1, void *arg2)
194 {
195 }
196 
197 /* Called by vbdev_init_compress_drivers() to init each discovered compression device */
198 static int
199 create_compress_dev(uint8_t index)
200 {
201 	struct compress_dev *device;
202 	uint16_t q_pairs;
203 	uint8_t cdev_id;
204 	int rc, i;
205 	struct comp_device_qp *dev_qp;
206 	struct comp_device_qp *tmp_qp;
207 
208 	device = calloc(1, sizeof(struct compress_dev));
209 	if (!device) {
210 		return -ENOMEM;
211 	}
212 
213 	/* Get details about this device. */
214 	rte_compressdev_info_get(index, &device->cdev_info);
215 
216 	cdev_id = device->cdev_id = index;
217 
218 	/* A value of zero means no limit, so cap at MAX_NUM_QP. */
219 	if (device->cdev_info.max_nb_queue_pairs == 0) {
220 		q_pairs = MAX_NUM_QP;
221 	} else {
222 		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
223 	}
224 
225 	/* Configure the compression device. */
226 	struct rte_compressdev_config config = {
227 		.socket_id = rte_socket_id(),
228 		.nb_queue_pairs = q_pairs,
229 		.max_nb_priv_xforms = NUM_MAX_XFORMS,
230 		.max_nb_streams = 0
231 	};
232 	rc = rte_compressdev_configure(cdev_id, &config);
233 	if (rc < 0) {
234 		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
235 		goto err;
236 	}
237 
238 	/* Pre-setup all potential qpairs now and assign them in the channel
239 	 * callback.
240 	 */
241 	for (i = 0; i < q_pairs; i++) {
242 		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
243 						      NUM_MAX_INFLIGHT_OPS,
244 						      rte_socket_id());
245 		if (rc) {
246 			if (i > 0) {
247 				q_pairs = i;
248 				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
249 					       "compressdev %u with error %d "
250 					       "so limiting to %u qpairs\n",
251 					       cdev_id, rc, q_pairs);
252 				break;
253 			} else {
254 				SPDK_ERRLOG("Failed to setup queue pair on "
255 					    "compressdev %u with error %d\n", cdev_id, rc);
256 				rc = -EINVAL;
257 				goto err;
258 			}
259 		}
260 	}
261 
262 	rc = rte_compressdev_start(cdev_id);
263 	if (rc < 0) {
264 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
265 			    cdev_id, rc);
266 		goto err;
267 	}
268 
269 	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
270 		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
271 				&device->comp_xform);
272 		if (rc < 0) {
273 			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
274 				    cdev_id, rc);
275 			goto err;
276 		}
277 
278 		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
279 				&device->decomp_xform);
280 		if (rc) {
281 			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
282 				    cdev_id, rc);
283 			goto err;
284 		}
285 	} else {
286 		SPDK_ERRLOG("PMD does not support shared transforms\n");
287 		goto err;
288 	}
289 
290 	/* Build up list of device/qp combinations */
291 	for (i = 0; i < q_pairs; i++) {
292 		dev_qp = calloc(1, sizeof(struct comp_device_qp));
293 		if (!dev_qp) {
294 			rc = -ENOMEM;
295 			goto err;
296 		}
297 		dev_qp->device = device;
298 		dev_qp->qp = i;
299 		dev_qp->thread = NULL;
300 		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
301 	}
302 
303 	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
304 
305 	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
306 		g_qat_available = true;
307 	}
308 	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
309 		g_isal_available = true;
310 	}
311 
312 	return 0;
313 
314 err:
315 	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
316 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
317 		free(dev_qp);
318 	}
319 	free(device);
320 	return rc;
321 }
322 
323 /* Called from driver init entry point, vbdev_compress_init() */
324 static int
325 vbdev_init_compress_drivers(void)
326 {
327 	uint8_t cdev_count, i;
328 	struct compress_dev *tmp_dev;
329 	struct compress_dev *device;
330 	int rc;
331 
332 	/* We always init the compress_isal PMD */
333 	rc = rte_vdev_init(ISAL_PMD, NULL);
334 	if (rc == 0) {
335 		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
336 	} else if (rc == -EEXIST) {
337 		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
338 	} else {
339 		SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
340 		return -EINVAL;
341 	}
342 
343 	/* If we have no compression devices, there's no reason to continue. */
344 	cdev_count = rte_compressdev_count();
345 	if (cdev_count == 0) {
346 		return 0;
347 	}
348 	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
349 		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
350 		return -EINVAL;
351 	}
352 
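	/* Note on the parameters below: the fourth and fifth arguments are priv_size and
	 * data_room_size; data_room_size is 0 because no payload data lives in the mbufs
	 * themselves - host buffers are attached externally via rte_pktmbuf_attach_extbuf().
	 */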
353 	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
354 					    sizeof(struct rte_mbuf), 0, rte_socket_id());
355 	if (g_mbuf_mp == NULL) {
356 		SPDK_ERRLOG("Cannot create mbuf pool\n");
357 		rc = -ENOMEM;
358 		goto error_create_mbuf;
359 	}
360 
361 	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
362 					       0, rte_socket_id());
363 	if (g_comp_op_mp == NULL) {
364 		SPDK_ERRLOG("Cannot create comp op pool\n");
365 		rc = -ENOMEM;
366 		goto error_create_op;
367 	}
368 
369 	/* Init all devices */
370 	for (i = 0; i < cdev_count; i++) {
371 		rc = create_compress_dev(i);
372 		if (rc != 0) {
373 			goto error_create_compress_devs;
374 		}
375 	}
376 
377 	if (g_qat_available == true) {
378 		SPDK_NOTICELOG("initialized QAT PMD\n");
379 	}
380 
381 	g_shinfo.free_cb = shinfo_free_cb;
382 
383 	return 0;
384 
385 	/* Error cleanup paths. */
386 error_create_compress_devs:
387 	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
388 		TAILQ_REMOVE(&g_compress_devs, device, link);
389 		free(device);
390 	}
391 error_create_op:
	rte_mempool_free(g_comp_op_mp);	/* safe on NULL; frees the op pool when device init fails */
392 error_create_mbuf:
393 	rte_mempool_free(g_mbuf_mp);
394 
395 	return rc;
396 }
397 
398 /* for completing rw requests on the orig IO thread. */
399 static void
400 _spdk_reduce_rw_blocks_cb(void *arg)
401 {
402 	struct comp_bdev_io *io_ctx = arg;
403 
404 	if (io_ctx->status == 0) {
405 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
406 	} else {
407 		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
408 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
409 	}
410 }
411 
412 /* Completion callback for r/w requests that were issued via reducelib. */
413 static void
414 spdk_reduce_rw_blocks_cb(void *arg, int reduce_errno)
415 {
416 	struct spdk_bdev_io *bdev_io = arg;
417 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
418 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
419 
420 	/* TODO: need to decide which error codes are bdev_io success vs failure;
421 	 * for example, examine calls reading metadata */
422 
423 	io_ctx->status = reduce_errno;
424 
425 	/* Send this request to the orig IO thread. */
426 	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
427 		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _spdk_reduce_rw_blocks_cb, io_ctx);
428 	} else {
429 		_spdk_reduce_rw_blocks_cb(io_ctx);
430 	}
431 }
432 
433 static int
434 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
435 		    int src_iovcnt, struct iovec *dst_iovs,
436 		    int dst_iovcnt, bool compress, void *cb_arg)
437 {
438 	void *reduce_cb_arg = cb_arg;
439 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
440 					   backing_dev);
441 	struct rte_comp_op *comp_op;
442 	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
443 	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
444 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
445 	uint64_t updated_length, remainder, phys_addr, total_length = 0;
446 	uint8_t *current_src_base = NULL;
447 	uint8_t *current_dst_base = NULL;
448 	int iov_index, mbuf_index;
449 	int rc = 0;
450 	struct vbdev_comp_op *op_to_queue;
451 	int i;
452 	int src_mbuf_total = src_iovcnt;
453 	int dst_mbuf_total = dst_iovcnt;
454 	bool device_error = false;
455 
456 	assert(src_iovcnt < MAX_MBUFS_PER_OP);
457 
458 #ifdef DEBUG
459 	memset(src_mbufs, 0, sizeof(src_mbufs));
460 	memset(dst_mbufs, 0, sizeof(dst_mbufs));
461 #endif
462 
463 	comp_op = rte_comp_op_alloc(g_comp_op_mp);
464 	if (!comp_op) {
465 		SPDK_ERRLOG("trying to get a comp op!\n");
466 		goto error_get_op;
467 	}
468 
469 	/* get an mbuf per iov, src and dst */
470 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
471 	if (rc) {
472 		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
473 		goto error_get_src;
474 	}
475 
476 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
477 	if (rc) {
478 		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
479 		goto error_get_dst;
480 	}
481 
482 	/* There is a 1:1 mapping between a bdev_io and a compression operation; all
483 	 * compression PMDs that SPDK uses support mbuf chaining, so build our mbuf chain
484 	 * and associate it with our single comp_op.
485 	 */
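	/* Worked example with hypothetical buffers: given two src iovecs where iov0
	 * straddles one 2MB hugepage boundary, spdk_vtophys() shortens iov0's first
	 * translation, so the loop below builds the chain
	 *   src_mbufs[0] (iov0 up to the boundary) -> src_mbufs[1] (iov0 remainder)
	 *     -> src_mbufs[2] (all of iov1)
	 * and rte_pktmbuf_free(m_src) later frees the whole chain in one call.
	 */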
486 
487 	/* Setup src mbufs */
488 	iov_index = mbuf_index = 0;
489 	while (iov_index < src_iovcnt) {
490 
491 		current_src_base = src_iovs[iov_index].iov_base;
492 		total_length += src_iovs[iov_index].iov_len;
493 		assert(src_mbufs[mbuf_index] != NULL);
494 		src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
495 		updated_length = src_iovs[iov_index].iov_len;
496 		phys_addr = spdk_vtophys((void *)current_src_base, &updated_length);
497 
498 		rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
499 					  current_src_base,
500 					  phys_addr,
501 					  updated_length,
502 					  &g_shinfo);
503 		rte_pktmbuf_append(src_mbufs[mbuf_index], updated_length);
504 		remainder = src_iovs[iov_index].iov_len - updated_length;
505 
506 		if (mbuf_index > 0) {
507 			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
508 		}
509 
510 		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
511 		if (remainder > 0) {
512 			/* allocate an mbuf at the end of the array */
513 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[src_mbuf_total], 1);
514 			if (rc) {
515 				SPDK_ERRLOG("ERROR trying to get an extra src_mbuf!\n");
516 				goto error_src_dst;
517 			}
518 			src_mbuf_total++;
519 			mbuf_index++;
520 			src_mbufs[mbuf_index]->userdata = reduce_cb_arg;
521 			current_src_base += updated_length;
522 			phys_addr = spdk_vtophys((void *)current_src_base, &remainder);
523 			/* assert we don't cross another */
524 			assert(remainder == src_iovs[iov_index].iov_len - updated_length);
525 
526 			rte_pktmbuf_attach_extbuf(src_mbufs[mbuf_index],
527 						  current_src_base,
528 						  phys_addr,
529 						  remainder,
530 						  &g_shinfo);
531 			rte_pktmbuf_append(src_mbufs[mbuf_index], remainder);
532 			rte_pktmbuf_chain(src_mbufs[0], src_mbufs[mbuf_index]);
533 		}
534 		iov_index++;
535 		mbuf_index++;
536 	}
537 
538 	comp_op->m_src = src_mbufs[0];
539 	comp_op->src.offset = 0;
540 	comp_op->src.length = total_length;
541 
542 	/* Setup dst mbufs; with the current usage of this code there is only one dst vector. */
543 	iov_index = mbuf_index = 0;
544 	while (iov_index < dst_iovcnt) {
545 
546 		current_dst_base = dst_iovs[iov_index].iov_base;
547 		updated_length = dst_iovs[iov_index].iov_len;
548 		phys_addr = spdk_vtophys((void *)current_dst_base, &updated_length);
549 
550 		rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
551 					  current_dst_base,
552 					  phys_addr,
553 					  updated_length,
554 					  &g_shinfo);
555 		rte_pktmbuf_append(dst_mbufs[mbuf_index], updated_length);
556 		remainder = dst_iovs[iov_index].iov_len - updated_length;
557 
558 		if (mbuf_index > 0) {
559 			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
560 		}
561 
562 		/* If we crossed a 2MB boundary we need another mbuf for the remainder */
563 		if (remainder > 0) {
564 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[dst_mbuf_total], 1);
565 			if (rc) {
566 				SPDK_ERRLOG("ERROR trying to get an extra dst_mbuf!\n");
567 				goto error_src_dst;
568 			}
569 			dst_mbuf_total++;
570 			mbuf_index++;
571 			current_dst_base += updated_length;
572 			phys_addr = spdk_vtophys((void *)current_dst_base, &remainder);
573 			/* assert we don't cross another */
574 			assert(remainder == dst_iovs[iov_index].iov_len - updated_length);
575 
576 			rte_pktmbuf_attach_extbuf(dst_mbufs[mbuf_index],
577 						  current_dst_base,
578 						  phys_addr,
579 						  remainder,
580 						  &g_shinfo);
581 			rte_pktmbuf_append(dst_mbufs[mbuf_index], remainder);
582 			rte_pktmbuf_chain(dst_mbufs[0], dst_mbufs[mbuf_index]);
583 		}
584 		iov_index++;
585 		mbuf_index++;
586 	}
587 
588 	comp_op->m_dst = dst_mbufs[0];
589 	comp_op->dst.offset = 0;
590 
591 	if (compress == true) {
592 		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
593 	} else {
594 		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
595 	}
596 
597 	comp_op->op_type = RTE_COMP_OP_STATELESS;
598 	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
599 
600 	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
601 	assert(rc <= 1);
602 
603 	/* We always expect 1 op to be enqueued; if 0 then we need to queue it up ourselves. */
604 	if (rc == 1) {
605 		return 0;
606 	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
607 		/* we free mbufs differently depending on whether they were chained or not */
608 		rte_pktmbuf_free(comp_op->m_src);
609 		rte_pktmbuf_free(comp_op->m_dst);
610 		goto error_enqueue;
611 	} else {
		/* Log the status now; comp_op is freed on the error path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
612 		device_error = true;
613 		goto error_src_dst;
614 	}
615 
616 	/* Error cleanup paths. */
617 error_src_dst:
618 	for (i = 0; i < dst_mbuf_total; i++) {
619 		rte_pktmbuf_free(dst_mbufs[i]);	/* the array holds mbuf pointers; don't take their address */
620 	}
621 error_get_dst:
622 	for (i = 0; i < src_mbuf_total; i++) {
623 		rte_pktmbuf_free(src_mbufs[i]);	/* the array holds mbuf pointers; don't take their address */
624 	}
625 error_get_src:
626 error_enqueue:
627 	rte_comp_op_free(comp_op);
628 error_get_op:
629 
630 	if (device_error == true) {
631 		/* There was an error sending the op to the device, most
632 		 * likely with the parameters; the status was logged above,
633 		 * before comp_op was freed.
634 		 */
635 		return -EINVAL;
636 	}
637 
638 	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
639 	if (op_to_queue == NULL) {
640 		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
641 		return -ENOMEM;
642 	}
643 	op_to_queue->backing_dev = backing_dev;
644 	op_to_queue->src_iovs = src_iovs;
645 	op_to_queue->src_iovcnt = src_iovcnt;
646 	op_to_queue->dst_iovs = dst_iovs;
647 	op_to_queue->dst_iovcnt = dst_iovcnt;
648 	op_to_queue->compress = compress;
649 	op_to_queue->cb_arg = cb_arg;
650 	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
651 			  op_to_queue,
652 			  link);
653 	return 0;
654 }
655 
656 /* Poller for the DPDK compression driver. */
657 static int
658 comp_dev_poller(void *args)
659 {
660 	struct vbdev_compress *comp_bdev = args;
661 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
662 	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
663 	uint16_t num_deq;
664 	struct spdk_reduce_vol_cb_args *reduce_args;
665 	struct vbdev_comp_op *op_to_resubmit;
666 	int rc, i;
667 
668 	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
669 						NUM_MAX_INFLIGHT_OPS);
670 	for (i = 0; i < num_deq; i++) {
671 		reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;
672 
673 		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
674 
675 			/* tell reduce this is done and what the bytecount was */
676 			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
677 		} else {
678 			SPDK_NOTICELOG("FYI storing data uncompressed due to deque status %u\n",
679 				       deq_ops[i]->status);
680 
681 			/* Reduce will simply store uncompressed on neg errno value. */
682 			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
683 		}
684 
685 		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
686 		 * call takes care of freeing all of the mbufs in the chain back to their
687 		 * original pool.
688 		 */
689 		rte_pktmbuf_free(deq_ops[i]->m_src);
690 		rte_pktmbuf_free(deq_ops[i]->m_dst);
691 
692 		/* There is no bulk free for comp ops so we have to free them one at a time
693 		 * here; however, it would be rare that we'd ever have more than 1 at a time
694 		 * anyway.
695 		 */
696 		rte_comp_op_free(deq_ops[i]);
697 
698 		/* Check if there are any pending comp ops to process; only pull one
699 		 * off at a time as _compress_operation() may re-queue the op.
700 		 */
701 		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
702 			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
703 			rc = _compress_operation(op_to_resubmit->backing_dev,
704 						 op_to_resubmit->src_iovs,
705 						 op_to_resubmit->src_iovcnt,
706 						 op_to_resubmit->dst_iovs,
707 						 op_to_resubmit->dst_iovcnt,
708 						 op_to_resubmit->compress,
709 						 op_to_resubmit->cb_arg);
710 			if (rc == 0) {
711 				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
712 				free(op_to_resubmit);
713 			}
714 		}
715 	}
716 	return 0;
717 }
718 
719 /* Entry point for reduce lib to issue a compress operation. */
720 static void
721 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
722 		      struct iovec *src_iovs, int src_iovcnt,
723 		      struct iovec *dst_iovs, int dst_iovcnt,
724 		      struct spdk_reduce_vol_cb_args *cb_arg)
725 {
726 	int rc;
727 
728 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
729 	if (rc) {
730 		SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
731 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
732 	}
733 }
734 
735 /* Entry point for reduce lib to issue a decompress operation. */
736 static void
737 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
738 			struct iovec *src_iovs, int src_iovcnt,
739 			struct iovec *dst_iovs, int dst_iovcnt,
740 			struct spdk_reduce_vol_cb_args *cb_arg)
741 {
742 	int rc;
743 
744 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
745 	if (rc) {
746 		SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
747 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
748 	}
749 }
750 
751 /* Callback for getting a buf from the bdev pool in the event that the caller passed
752  * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
753  * beneath us before we're done with it.
754  */
755 static void
756 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
757 {
758 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
759 					   comp_bdev);
760 
761 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
762 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
763 			      spdk_reduce_rw_blocks_cb, bdev_io);
764 }
765 
766 /* scheduled for completion on IO thread */
767 static void
768 _complete_other_io(void *arg)
769 {
770 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
771 	if (io_ctx->status == 0) {
772 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
773 	} else {
774 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
775 	}
776 }
777 
778 /* scheduled for submission on reduce thread */
779 static void
780 _comp_bdev_io_submit(void *arg)
781 {
782 	struct spdk_bdev_io *bdev_io = arg;
783 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
784 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
785 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
786 					   comp_bdev);
787 	int rc = 0;
788 
789 	switch (bdev_io->type) {
790 	case SPDK_BDEV_IO_TYPE_READ:
791 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
792 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
793 		return;
794 	case SPDK_BDEV_IO_TYPE_WRITE:
795 		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
796 				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
797 				       spdk_reduce_rw_blocks_cb, bdev_io);
798 		return;
799 	/* TODO in future patch in the series */
800 	case SPDK_BDEV_IO_TYPE_RESET:
801 		break;
802 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
803 	case SPDK_BDEV_IO_TYPE_UNMAP:
804 	case SPDK_BDEV_IO_TYPE_FLUSH:
805 	default:
806 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
807 		rc = -EINVAL;
808 	}
809 
810 	if (rc) {
811 		if (rc == -ENOMEM) {
812 			SPDK_ERRLOG("No memory, start to queue io for compress.\n");
813 			io_ctx->ch = ch;
814 			vbdev_compress_queue_io(bdev_io);
815 			return;
816 		} else {
817 			SPDK_ERRLOG("on bdev_io submission!\n");
818 			io_ctx->status = rc;
819 		}
820 	}
821 
822 	/* Complete this on the orig IO thread. */
823 	if (spdk_io_channel_get_thread(ch) != spdk_get_thread()) {
824 		spdk_thread_send_msg(spdk_io_channel_get_thread(ch), _complete_other_io, io_ctx);
825 	} else {
826 		_complete_other_io(io_ctx);
827 	}
828 }
829 
830 /* Called when someone above submits IO to this vbdev. */
831 static void
832 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
833 {
834 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
835 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
836 					   comp_bdev);
837 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
838 
839 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
840 	io_ctx->comp_bdev = comp_bdev;
841 	io_ctx->comp_ch = comp_ch;
842 	io_ctx->orig_io = bdev_io;
843 
844 	/* Send this request to the reduce_thread if that's not what we're on. */
845 	if (spdk_io_channel_get_thread(ch) != comp_bdev->reduce_thread) {
846 		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
847 	} else {
848 		_comp_bdev_io_submit(bdev_io);
849 	}
850 }
851 
852 static bool
853 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
854 {
855 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
856 
857 	switch (io_type) {
858 	case SPDK_BDEV_IO_TYPE_READ:
859 	case SPDK_BDEV_IO_TYPE_WRITE:
860 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
861 	case SPDK_BDEV_IO_TYPE_UNMAP:
862 	case SPDK_BDEV_IO_TYPE_RESET:
863 	case SPDK_BDEV_IO_TYPE_FLUSH:
864 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
865 	default:
866 		return false;
867 	}
868 }
869 
870 /* Resubmission function used by the bdev layer when a queued IO is ready to be
871  * submitted.
872  */
873 static void
874 vbdev_compress_resubmit_io(void *arg)
875 {
876 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
877 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
878 
879 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
880 }
881 
882 /* Used to queue an IO in the event of resource issues. */
883 static void
884 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
885 {
886 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
887 	int rc;
888 
889 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
890 	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
891 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
892 
893 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
894 	if (rc) {
895 		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
896 		assert(false);
897 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
898 	}
899 }
900 
901 /* Callback for unregistering the IO device. */
902 static void
903 _device_unregister_cb(void *io_device)
904 {
905 	struct vbdev_compress *comp_bdev = io_device;
906 
907 	/* Done with this comp_bdev. */
908 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
909 	free(comp_bdev->comp_bdev.name);
910 	free(comp_bdev);
911 }
912 
913 static void
914 _vbdev_compress_destruct_cb(void *ctx)
915 {
916 	struct spdk_bdev_desc *desc = ctx;
917 
918 	spdk_bdev_close(desc);
919 }
920 
921 static void
922 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
923 {
924 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
925 
926 	if (reduce_errno) {
927 		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
928 	} else {
929 		TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
930 		spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
931 		/* Close the underlying bdev on its same opened thread. */
932 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
933 			spdk_thread_send_msg(comp_bdev->thread, _vbdev_compress_destruct_cb, comp_bdev->base_desc);
934 		} else {
935 			spdk_bdev_close(comp_bdev->base_desc);
936 		}
937 		comp_bdev->vol = NULL;
938 		if (comp_bdev->orphaned == false) {
939 			spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
940 		} else {
941 			comp_bdev->delete_cb_fn(comp_bdev->delete_cb_arg, 0);
942 			_device_unregister_cb(comp_bdev);
943 		}
944 	}
945 }
946 
947 static void
948 _reduce_destroy_cb(void *ctx, int reduce_errno)
949 {
950 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
951 
952 	if (reduce_errno) {
953 		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
954 	}
955 
956 	comp_bdev->vol = NULL;
957 	spdk_put_io_channel(comp_bdev->base_ch);
958 	if (comp_bdev->orphaned == false) {
959 		spdk_bdev_unregister(&comp_bdev->comp_bdev, comp_bdev->delete_cb_fn,
960 				     comp_bdev->delete_cb_arg);
961 	} else {
962 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
963 	}
964 
965 }
966 
967 /* Called by reduceLib after performing unload vol actions */
968 static void
969 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
970 {
971 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
972 
973 	if (reduce_errno) {
974 		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
975 	} else {
976 		/* reducelib needs a channel to communicate with the backing device */
977 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
978 
979 		/* Clean the device before we free our resources. */
980 		spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
981 	}
982 }
983 
984 const char *
985 compress_get_name(const struct vbdev_compress *comp_bdev)
986 {
987 	return comp_bdev->comp_bdev.name;
988 }
989 
990 struct vbdev_compress *
991 compress_bdev_first(void)
992 {
993 	struct vbdev_compress *comp_bdev;
994 
995 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
996 
997 	return comp_bdev;
998 }
999 
1000 struct vbdev_compress *
1001 compress_bdev_next(struct vbdev_compress *prev)
1002 {
1003 	struct vbdev_compress *comp_bdev;
1004 
1005 	comp_bdev = TAILQ_NEXT(prev, link);
1006 
1007 	return comp_bdev;
1008 }
1009 
1010 bool
1011 compress_has_orphan(const char *name)
1012 {
1013 	struct vbdev_compress *comp_bdev;
1014 
1015 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1016 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1017 			return true;
1018 		}
1019 	}
1020 	return false;
1021 }
1022 
1023 /* Called after we've unregistered following a hot remove callback.
1024  * Our finish entry point will be called next.
1025  */
1026 static int
1027 vbdev_compress_destruct(void *ctx)
1028 {
1029 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1030 
1031 	if (comp_bdev->vol != NULL) {
1032 		/* Tell reducelib that we're done with this volume. */
1033 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
1034 	} else {
1035 		vbdev_compress_destruct_cb(comp_bdev, 0);
1036 	}
1037 
1038 	return 0;
1039 }
1040 
1041 /* We supplied this as an entry point for upper layers that want to communicate with this
1042  * bdev. This is how they get a channel.
1043  */
1044 static struct spdk_io_channel *
1045 vbdev_compress_get_io_channel(void *ctx)
1046 {
1047 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1048 
1049 	/* The IO channel code will allocate a channel for us which consists of
1050 	 * the SPDK channel structure plus the size of our comp_io_channel struct
1051 	 * that we passed in when we registered our IO device. It will then call
1052 	 * our channel create callback to populate any elements that we need to
1053 	 * update.
1054 	 */
1055 	return spdk_get_io_channel(comp_bdev);
1056 }
1057 
1058 /* This is the output for bdev_get_bdevs() for this vbdev */
1059 static int
1060 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1061 {
1062 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1063 
1064 	spdk_json_write_name(w, "compress");
1065 	spdk_json_write_object_begin(w);
1066 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1067 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1068 	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1069 	spdk_json_write_object_end(w);
1070 
1071 	return 0;
1072 }
1073 
1074 /* This is used to generate JSON that can configure this module to its current state. */
1075 static int
1076 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
1077 {
1078 	struct vbdev_compress *comp_bdev;
1079 
1080 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1081 		spdk_json_write_object_begin(w);
1082 		spdk_json_write_named_string(w, "method", "bdev_compress_create");
1083 		spdk_json_write_named_object_begin(w, "params");
1084 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1085 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1086 		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1087 		spdk_json_write_object_end(w);
1088 		spdk_json_write_object_end(w);
1089 	}
1090 	return 0;
1091 }
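/* For reference, each object emitted above has this shape (values illustrative):
 *   {
 *     "method": "bdev_compress_create",
 *     "params": {
 *       "base_bdev_name": "Nvme0n1",
 *       "name": "COMP_Nvme0n1",
 *       "compression_pmd": "compress_isal"
 *     }
 *   }
 */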
1092 
1093 static void
1094 _vbdev_reduce_init_cb(void *ctx)
1095 {
1096 	struct spdk_bdev_desc *desc = ctx;
1097 
1098 	spdk_bdev_close(desc);
1099 }
1100 
1101 /* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
1102  * used for initial metadata operations on to the claim process, where it will be
1103  * further filled out and added to the global list.
1104  */
1105 static void
1106 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1107 {
1108 	struct vbdev_compress *meta_ctx = cb_arg;
1109 
1110 	/* We're done with metadata operations */
1111 	spdk_put_io_channel(meta_ctx->base_ch);
1112 	/* Close the underlying bdev on its same opened thread. */
1113 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1114 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx->base_desc);
1115 	} else {
1116 		spdk_bdev_close(meta_ctx->base_desc);
1117 	}
1118 	meta_ctx->base_desc = NULL;
1119 
1120 	if (reduce_errno == 0) {
1121 		meta_ctx->vol = vol;
1122 		vbdev_compress_claim(meta_ctx);
1123 	} else {
1124 		SPDK_ERRLOG("for vol %s, error %d\n",
1125 			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1126 		free(meta_ctx);
1127 	}
1128 }
1129 
1130 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
1131  * call the callback that reduceLib provided when it issued the read/write/unmap request, and
1132  * free the bdev_io.
1133  */
1134 static void
1135 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1136 {
1137 	struct spdk_reduce_vol_cb_args *cb_args = arg;
1138 	int reduce_errno;
1139 
1140 	if (success) {
1141 		reduce_errno = 0;
1142 	} else {
1143 		reduce_errno = -EIO;
1144 	}
1145 	spdk_bdev_free_io(bdev_io);
1146 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
1147 }
1148 
1149 /* This is the function provided to the reduceLib for sending reads directly to
1150  * the backing device.
1151  */
1152 static void
1153 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1154 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1155 {
1156 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1157 					   backing_dev);
1158 	int rc;
1159 
1160 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1161 				    iov, iovcnt, lba, lba_count,
1162 				    comp_reduce_io_cb,
1163 				    args);
1164 	if (rc) {
1165 		if (rc == -ENOMEM) {
1166 			SPDK_ERRLOG("No memory, start to queue io.\n");
1167 			/* TODO: there's no bdev_io to queue */
1168 		} else {
1169 			SPDK_ERRLOG("submitting readv request\n");
1170 		}
1171 		args->cb_fn(args->cb_arg, rc);
1172 	}
1173 }
1174 
1175 /* This is the function provided to the reduceLib for sending writes directly to
1176  * the backing device.
1177  */
1178 static void
1179 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1180 		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1181 {
1182 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1183 					   backing_dev);
1184 	int rc;
1185 
1186 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1187 				     iov, iovcnt, lba, lba_count,
1188 				     comp_reduce_io_cb,
1189 				     args);
1190 	if (rc) {
1191 		if (rc == -ENOMEM) {
1192 			SPDK_ERRLOG("No memory, start to queue io.\n");
1193 			/* TODO: there's no bdev_io to queue */
1194 		} else {
1195 			SPDK_ERRLOG("error submitting writev request\n");
1196 		}
1197 		args->cb_fn(args->cb_arg, rc);
1198 	}
1199 }
1200 
1201 /* This is the function provided to the reduceLib for sending unmaps directly to
1202  * the backing device.
1203  */
1204 static void
1205 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
1206 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1207 {
1208 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1209 					   backing_dev);
1210 	int rc;
1211 
1212 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1213 				    lba, lba_count,
1214 				    comp_reduce_io_cb,
1215 				    args);
1216 
1217 	if (rc) {
1218 		if (rc == -ENOMEM) {
1219 			SPDK_ERRLOG("No memory, start to queue io.\n");
1220 			/* TODO: there's no bdev_io to queue */
1221 		} else {
1222 			SPDK_ERRLOG("submitting unmap request\n");
1223 		}
1224 		args->cb_fn(args->cb_arg, rc);
1225 	}
1226 }
1227 
1228 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
1229 static void
1230 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
1231 {
1232 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1233 
1234 	if (reduce_errno) {
1235 		SPDK_ERRLOG("reduce_errno %d\n", reduce_errno);
1236 	}
1237 
1238 	comp_bdev->vol = NULL;
1239 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
1240 }
1241 
1242 /* Called when the underlying base bdev goes away. */
1243 static void
1244 vbdev_compress_base_bdev_hotremove_cb(void *ctx)
1245 {
1246 	struct vbdev_compress *comp_bdev, *tmp;
1247 	struct spdk_bdev *bdev_find = ctx;
1248 
1249 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
1250 		if (bdev_find == comp_bdev->base_bdev) {
1251 			/* Tell reduceLib that we're done with this volume. */
1252 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
1253 		}
1254 	}
1255 }
1256 
1257 /* TODO: determine which params we want user configurable; hard-coded for now:
1258  * params.vol_size
1259  * params.chunk_size
1260  * compression PMD, algorithm, window size, comp level, etc.
1261  * DEV_MD_PATH
1262  */
1263 
1264 /* Common function for init and load to allocate and populate the minimal
1265  * information for reducelib to init or load.
1266  */
1267 struct vbdev_compress *
1268 _prepare_for_load_init(struct spdk_bdev *bdev)
1269 {
1270 	struct vbdev_compress *meta_ctx;
1271 
1272 	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
1273 	if (meta_ctx == NULL) {
1274 		SPDK_ERRLOG("failed to alloc init contexts\n");
1275 		return NULL;
1276 	}
1277 
1278 	meta_ctx->drv_name = "None";
1279 	meta_ctx->base_bdev = bdev;
1280 	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
1281 	meta_ctx->backing_dev.readv = _comp_reduce_readv;
1282 	meta_ctx->backing_dev.writev = _comp_reduce_writev;
1283 	meta_ctx->backing_dev.compress = _comp_reduce_compress;
1284 	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;
1285 
1286 	meta_ctx->backing_dev.blocklen = bdev->blocklen;
1287 	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;
1288 
1289 	meta_ctx->params.chunk_size = CHUNK_SIZE;
1290 	meta_ctx->params.logical_block_size = bdev->blocklen;
1291 	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
1292 	return meta_ctx;
1293 }
1294 
1295 static bool
1296 _set_pmd(struct vbdev_compress *comp_dev)
1297 {
1298 	if (g_opts == COMPRESS_PMD_AUTO) {
1299 		if (g_qat_available) {
1300 			comp_dev->drv_name = QAT_PMD;
1301 		} else {
1302 			comp_dev->drv_name = ISAL_PMD;
1303 		}
1304 	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
1305 		comp_dev->drv_name = QAT_PMD;
1306 	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
1307 		comp_dev->drv_name = ISAL_PMD;
1308 	} else {
1309 		SPDK_ERRLOG("Requested PMD is not available.\n");
1310 		return false;
1311 	}
1312 	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
1313 	return true;
1314 }
1315 
1316 /* Call reducelib to initialize a new volume */
1317 static int
1318 vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path)
1319 {
1320 	struct vbdev_compress *meta_ctx;
1321 	int rc;
1322 
1323 	meta_ctx = _prepare_for_load_init(bdev);
1324 	if (meta_ctx == NULL) {
1325 		return -EINVAL;
1326 	}
1327 
1328 	if (_set_pmd(meta_ctx) == false) {
1329 		SPDK_ERRLOG("could not find required pmd\n");
1330 		free(meta_ctx);
1331 		return -EINVAL;
1332 	}
1333 
1334 	rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
1335 			    meta_ctx->base_bdev, &meta_ctx->base_desc);
1336 	if (rc) {
1337 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1338 		free(meta_ctx);
1339 		return -EINVAL;
1340 	}
1341 
1342 	/* Save the thread where the base device is opened */
1343 	meta_ctx->thread = spdk_get_thread();
1344 
1345 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1346 
1347 	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
1348 			     pm_path,
1349 			     vbdev_reduce_init_cb,
1350 			     meta_ctx);
1351 	return 0;
1352 }
1353 
1354 /* We provide this callback for the SPDK channel code to create a channel using
1355  * the channel struct we provided in our module get_io_channel() entry point. Here
1356  * we get and save off an underlying base channel of the device below us so that
1357  * we can communicate with the base bdev on a per channel basis.  If we needed
1358  * our own poller for this vbdev, we'd register it here.
1359  */
1360 static int
1361 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1362 {
1363 	struct vbdev_compress *comp_bdev = io_device;
1364 	struct comp_device_qp *device_qp;
1365 
1366 	/* We use this queue to track outstanding IO in our layer. */
1367 	TAILQ_INIT(&comp_bdev->pending_comp_ios);
1368 
1369 	/* We use this to queue up compression operations as needed. */
1370 	TAILQ_INIT(&comp_bdev->queued_comp_ops);
1371 
1372 	/* Now set the reduce channel if it's not already set. */
1373 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1374 	if (comp_bdev->ch_count == 0) {
1375 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1376 		comp_bdev->reduce_thread = spdk_get_thread();
1377 		comp_bdev->poller = spdk_poller_register(comp_dev_poller, comp_bdev, 0);
1378 		/* Now assign a q pair */
1379 		pthread_mutex_lock(&g_comp_device_qp_lock);
1380 		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
1381 			if ((strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0)) {
1382 				if (device_qp->thread == spdk_get_thread()) {
1383 					comp_bdev->device_qp = device_qp;
1384 					break;
1385 				}
1386 				if (device_qp->thread == NULL) {
1387 					comp_bdev->device_qp = device_qp;
1388 					device_qp->thread = spdk_get_thread();
1389 					break;
1390 				}
1391 			}
1392 		}
1393 		pthread_mutex_unlock(&g_comp_device_qp_lock);
1394 	}
1395 	comp_bdev->ch_count++;
1396 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1397 
1398 	if (comp_bdev->device_qp != NULL) {
1399 		return 0;
1400 	} else {
1401 		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
1402 		assert(false);
1403 		return -ENOMEM;
1404 	}
1405 }
1406 
1407 static void
1408 _channel_cleanup(struct vbdev_compress *comp_bdev)
1409 {
1410 	/* Note: comp_bdevs can share a device_qp if they are
1411 	 * on the same thread so we leave the device_qp element
1412 	 * alone for this comp_bdev and just clear the reduce thread.
1413 	 */
1414 	spdk_put_io_channel(comp_bdev->base_ch);
1415 	comp_bdev->reduce_thread = NULL;
1416 	spdk_poller_unregister(&comp_bdev->poller);
1417 }
1418 
1419 /* Used to reroute destroy_ch to the correct thread */
1420 static void
1421 _comp_bdev_ch_destroy_cb(void *arg)
1422 {
1423 	struct vbdev_compress *comp_bdev = arg;
1424 
1425 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1426 	if (comp_bdev->ch_count == 0) {
1427 		_channel_cleanup(comp_bdev);
1428 	}
1429 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1430 }
1431 
1432 /* We provide this callback for the SPDK channel code to destroy a channel
1433  * created with our create callback. We just need to undo anything we did
1434  * when we created. If this bdev used its own poller, we'd unregister it here.
1435  */
1436 static void
1437 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1438 {
1439 	struct vbdev_compress *comp_bdev = io_device;
1440 
1441 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1442 	comp_bdev->ch_count--;
1443 	if (comp_bdev->ch_count == 0) {
1444 		/* Send this request to the thread where the channel was created. */
1445 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1446 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1447 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1448 		} else {
1449 			_channel_cleanup(comp_bdev);
1450 		}
1451 	}
1452 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1453 }
1454 
1455 /* RPC entry point for compression vbdev creation. */
1456 int
1457 create_compress_bdev(const char *bdev_name, const char *pm_path)
1458 {
1459 	struct spdk_bdev *bdev;
1460 
1461 	bdev = spdk_bdev_get_by_name(bdev_name);
1462 	if (!bdev) {
1463 		return -ENODEV;
1464 	}
1465 
1466 	return vbdev_init_reduce(bdev, pm_path);
1467 }
1468 
1469 /* On init, just init the compress drivers. All metadata is stored on disk. */
1470 static int
1471 vbdev_compress_init(void)
1472 {
1473 	if (vbdev_init_compress_drivers()) {
1474 		SPDK_ERRLOG("Error setting up compression devices\n");
1475 		return -EINVAL;
1476 	}
1477 
1478 	return 0;
1479 }
1480 
1481 /* Called when the entire module is being torn down. */
1482 static void
1483 vbdev_compress_finish(void)
1484 {
1485 	struct comp_device_qp *dev_qp;
1486 	/* TODO: unload vol in a future patch */
1487 
1488 	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
1489 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
1490 		free(dev_qp);
1491 	}
1492 	pthread_mutex_destroy(&g_comp_device_qp_lock);
1493 
1494 	rte_mempool_free(g_comp_op_mp);
1495 	rte_mempool_free(g_mbuf_mp);
1496 }
1497 
1498 /* During init we'll be asked how much memory we'd like passed to us
1499  * in bdev_io structures as context. Here's where we specify how
1500  * much context we want per IO.
1501  */
1502 static int
1503 vbdev_compress_get_ctx_size(void)
1504 {
1505 	return sizeof(struct comp_bdev_io);
1506 }
1507 
1508 /* When we register our bdev this is how we specify our entry points. */
1509 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1510 	.destruct		= vbdev_compress_destruct,
1511 	.submit_request		= vbdev_compress_submit_request,
1512 	.io_type_supported	= vbdev_compress_io_type_supported,
1513 	.get_io_channel		= vbdev_compress_get_io_channel,
1514 	.dump_info_json		= vbdev_compress_dump_info_json,
1515 	.write_config_json	= NULL,
1516 };
1517 
1518 static struct spdk_bdev_module compress_if = {
1519 	.name = "compress",
1520 	.module_init = vbdev_compress_init,
1521 	.config_text = NULL,
1522 	.get_ctx_size = vbdev_compress_get_ctx_size,
1523 	.examine_disk = vbdev_compress_examine,
1524 	.module_fini = vbdev_compress_finish,
1525 	.config_json = vbdev_compress_config_json
1526 };
1527 
1528 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1529 
1530 static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
1531 {
1532 	struct spdk_bdev_alias *aliases;
1533 
1534 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1535 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1536 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
1537 		if (!comp_bdev->comp_bdev.name) {
1538 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1539 			return -ENOMEM;
1540 		}
1541 	} else {
1542 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1543 		if (!comp_bdev->comp_bdev.name) {
1544 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1545 			return -ENOMEM;
1546 		}
1547 	}
1548 	return 0;
1549 }
1550 
1551 static void
1552 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1553 {
1554 	int rc;
1555 
1556 	if (_set_compbdev_name(comp_bdev)) {
1557 		goto error_bdev_name;
1558 	}
1559 
1560 	/* Note: some of the fields below will change in the future - for example,
1561 	 * blockcnt specifically will not match (the compressed volume size will
1562 	 * be slightly less than the base bdev size)
1563 	 */
1564 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1565 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1566 
1567 	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
1568 		comp_bdev->comp_bdev.required_alignment =
1569 			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
1570 				 comp_bdev->base_bdev->required_alignment);
1571 		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1572 			       comp_bdev->comp_bdev.required_alignment);
1573 	} else {
1574 		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
1575 	}
1576 	comp_bdev->comp_bdev.optimal_io_boundary =
1577 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
1578 
1579 	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
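	/* For example, with the 16 KiB CHUNK_SIZE used here and a 512 byte logical
	 * block size, the boundary is 32 blocks; splitting on it guarantees that a
	 * single bdev_io never spans two reduce chunks.
	 */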
1580 
1581 	comp_bdev->comp_bdev.blocklen = comp_bdev->base_bdev->blocklen;
1582 	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
1583 	assert(comp_bdev->comp_bdev.blockcnt > 0);
1584 
1585 	/* This is the context that is passed to us when the bdev
1586 	 * layer calls in so we'll save our comp_bdev node here.
1587 	 */
1588 	comp_bdev->comp_bdev.ctxt = comp_bdev;
1589 	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
1590 	comp_bdev->comp_bdev.module = &compress_if;
1591 
1592 	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1593 
1594 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1595 
1596 	rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
1597 			    comp_bdev->base_bdev, &comp_bdev->base_desc);
1598 	if (rc) {
1599 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
1600 		goto error_open;
1601 	}
1602 
1603 	/* Save the thread where the base device is opened */
1604 	comp_bdev->thread = spdk_get_thread();
1605 
1606 	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
1607 				sizeof(struct comp_io_channel),
1608 				comp_bdev->comp_bdev.name);
1609 
1610 	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1611 					 comp_bdev->comp_bdev.module);
1612 	if (rc) {
1613 		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
1614 		goto error_claim;
1615 	}
1616 
1617 	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
1618 	if (rc < 0) {
1619 		SPDK_ERRLOG("trying to register bdev\n");
1620 		goto error_bdev_register;
1621 	}
1622 
1623 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1624 
1625 	return;
1626 	/* Error cleanup paths. */
1627 error_bdev_register:
1628 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1629 error_claim:
1630 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
1631 	spdk_io_device_unregister(comp_bdev, NULL);
1632 error_open:
1633 	free(comp_bdev->comp_bdev.name);
1634 error_bdev_name:
1635 	spdk_put_io_channel(comp_bdev->base_ch);
1636 	spdk_bdev_close(comp_bdev->base_desc);
1637 	free(comp_bdev);
1638 	spdk_bdev_module_examine_done(&compress_if);
1639 }
1640 
1641 void
1642 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1643 {
1644 	struct vbdev_compress *comp_bdev = NULL;
1645 
1646 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1647 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1648 			break;
1649 		}
1650 	}
1651 
1652 	if (comp_bdev == NULL) {
1653 		cb_fn(cb_arg, -ENODEV);
1654 		return;
1655 	}
1656 
1657 	/* Save these for after the vol is destroyed. */
1658 	comp_bdev->delete_cb_fn = cb_fn;
1659 	comp_bdev->delete_cb_arg = cb_arg;
1660 
1661 	/* Tell reducelib that we're done with this volume. */
1662 	if (comp_bdev->orphaned == false) {
1663 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1664 	} else {
1665 		delete_vol_unload_cb(comp_bdev, 0);
1666 	}
1667 }
1668 
1669 static void
1670 _vbdev_reduce_load_cb(void *ctx)
1671 {
1672 	struct spdk_bdev_desc *desc = ctx;
1673 
1674 	spdk_bdev_close(desc);
1675 }
1676 
1677 /* Callback from reduce for when load is complete. We'll pass the vbdev_comp struct
1678  * used for initial metadata operations on to the claim process, where it will be
1679  * further filled out and added to the global list.
1680  */
1681 static void
1682 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1683 {
1684 	struct vbdev_compress *meta_ctx = cb_arg;
1685 	int rc;
1686 
1687 	/* Done with metadata operations */
1688 	spdk_put_io_channel(meta_ctx->base_ch);
1689 	/* Close the underlying bdev on its same opened thread. */
1690 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1691 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx->base_desc);
1692 	} else {
1693 		spdk_bdev_close(meta_ctx->base_desc);
1694 	}
1695 	meta_ctx->base_desc = NULL;
1696 
1697 	if (reduce_errno != 0 && reduce_errno != -ENOENT) {
1698 		/* -EILSEQ means it is not a compress disk, so don't log that case as an error. */
1699 		if (reduce_errno != -EILSEQ) {
1700 			SPDK_ERRLOG("for vol %s, error %d\n",
1701 				    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1702 		}
1703 		free(meta_ctx);
1704 		spdk_bdev_module_examine_done(&compress_if);
1705 		return;
1706 	}
1707 
1708 	/* This status (-ENOENT) means that the vol could not be loaded because
1709 	 * the pmem file can't be found.
1710 	 */
1711 	if (reduce_errno == -ENOENT) {
1712 		if (_set_compbdev_name(meta_ctx)) {
1713 			goto err;
1714 		}
1715 
1716 		/* We still want to open and claim the backing device to protect the data until
1717 		 * either the pm metadata file is recovered or the comp bdev is deleted.
1718 		 */
1719 		rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
1720 				    meta_ctx->base_bdev, &meta_ctx->base_desc);
1721 		if (rc) {
1722 			SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1723 			goto err;
1724 		}
1725 
1726 		/* Save the thread where the base device is opened */
1727 		meta_ctx->thread = spdk_get_thread();
1728 
1729 		meta_ctx->comp_bdev.module = &compress_if;
1730 		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
1731 		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
1732 						 meta_ctx->comp_bdev.module);
1733 		if (rc) {
1734 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1735 			goto err;
1736 		}
1737 
1738 		meta_ctx->orphaned = true;
1739 		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
1740 err:
1741 		spdk_bdev_module_examine_done(&compress_if);
1742 		return;
1743 	}
1744 
1745 	if (_set_pmd(meta_ctx) == false) {
1746 		SPDK_ERRLOG("could not find required pmd\n");
1747 		free(meta_ctx);
1748 		spdk_bdev_module_examine_done(&compress_if);
1749 		return;
1750 	}
1751 
1752 	/* Update information following volume load. */
1753 	meta_ctx->vol = vol;
1754 	memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
1755 	       sizeof(struct spdk_reduce_vol_params));
1756 	vbdev_compress_claim(meta_ctx);
1757 	spdk_bdev_module_examine_done(&compress_if);
1758 }
1759 
1760 /* Examine_disk entry point: will do a metadata load to see if this is ours,
1761  * and if so will go ahead and claim it.
1762  */
1763 static void
1764 vbdev_compress_examine(struct spdk_bdev *bdev)
1765 {
1766 	struct vbdev_compress *meta_ctx;
1767 	int rc;
1768 
1769 	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
1770 		spdk_bdev_module_examine_done(&compress_if);
1771 		return;
1772 	}
1773 
1774 	meta_ctx = _prepare_for_load_init(bdev);
1775 	if (meta_ctx == NULL) {
1776 		spdk_bdev_module_examine_done(&compress_if);
1777 		return;
1778 	}
1779 
1780 	rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
1781 			    meta_ctx->base_bdev, &meta_ctx->base_desc);
1782 	if (rc) {
1783 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1784 		free(meta_ctx);
1785 		spdk_bdev_module_examine_done(&compress_if);
1786 		return;
1787 	}
1788 
1789 	/* Save the thread where the base device is opened */
1790 	meta_ctx->thread = spdk_get_thread();
1791 
1792 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1793 	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
1794 }
1795 
1796 int
1797 compress_set_pmd(enum compress_pmd *opts)
1798 {
1799 	g_opts = *opts;
1800 
1801 	return 0;
1802 }
1803 
1804 SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)
1805