xref: /spdk/module/bdev/compress/vbdev_compress.c (revision 307b8c112ffd90a26d53dd15fad67bd9038ef526)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "vbdev_compress.h"
8 
9 #include "spdk/reduce.h"
10 #include "spdk/stdinc.h"
11 #include "spdk/rpc.h"
12 #include "spdk/env.h"
13 #include "spdk/endian.h"
14 #include "spdk/string.h"
15 #include "spdk/thread.h"
16 #include "spdk/util.h"
17 #include "spdk/bdev_module.h"
18 #include "spdk/likely.h"
19 
20 #include "spdk/log.h"
21 
22 #include <rte_config.h>
23 #include <rte_bus_vdev.h>
24 #include <rte_compressdev.h>
25 #include <rte_comp.h>
26 #include <rte_mbuf_dyn.h>
27 
28 /* Used to store IO context in mbuf */
29 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
30 	.name = "context_reduce",
31 	.size = sizeof(uint64_t),
32 	.align = __alignof__(uint64_t),
33 	.flags = 0,
34 };
35 static int g_mbuf_offset;
36 
37 #define NUM_MAX_XFORMS 2
38 #define NUM_MAX_INFLIGHT_OPS 128
39 #define DEFAULT_WINDOW_SIZE 15
40 /* We need extra mbufs per operation to accommodate host buffers that
41  *  span a physical page boundary.
42  */
43 #define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
44 #define CHUNK_SIZE (1024 * 16)
45 #define COMP_BDEV_NAME "compress"
46 #define BACKING_IO_SZ (4 * 1024)
47 
48 #define ISAL_PMD "compress_isal"
49 #define QAT_PMD "compress_qat"
50 #define MLX5_PMD "mlx5_pci"
51 #define NUM_MBUFS		8192
52 #define POOL_CACHE_SIZE		256
53 
54 static enum compress_pmd g_opts;
55 
56 /* Global list of available compression devices. */
57 struct compress_dev {
58 	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
59 	uint8_t				cdev_id;	/* identifier for the device */
60 	void				*comp_xform;	/* shared private xform for comp on this PMD */
61 	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
62 	TAILQ_ENTRY(compress_dev)	link;
63 };
64 static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
65 
66 /* Although the ISAL PMD reports 'unlimited' qpairs, it has an undocumented limit of 99:
67  * the internal ring name it creates for each qp exceeds a length limit in the generic
68  * ring code, which causes qp initialization to fail.
69  * FIXME: Reduce the number of qpairs to 48 due to issue #2338.
70  */
71 #define MAX_NUM_QP 48
72 /* Global list and lock for unique device/queue pair combos */
73 struct comp_device_qp {
74 	struct compress_dev		*device;	/* ptr to compression device */
75 	uint8_t				qp;		/* queue pair for this node */
76 	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
77 	TAILQ_ENTRY(comp_device_qp)	link;
78 };
79 static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
80 static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
81 
82 /* For queueing up compression operations that we can't submit for some reason */
83 struct vbdev_comp_op {
84 	struct spdk_reduce_backing_dev	*backing_dev;
85 	struct iovec			*src_iovs;
86 	int				src_iovcnt;
87 	struct iovec			*dst_iovs;
88 	int				dst_iovcnt;
89 	bool				compress;
90 	void				*cb_arg;
91 	TAILQ_ENTRY(vbdev_comp_op)	link;
92 };
93 
94 struct vbdev_comp_delete_ctx {
95 	spdk_delete_compress_complete	cb_fn;
96 	void				*cb_arg;
97 	int				cb_rc;
98 	struct spdk_thread		*orig_thread;
99 };
100 
101 /* List of virtual bdevs and associated info for each. */
102 struct vbdev_compress {
103 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
104 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
105 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
106 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
107 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
108 	char				*drv_name;	/* name of the compression device driver */
109 	struct comp_device_qp		*device_qp;
110 	struct spdk_thread		*reduce_thread;
111 	pthread_mutex_t			reduce_lock;
112 	uint32_t			ch_count;
113 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
114 	struct spdk_poller		*poller;	/* completion poller */
115 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
116 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
117 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
118 	struct vbdev_comp_delete_ctx	*delete_ctx;
119 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
120 	int				reduce_errno;
121 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
122 	TAILQ_ENTRY(vbdev_compress)	link;
123 	struct spdk_thread		*thread;	/* thread where base device is opened */
124 };
125 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
126 
127 /* The comp vbdev channel struct. It is allocated and freed on our behalf by the io channel code.
128  */
129 struct comp_io_channel {
130 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
131 };
132 
133 /* Per I/O context for the compression vbdev. */
134 struct comp_bdev_io {
135 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
136 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
137 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
138 	struct spdk_bdev_io		*orig_io;		/* the original IO */
139 	struct spdk_io_channel		*ch;			/* for resubmission */
140 	int				status;			/* save for completion on orig thread */
141 };
142 
143 /* Shared mempools between all devices on this system */
144 static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
145 static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
146 static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
147 static bool g_qat_available = false;
148 static bool g_isal_available = false;
149 static bool g_mlx5_pci_available = false;
150 
151 /* Create shared (between all ops per PMD) compress xforms. */
152 static struct rte_comp_xform g_comp_xform = {
153 	.type = RTE_COMP_COMPRESS,
154 	.compress = {
155 		.algo = RTE_COMP_ALGO_DEFLATE,
156 		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
157 		.level = RTE_COMP_LEVEL_MAX,
158 		.window_size = DEFAULT_WINDOW_SIZE,
159 		.chksum = RTE_COMP_CHECKSUM_NONE,
160 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
161 	}
162 };
163 /* Create shared (between all ops per PMD) decompress xforms. */
164 static struct rte_comp_xform g_decomp_xform = {
165 	.type = RTE_COMP_DECOMPRESS,
166 	.decompress = {
167 		.algo = RTE_COMP_ALGO_DEFLATE,
168 		.chksum = RTE_COMP_CHECKSUM_NONE,
169 		.window_size = DEFAULT_WINDOW_SIZE,
170 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
171 	}
172 };
173 
174 static void vbdev_compress_examine(struct spdk_bdev *bdev);
175 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
176 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
177 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
178 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
179 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
180 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
181 
182 /* Dummy callback used by DPDK to free externally attached buffers
183  * on mbufs. We free them ourselves, but DPDK requires that this
184  * callback be provided.
185  */
186 static void
187 shinfo_free_cb(void *arg1, void *arg2)
188 {
189 }
190 
191 /* Called by vbdev_init_compress_drivers() to init each discovered compression device */
192 static int
193 create_compress_dev(uint8_t index)
194 {
195 	struct compress_dev *device;
196 	uint16_t q_pairs;
197 	uint8_t cdev_id;
198 	int rc, i;
199 	struct comp_device_qp *dev_qp;
200 	struct comp_device_qp *tmp_qp;
201 
202 	device = calloc(1, sizeof(struct compress_dev));
203 	if (!device) {
204 		return -ENOMEM;
205 	}
206 
207 	/* Get details about this device. */
208 	rte_compressdev_info_get(index, &device->cdev_info);
209 
210 	cdev_id = device->cdev_id = index;
211 
212 	/* Zero means no limit, so fall back to our MAX_NUM_QP cap. */
213 	if (device->cdev_info.max_nb_queue_pairs == 0) {
214 		q_pairs = MAX_NUM_QP;
215 	} else {
216 		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
217 	}
218 
219 	/* Configure the compression device. */
220 	struct rte_compressdev_config config = {
221 		.socket_id = rte_socket_id(),
222 		.nb_queue_pairs = q_pairs,
223 		.max_nb_priv_xforms = NUM_MAX_XFORMS,
224 		.max_nb_streams = 0
225 	};
226 	rc = rte_compressdev_configure(cdev_id, &config);
227 	if (rc < 0) {
228 		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
229 		goto err;
230 	}
231 
232 	/* Pre-setup all potential qpairs now and assign them in the channel
233 	 * callback.
234 	 */
235 	for (i = 0; i < q_pairs; i++) {
236 		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
237 						      NUM_MAX_INFLIGHT_OPS,
238 						      rte_socket_id());
239 		if (rc) {
240 			if (i > 0) {
241 				q_pairs = i;
242 				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
243 					       "compressdev %u with error %d "
244 					       "so limiting to %u qpairs\n",
245 					       cdev_id, rc, q_pairs);
246 				break;
247 			} else {
248 				SPDK_ERRLOG("Failed to setup queue pair on "
249 					    "compressdev %u with error %d\n", cdev_id, rc);
250 				rc = -EINVAL;
251 				goto err;
252 			}
253 		}
254 	}
255 
256 	rc = rte_compressdev_start(cdev_id);
257 	if (rc < 0) {
258 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
259 			    cdev_id, rc);
260 		goto err;
261 	}
262 
263 	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
264 		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
265 				&device->comp_xform);
266 		if (rc < 0) {
267 			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
268 				    cdev_id, rc);
269 			goto err;
270 		}
271 
272 		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
273 				&device->decomp_xform);
274 		if (rc) {
275 			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
276 				    cdev_id, rc);
277 			goto err;
278 		}
279 	} else {
280 		SPDK_ERRLOG("PMD does not support shared transforms\n");
		rc = -EINVAL;	/* ensure this failure path returns an error, not 0 */
281 		goto err;
282 	}
283 
284 	/* Build up list of device/qp combinations */
285 	for (i = 0; i < q_pairs; i++) {
286 		dev_qp = calloc(1, sizeof(struct comp_device_qp));
287 		if (!dev_qp) {
288 			rc = -ENOMEM;
289 			goto err;
290 		}
291 		dev_qp->device = device;
292 		dev_qp->qp = i;
293 		dev_qp->thread = NULL;
294 		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
295 	}
296 
297 	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
298 
299 	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
300 		g_qat_available = true;
301 	}
302 	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
303 		g_isal_available = true;
304 	}
305 	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
306 		g_mlx5_pci_available = true;
307 	}
308 
309 	return 0;
310 
311 err:
312 	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
313 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
314 		free(dev_qp);
315 	}
316 	free(device);
317 	return rc;
318 }
319 
320 /* Called from driver init entry point, vbdev_compress_init() */
321 static int
322 vbdev_init_compress_drivers(void)
323 {
324 	uint8_t cdev_count, i;
325 	struct compress_dev *tmp_dev;
326 	struct compress_dev *device;
327 	int rc;
328 
329 	/* We always init the compress_isal PMD */
330 	rc = rte_vdev_init(ISAL_PMD, NULL);
331 	if (rc == 0) {
332 		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
333 	} else if (rc == -EEXIST) {
334 		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
335 	} else {
336 		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
337 		return -EINVAL;
338 	}
339 
340 	/* If we have no compression devices, there's no reason to continue. */
341 	cdev_count = rte_compressdev_count();
342 	if (cdev_count == 0) {
343 		return 0;
344 	}
345 	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
346 		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
347 		return -EINVAL;
348 	}
349 
350 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
351 	if (g_mbuf_offset < 0) {
352 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
353 		return -EINVAL;
354 	}
355 
356 	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
357 					    sizeof(struct rte_mbuf), 0, rte_socket_id());
358 	if (g_mbuf_mp == NULL) {
359 		SPDK_ERRLOG("Cannot create mbuf pool\n");
360 		rc = -ENOMEM;
361 		goto error_create_mbuf;
362 	}
363 
364 	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
365 					       0, rte_socket_id());
366 	if (g_comp_op_mp == NULL) {
367 		SPDK_ERRLOG("Cannot create comp op pool\n");
368 		rc = -ENOMEM;
369 		goto error_create_op;
370 	}
371 
372 	/* Init all devices */
373 	for (i = 0; i < cdev_count; i++) {
374 		rc = create_compress_dev(i);
375 		if (rc != 0) {
376 			goto error_create_compress_devs;
377 		}
378 	}
379 
380 	if (g_qat_available == true) {
381 		SPDK_NOTICELOG("initialized QAT PMD\n");
382 	}
383 
384 	g_shinfo.free_cb = shinfo_free_cb;
385 
386 	return 0;
387 
388 	/* Error cleanup paths. */
389 error_create_compress_devs:
390 	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
391 		TAILQ_REMOVE(&g_compress_devs, device, link);
392 		free(device);
393 	}
	rte_mempool_free(g_comp_op_mp);	/* NULL-safe; also release the comp op pool on this path */
394 error_create_op:
395 error_create_mbuf:
396 	rte_mempool_free(g_mbuf_mp);
397 
398 	return rc;
399 }
400 
401 /* for completing rw requests on the orig IO thread. */
402 static void
403 _reduce_rw_blocks_cb(void *arg)
404 {
405 	struct comp_bdev_io *io_ctx = arg;
406 
407 	if (spdk_likely(io_ctx->status == 0)) {
408 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
409 	} else if (io_ctx->status == -ENOMEM) {
410 		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
411 	} else {
412 		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
413 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
414 	}
415 }
416 
417 /* Completion callback for r/w that were issued via reducelib. */
418 static void
419 reduce_rw_blocks_cb(void *arg, int reduce_errno)
420 {
421 	struct spdk_bdev_io *bdev_io = arg;
422 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
423 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
424 	struct spdk_thread *orig_thread;
425 
426 	/* TODO: need to decide which reduce error codes count as bdev_io success vs failure;
427 	 * for example, examine calls that read metadata. */
428 
429 	io_ctx->status = reduce_errno;
430 
431 	/* Send this request to the orig IO thread. */
432 	orig_thread = spdk_io_channel_get_thread(ch);
433 
434 	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
435 }
436 
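/* Build an mbuf chain over the supplied iovs: each iov gets an mbuf with its buffer
 * attached externally, the reduce cb_arg is stashed in the mbuf dynfield, and an
 * extra mbuf is allocated whenever a buffer spans a physical page boundary.
 */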
437 static int
438 _setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
439 		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
440 {
441 	uint64_t updated_length, remainder, phys_addr;
442 	uint8_t *current_base = NULL;
443 	int iov_index, mbuf_index;
444 	int rc = 0;
445 
446 	/* Setup mbufs */
447 	iov_index = mbuf_index = 0;
448 	while (iov_index < iovcnt) {
449 
450 		current_base = iovs[iov_index].iov_base;
451 		if (total_length) {
452 			*total_length += iovs[iov_index].iov_len;
453 		}
454 		assert(mbufs[mbuf_index] != NULL);
455 		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
456 		updated_length = iovs[iov_index].iov_len;
457 		phys_addr = spdk_vtophys((void *)current_base, &updated_length);
458 
459 		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
460 					  current_base,
461 					  phys_addr,
462 					  updated_length,
463 					  &g_shinfo);
464 		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
465 		remainder = iovs[iov_index].iov_len - updated_length;
466 
467 		if (mbuf_index > 0) {
468 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
469 		}
470 
471 		/* If the buffer crossed a physical page boundary, we need another mbuf for the remainder */
472 		if (remainder > 0) {
473 			/* allocate an mbuf at the end of the array */
474 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
475 						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
476 			if (rc) {
477 				SPDK_ERRLOG("failed to allocate an extra mbuf!\n");
478 				return -1;
479 			}
480 			(*mbuf_total)++;
481 			mbuf_index++;
482 			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
483 			current_base += updated_length;
484 			phys_addr = spdk_vtophys((void *)current_base, &remainder);
485 			/* assert that the remainder does not cross yet another page boundary */
486 			assert(remainder == iovs[iov_index].iov_len - updated_length);
487 
488 			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
489 						  current_base,
490 						  phys_addr,
491 						  remainder,
492 						  &g_shinfo);
493 			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
494 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
495 		}
496 		iov_index++;
497 		mbuf_index++;
498 	}
499 
500 	return 0;
501 }
502 
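/* Core submission path: allocate a comp op plus src/dst mbuf chains, point the op at
 * the shared (de)compress xform, and enqueue it on this bdev's device qp. On -ENOMEM
 * or -EAGAIN the operation is queued on queued_comp_ops and retried from the poller.
 */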
503 static int
504 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
505 		    int src_iovcnt, struct iovec *dst_iovs,
506 		    int dst_iovcnt, bool compress, void *cb_arg)
507 {
508 	void *reduce_cb_arg = cb_arg;
509 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
510 					   backing_dev);
511 	struct rte_comp_op *comp_op;
512 	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
513 	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
514 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
515 	uint64_t total_length = 0;
516 	int rc = 0;
517 	struct vbdev_comp_op *op_to_queue;
518 	int src_mbuf_total = src_iovcnt;
519 	int dst_mbuf_total = dst_iovcnt;
520 	bool device_error = false;
521 
522 	assert(src_iovcnt < MAX_MBUFS_PER_OP);
523 
524 #ifdef DEBUG
525 	memset(src_mbufs, 0, sizeof(src_mbufs));
526 	memset(dst_mbufs, 0, sizeof(dst_mbufs));
527 #endif
528 
529 	comp_op = rte_comp_op_alloc(g_comp_op_mp);
530 	if (!comp_op) {
531 		SPDK_ERRLOG("failed to allocate a comp op!\n");
532 		rc = -ENOMEM;
533 		goto error_get_op;
534 	}
535 
536 	/* get an mbuf per iov, src and dst */
537 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
538 	if (rc) {
539 		SPDK_ERRLOG("failed to allocate src_mbufs!\n");
540 		rc = -ENOMEM;
541 		goto error_get_src;
542 	}
543 	assert(src_mbufs[0]);
544 
545 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
546 	if (rc) {
547 		SPDK_ERRLOG("failed to allocate dst_mbufs!\n");
548 		rc = -ENOMEM;
549 		goto error_get_dst;
550 	}
551 	assert(dst_mbufs[0]);
552 
553 	/* There is a 1:1 mapping between a bdev_io and a compression operation.
554 	 * Some PMDs that SPDK uses don't support chaining, but the reduce library
555 	 * should provide correctly sized buffers in that case.
556 	 * Build our mbuf chain and associate it with our single comp_op.
557 	 */
558 	rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length,
559 				  src_iovs, src_iovcnt, reduce_cb_arg);
560 	if (rc < 0) {
561 		goto error_src_dst;
562 	}
563 	if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) {
564 		if (src_iovcnt == 1) {
565 			SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n",
566 				    comp_bdev->drv_name);
567 		} else {
568 			SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name);
569 		}
570 		rc = -EINVAL;
571 		goto error_src_dst;
572 	}
573 
574 	comp_op->m_src = src_mbufs[0];
575 	comp_op->src.offset = 0;
576 	comp_op->src.length = total_length;
577 
578 	rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL,
579 				  dst_iovs, dst_iovcnt, reduce_cb_arg);
580 	if (rc < 0) {
581 		goto error_src_dst;
582 	}
583 	if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) {
584 		if (dst_iovcnt == 1) {
585 			SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n",
586 				    comp_bdev->drv_name);
587 		} else {
588 			SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name);
589 		}
590 		rc = -EINVAL;
591 		goto error_src_dst;
592 	}
593 
594 	comp_op->m_dst = dst_mbufs[0];
595 	comp_op->dst.offset = 0;
596 
597 	if (compress == true) {
598 		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
599 	} else {
600 		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
601 	}
602 
603 	comp_op->op_type = RTE_COMP_OP_STATELESS;
604 	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
605 
606 	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
607 	assert(rc <= 1);
608 
609 	/* We always expect 1 op to be enqueued; if 0 were, we need to queue it up ourselves. */
610 	if (rc == 1) {
611 		return 0;
612 	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
613 		rc = -EAGAIN;
614 	} else {
615 		device_error = true;
		/* Log the failing status now, before the op is freed in the cleanup below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
616 	}
617 
618 	/* Error cleanup paths. */
619 error_src_dst:
620 	rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt);
621 error_get_dst:
622 	rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt);
623 error_get_src:
624 	rte_comp_op_free(comp_op);
625 error_get_op:
626 
627 	if (device_error == true) {
628 		/* There was an error sending the op to the device, most
629 		 * likely with the parameters; the failing status was logged
630 		 * above, before the op was freed.
631 		 */
632 		return -EINVAL;
633 	}
634 	if (rc != -ENOMEM && rc != -EAGAIN) {
635 		return rc;
636 	}
637 
638 	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
639 	if (op_to_queue == NULL) {
640 		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
641 		return -ENOMEM;
642 	}
643 	op_to_queue->backing_dev = backing_dev;
644 	op_to_queue->src_iovs = src_iovs;
645 	op_to_queue->src_iovcnt = src_iovcnt;
646 	op_to_queue->dst_iovs = dst_iovs;
647 	op_to_queue->dst_iovcnt = dst_iovcnt;
648 	op_to_queue->compress = compress;
649 	op_to_queue->cb_arg = cb_arg;
650 	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
651 			  op_to_queue,
652 			  link);
653 	return 0;
654 }
655 
656 /* Poller for the DPDK compression driver. */
657 static int
658 comp_dev_poller(void *args)
659 {
660 	struct vbdev_compress *comp_bdev = args;
661 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
662 	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
663 	uint16_t num_deq;
664 	struct spdk_reduce_vol_cb_args *reduce_args;
665 	struct vbdev_comp_op *op_to_resubmit;
666 	int rc, i;
667 
668 	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
669 						NUM_MAX_INFLIGHT_OPS);
670 	for (i = 0; i < num_deq; i++) {
671 		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
672 				uint64_t *);
673 		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
674 
675 			/* tell reduce this is done and what the bytecount was */
676 			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
677 		} else {
678 			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
679 				       deq_ops[i]->status);
680 
681 			/* Reduce will simply store uncompressed on neg errno value. */
682 			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
683 		}
684 
685 		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
686 		 * call takes care of freeing all of the mbufs in the chain back to their
687 		 * original pool.
688 		 */
689 		rte_pktmbuf_free(deq_ops[i]->m_src);
690 		rte_pktmbuf_free(deq_ops[i]->m_dst);
691 
692 		/* There is no bulk free for comp ops, so we have to free them one at
693 		 * a time here; however, it would be rare to ever have more than 1 at
694 		 * a time anyway.
695 		 */
696 		rte_comp_op_free(deq_ops[i]);
697 
698 		/* Check if there are any pending comp ops to process, only pull one
699 		 * at a time off as _compress_operation() may re-queue the op.
700 		 */
701 		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
702 			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
703 			rc = _compress_operation(op_to_resubmit->backing_dev,
704 						 op_to_resubmit->src_iovs,
705 						 op_to_resubmit->src_iovcnt,
706 						 op_to_resubmit->dst_iovs,
707 						 op_to_resubmit->dst_iovcnt,
708 						 op_to_resubmit->compress,
709 						 op_to_resubmit->cb_arg);
710 			if (rc == 0) {
711 				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
712 				free(op_to_resubmit);
713 			}
714 		}
715 	}
716 	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
717 }
718 
719 /* Entry point for reduce lib to issue a compress operation. */
720 static void
721 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
722 		      struct iovec *src_iovs, int src_iovcnt,
723 		      struct iovec *dst_iovs, int dst_iovcnt,
724 		      struct spdk_reduce_vol_cb_args *cb_arg)
725 {
726 	int rc;
727 
728 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
729 	if (rc) {
730 		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
731 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
732 	}
733 }
734 
735 /* Entry point for reduce lib to issue a decompress operation. */
736 static void
737 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
738 			struct iovec *src_iovs, int src_iovcnt,
739 			struct iovec *dst_iovs, int dst_iovcnt,
740 			struct spdk_reduce_vol_cb_args *cb_arg)
741 {
742 	int rc;
743 
744 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
745 	if (rc) {
746 		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
747 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
748 	}
749 }
750 
751 /* Callback for getting a buf from the bdev pool in the event that the caller passed
752  * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
753  * beneath us before we're done with it.
754  */
755 static void
756 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
757 {
758 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
759 					   comp_bdev);
760 
761 	if (spdk_unlikely(!success)) {
762 		SPDK_ERRLOG("Failed to get data buffer\n");
763 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
764 		return;
765 	}
766 
767 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
768 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
769 			      reduce_rw_blocks_cb, bdev_io);
770 }
771 
772 /* scheduled for completion on IO thread */
773 static void
774 _complete_other_io(void *arg)
775 {
776 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
777 	if (io_ctx->status == 0) {
778 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
779 	} else {
780 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
781 	}
782 }
783 
784 /* scheduled for submission on reduce thread */
785 static void
786 _comp_bdev_io_submit(void *arg)
787 {
788 	struct spdk_bdev_io *bdev_io = arg;
789 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
790 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
791 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
792 					   comp_bdev);
793 	struct spdk_thread *orig_thread;
794 
795 	switch (bdev_io->type) {
796 	case SPDK_BDEV_IO_TYPE_READ:
797 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
798 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
799 		return;
800 	case SPDK_BDEV_IO_TYPE_WRITE:
801 		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
802 				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
803 				       reduce_rw_blocks_cb, bdev_io);
804 		return;
805 	/* TODO: support RESET in a future patch in this series */
806 	case SPDK_BDEV_IO_TYPE_RESET:
807 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
808 	case SPDK_BDEV_IO_TYPE_UNMAP:
809 	case SPDK_BDEV_IO_TYPE_FLUSH:
810 	default:
811 		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
812 		io_ctx->status = -ENOTSUP;
813 
814 		/* Complete this on the orig IO thread. */
815 		orig_thread = spdk_io_channel_get_thread(ch);
816 		if (orig_thread != spdk_get_thread()) {
817 			spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
818 		} else {
819 			_complete_other_io(io_ctx);
820 		}
821 	}
822 }
823 
824 /* Called when someone above submits IO to this vbdev. */
825 static void
826 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
827 {
828 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
829 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
830 					   comp_bdev);
831 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
832 
833 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
834 	io_ctx->comp_bdev = comp_bdev;
835 	io_ctx->comp_ch = comp_ch;
836 	io_ctx->orig_io = bdev_io;
837 
838 	/* Send this request to the reduce_thread if that's not what we're on. */
839 	if (spdk_get_thread() != comp_bdev->reduce_thread) {
840 		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
841 	} else {
842 		_comp_bdev_io_submit(bdev_io);
843 	}
844 }
845 
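/* Report which IO types this vbdev supports: only READ and WRITE today, and only
 * when the base bdev supports them.
 */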
846 static bool
847 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
848 {
849 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
850 
851 	switch (io_type) {
852 	case SPDK_BDEV_IO_TYPE_READ:
853 	case SPDK_BDEV_IO_TYPE_WRITE:
854 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
855 	case SPDK_BDEV_IO_TYPE_UNMAP:
856 	case SPDK_BDEV_IO_TYPE_RESET:
857 	case SPDK_BDEV_IO_TYPE_FLUSH:
858 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
859 	default:
860 		return false;
861 	}
862 }
863 
864 /* Resubmission function used by the bdev layer when a queued IO is ready to be
865  * submitted.
866  */
867 static void
868 vbdev_compress_resubmit_io(void *arg)
869 {
870 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
871 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
872 
873 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
874 }
875 
876 /* Used to queue an IO in the event of resource issues. */
877 static void
878 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
879 {
880 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
881 	int rc;
882 
883 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
884 	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
885 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
886 
887 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
888 	if (rc) {
889 		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
890 		assert(false);
891 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
892 	}
893 }
894 
895 /* Callback for unregistering the IO device. */
896 static void
897 _device_unregister_cb(void *io_device)
898 {
899 	struct vbdev_compress *comp_bdev = io_device;
900 
901 	/* Done with this comp_bdev. */
902 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
903 	free(comp_bdev->comp_bdev.name);
904 	free(comp_bdev);
905 }
906 
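/* Final teardown, run on the thread where the base bdev was opened: release the
 * bdev claim, close the base descriptor, and unregister the io_device (or finish
 * the pending delete if the compress bdev was orphaned).
 */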
907 static void
908 _vbdev_compress_destruct_cb(void *ctx)
909 {
910 	struct vbdev_compress *comp_bdev = ctx;
911 
912 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
913 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
914 	/* Close the underlying bdev on its same opened thread. */
915 	spdk_bdev_close(comp_bdev->base_desc);
916 	comp_bdev->vol = NULL;
917 	if (comp_bdev->orphaned == false) {
918 		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
919 	} else {
920 		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
921 		_device_unregister_cb(comp_bdev);
922 	}
923 }
924 
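/* Callback from reducelib when the volume has been unloaded; reroute the rest of
 * the teardown to the thread where the base bdev was opened.
 */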
925 static void
926 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
927 {
928 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
929 
930 	if (reduce_errno) {
931 		SPDK_ERRLOG("reduce error %d on vol unload\n", reduce_errno);
932 	} else {
933 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
934 			spdk_thread_send_msg(comp_bdev->thread,
935 					     _vbdev_compress_destruct_cb, comp_bdev);
936 		} else {
937 			_vbdev_compress_destruct_cb(comp_bdev);
938 		}
939 	}
940 }
941 
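/* Callback from reducelib when the volume has been destroyed; release the base
 * channel and unregister the compress bdev (or finish teardown if orphaned).
 */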
942 static void
943 _reduce_destroy_cb(void *ctx, int reduce_errno)
944 {
945 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
946 
947 	if (reduce_errno) {
948 		SPDK_ERRLOG("reduce error %d on vol destroy\n", reduce_errno);
949 	}
950 
951 	comp_bdev->vol = NULL;
952 	spdk_put_io_channel(comp_bdev->base_ch);
953 	if (comp_bdev->orphaned == false) {
954 		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
955 				     comp_bdev->delete_ctx);
956 	} else {
957 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
958 	}
959 
960 }
961 
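/* Runs on the reduce thread: grab a channel to the backing device for reducelib
 * and destroy the volume before the rest of our resources are freed.
 */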
962 static void
963 _delete_vol_unload_cb(void *ctx)
964 {
965 	struct vbdev_compress *comp_bdev = ctx;
966 
967 	/* FIXME: for now, just assert that these conditions hold. */
968 	assert(!comp_bdev->reduce_thread ||
969 	       comp_bdev->reduce_thread == spdk_get_thread());
970 
971 	/* reducelib needs a channel to comm with the backing device */
972 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
973 
974 	/* Clean the device before we free our resources. */
975 	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
976 }
977 
978 /* Called by reduceLib after performing unload vol actions */
979 static void
980 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
981 {
982 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
983 
984 	if (reduce_errno) {
985 		SPDK_ERRLOG("reduce error %d on vol unload\n", reduce_errno);
986 		/* FIXME: the delete callback should still be executed on this error path. */
987 		return;
988 	}
989 
990 	pthread_mutex_lock(&comp_bdev->reduce_lock);
991 	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
992 		spdk_thread_send_msg(comp_bdev->reduce_thread,
993 				     _delete_vol_unload_cb, comp_bdev);
994 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
995 	} else {
996 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
997 
998 		_delete_vol_unload_cb(comp_bdev);
999 	}
1000 }
1001 
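/* Exported helpers (declared in vbdev_compress.h) used to query compress bdevs
 * from code outside this file, e.g. the RPC layer.
 */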
1002 const char *
1003 compress_get_name(const struct vbdev_compress *comp_bdev)
1004 {
1005 	return comp_bdev->comp_bdev.name;
1006 }
1007 
1008 struct vbdev_compress *
1009 compress_bdev_first(void)
1010 {
1011 	struct vbdev_compress *comp_bdev;
1012 
1013 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
1014 
1015 	return comp_bdev;
1016 }
1017 
1018 struct vbdev_compress *
1019 compress_bdev_next(struct vbdev_compress *prev)
1020 {
1021 	struct vbdev_compress *comp_bdev;
1022 
1023 	comp_bdev = TAILQ_NEXT(prev, link);
1024 
1025 	return comp_bdev;
1026 }
1027 
1028 bool
1029 compress_has_orphan(const char *name)
1030 {
1031 	struct vbdev_compress *comp_bdev;
1032 
1033 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1034 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1035 			return true;
1036 		}
1037 	}
1038 	return false;
1039 }
1040 
1041 /* Called after we've unregistered following a hot remove callback.
1042  * Our finish entry point will be called next.
1043  */
1044 static int
1045 vbdev_compress_destruct(void *ctx)
1046 {
1047 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1048 
1049 	if (comp_bdev->vol != NULL) {
1050 		/* Tell reducelib that we're done with this volume. */
1051 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
1052 	} else {
1053 		vbdev_compress_destruct_cb(comp_bdev, 0);
1054 	}
1055 
1056 	return 0;
1057 }
1058 
1059 /* We supplied this as an entry point for upper layers that want to communicate with this
1060  * bdev.  This is how they get a channel.
1061  */
1062 static struct spdk_io_channel *
1063 vbdev_compress_get_io_channel(void *ctx)
1064 {
1065 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1066 
1067 	/* The IO channel code will allocate a channel for us which consists of
1068 	 * the SPDK channel structure plus the size of our comp_io_channel struct
1069 	 * that we passed in when we registered our IO device. It will then call
1070 	 * our channel create callback to populate any elements that we need to
1071 	 * update.
1072 	 */
1073 	return spdk_get_io_channel(comp_bdev);
1074 }
1075 
1076 /* This is the output for bdev_get_bdevs() for this vbdev */
1077 static int
1078 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1079 {
1080 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1081 
1082 	spdk_json_write_name(w, "compress");
1083 	spdk_json_write_object_begin(w);
1084 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1085 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1086 	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1087 	spdk_json_write_object_end(w);
1088 
1089 	return 0;
1090 }
1091 
1092 /* This is used to generate JSON that can configure this module to its current state. */
1093 static int
1094 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
1095 {
1096 	struct vbdev_compress *comp_bdev;
1097 
1098 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1099 		spdk_json_write_object_begin(w);
1100 		spdk_json_write_named_string(w, "method", "bdev_compress_create");
1101 		spdk_json_write_named_object_begin(w, "params");
1102 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1103 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1104 		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1105 		spdk_json_write_object_end(w);
1106 		spdk_json_write_object_end(w);
1107 	}
1108 	return 0;
1109 }
1110 
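/* Runs on the thread where the base bdev was opened: drop the metadata channel,
 * then either claim the base bdev (init succeeded) or close it and free the
 * init context.
 */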
1111 static void
1112 _vbdev_reduce_init_cb(void *ctx)
1113 {
1114 	struct vbdev_compress *meta_ctx = ctx;
1115 	int rc;
1116 
1117 	assert(meta_ctx->base_desc != NULL);
1118 
1119 	/* We're done with metadata operations */
1120 	spdk_put_io_channel(meta_ctx->base_ch);
1121 
1122 	if (meta_ctx->vol) {
1123 		rc = vbdev_compress_claim(meta_ctx);
1124 		if (rc == 0) {
1125 			return;
1126 		}
1127 	}
1128 
1129 	/* Close the underlying bdev on its same opened thread. */
1130 	spdk_bdev_close(meta_ctx->base_desc);
1131 	free(meta_ctx);
1132 }
1133 
1134 /* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
1135  * used for initial metadata operations to claim where it will be further filled out
1136  * used for initial metadata operations on to the claim step, where it will be further filled out
1137  */
1138 static void
1139 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1140 {
1141 	struct vbdev_compress *meta_ctx = cb_arg;
1142 
1143 	if (reduce_errno == 0) {
1144 		meta_ctx->vol = vol;
1145 	} else {
1146 		SPDK_ERRLOG("error initializing vol for %s: %d\n",
1147 			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1148 	}
1149 
1150 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1151 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
1152 	} else {
1153 		_vbdev_reduce_init_cb(meta_ctx);
1154 	}
1155 }
1156 
1157 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
1158  * call the callback provided by reduceLib when it called the read/write/unmap function and
1159  * free the bdev_io.
1160  */
1161 static void
1162 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1163 {
1164 	struct spdk_reduce_vol_cb_args *cb_args = arg;
1165 	int reduce_errno;
1166 
1167 	if (success) {
1168 		reduce_errno = 0;
1169 	} else {
1170 		reduce_errno = -EIO;
1171 	}
1172 	spdk_bdev_free_io(bdev_io);
1173 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
1174 }
1175 
1176 /* This is the function provided to the reduceLib for sending reads directly to
1177  * the backing device.
1178  */
1179 static void
1180 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1181 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1182 {
1183 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1184 					   backing_dev);
1185 	int rc;
1186 
1187 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1188 				    iov, iovcnt, lba, lba_count,
1189 				    comp_reduce_io_cb,
1190 				    args);
1191 	if (rc) {
1192 		if (rc == -ENOMEM) {
1193 			SPDK_ERRLOG("No memory, start to queue io.\n");
1194 			/* TODO: there's no bdev_io to queue */
1195 		} else {
1196 			SPDK_ERRLOG("error submitting readv request\n");
1197 		}
1198 		args->cb_fn(args->cb_arg, rc);
1199 	}
1200 }
1201 
1202 /* This is the function provided to the reduceLib for sending writes directly to
1203  * the backing device.
1204  */
1205 static void
1206 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1207 		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1208 {
1209 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1210 					   backing_dev);
1211 	int rc;
1212 
1213 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1214 				     iov, iovcnt, lba, lba_count,
1215 				     comp_reduce_io_cb,
1216 				     args);
1217 	if (rc) {
1218 		if (rc == -ENOMEM) {
1219 			SPDK_ERRLOG("No memory, start to queue io.\n");
1220 			/* TODO: there's no bdev_io to queue */
1221 		} else {
1222 			SPDK_ERRLOG("error submitting writev request\n");
1223 		}
1224 		args->cb_fn(args->cb_arg, rc);
1225 	}
1226 }
1227 
1228 /* This is the function provided to the reduceLib for sending unmaps directly to
1229  * the backing device.
1230  */
1231 static void
1232 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
1233 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1234 {
1235 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1236 					   backing_dev);
1237 	int rc;
1238 
1239 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1240 				    lba, lba_count,
1241 				    comp_reduce_io_cb,
1242 				    args);
1243 
1244 	if (rc) {
1245 		if (rc == -ENOMEM) {
1246 			SPDK_ERRLOG("No memory, start to queue io.\n");
1247 			/* TODO: there's no bdev_io to queue */
1248 		} else {
1249 			SPDK_ERRLOG("error submitting unmap request\n");
1250 		}
1251 		args->cb_fn(args->cb_arg, rc);
1252 	}
1253 }
1254 
1255 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
1256 static void
1257 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
1258 {
1259 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1260 
1261 	if (reduce_errno) {
1262 		SPDK_ERRLOG("reduce error %d on vol unload after hotremove\n", reduce_errno);
1263 	}
1264 
1265 	comp_bdev->vol = NULL;
1266 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
1267 }
1268 
1269 static void
1270 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
1271 {
1272 	struct vbdev_compress *comp_bdev, *tmp;
1273 
1274 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
1275 		if (bdev_find == comp_bdev->base_bdev) {
1276 			/* Tell reduceLib that we're done with this volume. */
1277 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
1278 		}
1279 	}
1280 }
1281 
1282 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
1283 static void
1284 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1285 				  void *event_ctx)
1286 {
1287 	switch (type) {
1288 	case SPDK_BDEV_EVENT_REMOVE:
1289 		vbdev_compress_base_bdev_hotremove_cb(bdev);
1290 		break;
1291 	default:
1292 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1293 		break;
1294 	}
1295 }
1296 
1297 /* TODO: determine which params we want user-configurable; hardcoded for now:
1298  * params.vol_size
1299  * params.chunk_size
1300  * compression PMD, algorithm, window size, comp level, etc.
1301  * DEV_MD_PATH
1302  */
1303 
1304 /* Common function for init and load to allocate and populate the minimal
1305  * information for reducelib to init or load.
1306  */
1307 struct vbdev_compress *
1308 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
1309 {
1310 	struct vbdev_compress *meta_ctx;
1311 	struct spdk_bdev *bdev;
1312 
1313 	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
1314 	if (meta_ctx == NULL) {
1315 		SPDK_ERRLOG("failed to alloc init contexts\n");
1316 		return NULL;
1317 	}
1318 
1319 	meta_ctx->drv_name = "None";
1320 	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
1321 	meta_ctx->backing_dev.readv = _comp_reduce_readv;
1322 	meta_ctx->backing_dev.writev = _comp_reduce_writev;
1323 	meta_ctx->backing_dev.compress = _comp_reduce_compress;
1324 	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;
1325 
1326 	meta_ctx->base_desc = bdev_desc;
1327 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
1328 	meta_ctx->base_bdev = bdev;
1329 
1330 	meta_ctx->backing_dev.blocklen = bdev->blocklen;
1331 	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;
1332 
1333 	meta_ctx->params.chunk_size = CHUNK_SIZE;
1334 	if (lb_size == 0) {
1335 		meta_ctx->params.logical_block_size = bdev->blocklen;
1336 	} else {
1337 		meta_ctx->params.logical_block_size = lb_size;
1338 	}
1339 
1340 	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
1341 	return meta_ctx;
1342 }
1343 
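/* Select the compression PMD for this bdev from the global option, constrained by
 * which PMDs actually probed: in auto mode prefer QAT, then mlx5_pci, then ISA-L.
 */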
1344 static bool
1345 _set_pmd(struct vbdev_compress *comp_dev)
1346 {
1347 	if (g_opts == COMPRESS_PMD_AUTO) {
1348 		if (g_qat_available) {
1349 			comp_dev->drv_name = QAT_PMD;
1350 		} else if (g_mlx5_pci_available) {
1351 			comp_dev->drv_name = MLX5_PMD;
1352 		} else {
1353 			comp_dev->drv_name = ISAL_PMD;
1354 		}
1355 	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
1356 		comp_dev->drv_name = QAT_PMD;
1357 	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
1358 		comp_dev->drv_name = ISAL_PMD;
1359 	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
1360 		comp_dev->drv_name = MLX5_PMD;
1361 	} else {
1362 		SPDK_ERRLOG("Requested PMD is not available.\n");
1363 		return false;
1364 	}
1365 	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
1366 	return true;
1367 }
1368 
1369 /* Call reducelib to initialize a new volume */
1370 static int
1371 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1372 {
1373 	struct spdk_bdev_desc *bdev_desc = NULL;
1374 	struct vbdev_compress *meta_ctx;
1375 	int rc;
1376 
1377 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
1378 				NULL, &bdev_desc);
1379 	if (rc) {
1380 		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
1381 		return rc;
1382 	}
1383 
1384 	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
1385 	if (meta_ctx == NULL) {
1386 		spdk_bdev_close(bdev_desc);
1387 		return -EINVAL;
1388 	}
1389 
1390 	if (_set_pmd(meta_ctx) == false) {
1391 		SPDK_ERRLOG("could not find required pmd\n");
1392 		free(meta_ctx);
1393 		spdk_bdev_close(bdev_desc);
1394 		return -EINVAL;
1395 	}
1396 
1397 	/* Save the thread where the base device is opened */
1398 	meta_ctx->thread = spdk_get_thread();
1399 
1400 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1401 
1402 	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
1403 			     pm_path,
1404 			     vbdev_reduce_init_cb,
1405 			     meta_ctx);
1406 	return 0;
1407 }
1408 
1409 /* We provide this callback for the SPDK channel code to create a channel using
1410  * the channel struct we provided in our module get_io_channel() entry point. Here
1411  * we get and save off an underlying base channel of the device below us so that
1412  * we can communicate with the base bdev on a per-channel basis. The completion
1413  * poller and device qp are also set up here when the first channel is created.
1414  */
1415 static int
1416 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1417 {
1418 	struct vbdev_compress *comp_bdev = io_device;
1419 	struct comp_device_qp *device_qp;
1420 
1421 	/* Now set the reduce channel if it's not already set. */
1422 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1423 	if (comp_bdev->ch_count == 0) {
1424 		/* We use this queue to track outstanding IO in our layer. */
1425 		TAILQ_INIT(&comp_bdev->pending_comp_ios);
1426 
1427 		/* We use this to queue up compression operations as needed. */
1428 		TAILQ_INIT(&comp_bdev->queued_comp_ops);
1429 
1430 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1431 		comp_bdev->reduce_thread = spdk_get_thread();
1432 		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
1433 		/* Now assign a q pair */
1434 		pthread_mutex_lock(&g_comp_device_qp_lock);
1435 		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
1436 			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
1437 				if (device_qp->thread == spdk_get_thread()) {
1438 					comp_bdev->device_qp = device_qp;
1439 					break;
1440 				}
1441 				if (device_qp->thread == NULL) {
1442 					comp_bdev->device_qp = device_qp;
1443 					device_qp->thread = spdk_get_thread();
1444 					break;
1445 				}
1446 			}
1447 		}
1448 		pthread_mutex_unlock(&g_comp_device_qp_lock);
1449 	}
1450 	comp_bdev->ch_count++;
1451 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1452 
1453 	if (comp_bdev->device_qp != NULL) {
1454 		uint64_t comp_feature_flags =
1455 			comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags;
1456 
1457 		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
1458 			comp_bdev->backing_dev.sgl_in = true;
1459 		}
1460 		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
1461 			comp_bdev->backing_dev.sgl_out = true;
1462 		}
1463 		return 0;
1464 	} else {
1465 		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
1466 		assert(false);
1467 		return -ENOMEM;
1468 	}
1469 }
1470 
1471 static void
1472 _channel_cleanup(struct vbdev_compress *comp_bdev)
1473 {
1474 	/* Note: comp_bdevs can share a device_qp if they are
1475 	 * on the same thread so we leave the device_qp element
1476 	 * alone for this comp_bdev and just clear the reduce thread.
1477 	 */
1478 	spdk_put_io_channel(comp_bdev->base_ch);
1479 	comp_bdev->reduce_thread = NULL;
1480 	spdk_poller_unregister(&comp_bdev->poller);
1481 }
1482 
1483 /* Used to reroute destroy_ch to the correct thread */
1484 static void
1485 _comp_bdev_ch_destroy_cb(void *arg)
1486 {
1487 	struct vbdev_compress *comp_bdev = arg;
1488 
1489 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1490 	_channel_cleanup(comp_bdev);
1491 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1492 }
1493 
1494 /* We provide this callback for the SPDK channel code to destroy a channel
1495  * created with our create callback. We just need to undo anything we did
1496  * when we created. If this bdev used its own poller, we'd unregister it here.
1497  */
1498 static void
1499 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1500 {
1501 	struct vbdev_compress *comp_bdev = io_device;
1502 
1503 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1504 	comp_bdev->ch_count--;
1505 	if (comp_bdev->ch_count == 0) {
1506 		/* Send this request to the thread where the channel was created. */
1507 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1508 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1509 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1510 		} else {
1511 			_channel_cleanup(comp_bdev);
1512 		}
1513 	}
1514 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1515 }
1516 
1517 /* RPC entry point for compression vbdev creation. */
1518 int
1519 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1520 {
1521 	struct vbdev_compress *comp_bdev = NULL;
1522 
1523 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
1524 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
1525 		return -EINVAL;
1526 	}
1527 
1528 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1529 		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
1530 			SPDK_ERRLOG("Base bdev %s is already in use by a compress bdev\n", bdev_name);
1531 			return -EBUSY;
1532 		}
1533 	}
1534 	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
1535 }
1536 
1537 /* On init, just init the compress drivers. All metadata is stored on disk. */
1538 static int
1539 vbdev_compress_init(void)
1540 {
1541 	if (vbdev_init_compress_drivers()) {
1542 		SPDK_ERRLOG("Error setting up compression devices\n");
1543 		return -EINVAL;
1544 	}
1545 
1546 	return 0;
1547 }
1548 
1549 /* Called when the entire module is being torn down. */
1550 static void
1551 vbdev_compress_finish(void)
1552 {
1553 	struct comp_device_qp *dev_qp;
1554 	/* TODO: unload vol in a future patch */
1555 
1556 	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
1557 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
1558 		free(dev_qp);
1559 	}
1560 	pthread_mutex_destroy(&g_comp_device_qp_lock);
1561 
1562 	rte_mempool_free(g_comp_op_mp);
1563 	rte_mempool_free(g_mbuf_mp);
1564 }
1565 
1566 /* During init we'll be asked how much memory we'd like passed to us
1567  * in bdev_io structures as context. Here's where we specify how
1568  * much context we want per IO.
1569  */
1570 static int
1571 vbdev_compress_get_ctx_size(void)
1572 {
1573 	return sizeof(struct comp_bdev_io);
1574 }
1575 
1576 /* When we register our bdev this is how we specify our entry points. */
1577 static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
1578 	.destruct		= vbdev_compress_destruct,
1579 	.submit_request		= vbdev_compress_submit_request,
1580 	.io_type_supported	= vbdev_compress_io_type_supported,
1581 	.get_io_channel		= vbdev_compress_get_io_channel,
1582 	.dump_info_json		= vbdev_compress_dump_info_json,
1583 	.write_config_json	= NULL,
1584 };
1585 
1586 static struct spdk_bdev_module compress_if = {
1587 	.name = "compress",
1588 	.module_init = vbdev_compress_init,
1589 	.get_ctx_size = vbdev_compress_get_ctx_size,
1590 	.examine_disk = vbdev_compress_examine,
1591 	.module_fini = vbdev_compress_finish,
1592 	.config_json = vbdev_compress_config_json
1593 };
1594 
1595 SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
1596 
1597 static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
1598 {
1599 	struct spdk_bdev_alias *aliases;
1600 
1601 	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
1602 		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
1603 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
1604 		if (!comp_bdev->comp_bdev.name) {
1605 			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
1606 			return -ENOMEM;
1607 		}
1608 	} else {
1609 		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
1610 		if (!comp_bdev->comp_bdev.name) {
1611 			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
1612 			return -ENOMEM;
1613 		}
1614 	}
1615 	return 0;
1616 }
1617 
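/* Fill out the compress bdev from the base bdev and reduce params, register the
 * io_device, claim the base bdev, and register the new virtual bdev with the
 * bdev layer.
 */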
1618 static int
1619 vbdev_compress_claim(struct vbdev_compress *comp_bdev)
1620 {
1621 	int rc;
1622 
1623 	if (_set_compbdev_name(comp_bdev)) {
1624 		return -EINVAL;
1625 	}
1626 
1627 	/* Note: some of the fields below will change in the future - for example,
1628 	 * blockcnt specifically will not match (the compressed volume size will
1629 	 * be slightly less than the base bdev size)
1630 	 */
1631 	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
1632 	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
1633 
1634 	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
1635 		comp_bdev->comp_bdev.required_alignment =
1636 			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
1637 				 comp_bdev->base_bdev->required_alignment);
1638 		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1639 			       comp_bdev->comp_bdev.required_alignment);
1640 	} else {
1641 		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
1642 	}
1643 	comp_bdev->comp_bdev.optimal_io_boundary =
1644 		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
1645 
1646 	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
1647 
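	/* vol_size comes from the reduce metadata as a whole number of chunks,
	 * and chunk_size is a multiple of the logical block size, so this
	 * division is expected to be exact.
	 */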
1648 	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
1649 	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
1650 	assert(comp_bdev->comp_bdev.blockcnt > 0);
1651 
1652 	/* This is the context that is passed to us when the bdev
1653 	 * layer calls in so we'll save our comp_bdev node here.
1654 	 */
1655 	comp_bdev->comp_bdev.ctxt = comp_bdev;
1656 	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
1657 	comp_bdev->comp_bdev.module = &compress_if;
1658 
1659 	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
1660 
1661 	/* Save the thread where the base device is opened */
1662 	comp_bdev->thread = spdk_get_thread();
1663 
1664 	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
1665 				sizeof(struct comp_io_channel),
1666 				comp_bdev->comp_bdev.name);
1667 
1668 	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
1669 					 comp_bdev->comp_bdev.module);
1670 	if (rc) {
1671 		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
1672 		goto error_claim;
1673 	}
1674 
1675 	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
1676 	if (rc < 0) {
1677 		SPDK_ERRLOG("trying to register bdev\n");
1678 		goto error_bdev_register;
1679 	}
1680 
1681 	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
1682 
1683 	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
1684 
1685 	return 0;
1686 
1687 	/* Error cleanup paths. */
1688 error_bdev_register:
1689 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
1690 error_claim:
1691 	spdk_io_device_unregister(comp_bdev, NULL);
1692 	free(comp_bdev->comp_bdev.name);
1693 	return rc;
1694 }
1695 
1696 static void
1697 _vbdev_compress_delete_done(void *_ctx)
1698 {
1699 	struct vbdev_comp_delete_ctx *ctx = _ctx;
1700 
1701 	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
1702 
1703 	free(ctx);
1704 }
1705 
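/* The unload completion may arrive on a different thread than the one that
 * issued the delete, so bounce the final user callback back to the original
 * thread saved in the context.
 */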
1706 static void
1707 vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
1708 {
1709 	struct vbdev_comp_delete_ctx *ctx = cb_arg;
1710 
1711 	ctx->cb_rc = bdeverrno;
1712 
1713 	if (ctx->orig_thread != spdk_get_thread()) {
1714 		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
1715 	} else {
1716 		_vbdev_compress_delete_done(ctx);
1717 	}
1718 }
1719 
1720 void
1721 bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
1722 {
1723 	struct vbdev_compress *comp_bdev = NULL;
1724 	struct vbdev_comp_delete_ctx *ctx;
1725 
1726 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1727 		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1728 			break;
1729 		}
1730 	}
1731 
1732 	if (comp_bdev == NULL) {
1733 		cb_fn(cb_arg, -ENODEV);
1734 		return;
1735 	}
1736 
1737 	ctx = calloc(1, sizeof(*ctx));
1738 	if (ctx == NULL) {
1739 		SPDK_ERRLOG("Failed to allocate delete context\n");
1740 		cb_fn(cb_arg, -ENOMEM);
1741 		return;
1742 	}
1743 
1744 	/* Save these for after the vol is destroyed. */
1745 	ctx->cb_fn = cb_fn;
1746 	ctx->cb_arg = cb_arg;
1747 	ctx->orig_thread = spdk_get_thread();
1748 
1749 	comp_bdev->delete_ctx = ctx;
1750 
1751 	/* Tell reducelib that we're done with this volume. */
1752 	if (comp_bdev->orphaned == false) {
1753 		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
1754 	} else {
1755 		delete_vol_unload_cb(comp_bdev, 0);
1756 	}
1757 }
1758 
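/* Runs on the thread that opened the base bdev once the metadata load has
 * completed. reduce_errno == 0 means this is one of our volumes: claim and
 * register it. -ENOENT means the backing device carries our metadata but the
 * volume could not be loaded, so claim the base bdev and park the node as
 * orphaned; it can still be removed via bdev_compress_delete. Any other error
 * is logged (-EILSEQ, reduce's signature-mismatch error, is not logged since
 * the bdev is presumably not ours), and in all failure cases the descriptor
 * is closed and the context freed.
 */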
1759 static void
1760 _vbdev_reduce_load_cb(void *ctx)
1761 {
1762 	struct vbdev_compress *meta_ctx = ctx;
1763 	int rc;
1764 
1765 	assert(meta_ctx->base_desc != NULL);
1766 
1767 	/* Done with metadata operations */
1768 	spdk_put_io_channel(meta_ctx->base_ch);
1769 
1770 	if (meta_ctx->reduce_errno == 0) {
1771 		if (_set_pmd(meta_ctx) == false) {
1772 			SPDK_ERRLOG("could not find required pmd\n");
1773 			goto err;
1774 		}
1775 
1776 		rc = vbdev_compress_claim(meta_ctx);
1777 		if (rc != 0) {
1778 			goto err;
1779 		}
1780 	} else if (meta_ctx->reduce_errno == -ENOENT) {
1781 		if (_set_compbdev_name(meta_ctx)) {
1782 			goto err;
1783 		}
1784 
1785 		/* Save the thread where the base device is opened */
1786 		meta_ctx->thread = spdk_get_thread();
1787 
1788 		meta_ctx->comp_bdev.module = &compress_if;
1789 		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
1790 		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
1791 						 meta_ctx->comp_bdev.module);
1792 		if (rc) {
1793 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
1794 			free(meta_ctx->comp_bdev.name);
1795 			goto err;
1796 		}
1797 
1798 		meta_ctx->orphaned = true;
1799 		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
1800 	} else {
1801 		if (meta_ctx->reduce_errno != -EILSEQ) {
1802 			SPDK_ERRLOG("for vol %s, error %u\n",
1803 				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
1804 		}
1805 		goto err;
1806 	}
1807 
1808 	spdk_bdev_module_examine_done(&compress_if);
1809 	return;
1810 
1811 err:
	/* Close the underlying bdev on the same thread that opened it. */
1813 	spdk_bdev_close(meta_ctx->base_desc);
1814 	free(meta_ctx);
1815 	spdk_bdev_module_examine_done(&compress_if);
1816 }
1817 
/* Callback from reduce when the load is complete. We'll pass the vbdev_comp
 * struct used for the initial metadata operations to the claim path, where it
 * will be further filled out and added to the global list.
 */
1822 static void
1823 vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1824 {
1825 	struct vbdev_compress *meta_ctx = cb_arg;
1826 
1827 	if (reduce_errno == 0) {
1828 		/* Update information following volume load. */
1829 		meta_ctx->vol = vol;
1830 		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
1831 		       sizeof(struct spdk_reduce_vol_params));
1832 	}
1833 
1834 	meta_ctx->reduce_errno = reduce_errno;
1835 
1836 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1837 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
1838 	} else {
1839 		_vbdev_reduce_load_cb(meta_ctx);
1840 	}
1841 
1842 }
1843 
1844 /* Examine_disk entry point: will do a metadata load to see if this is ours,
1845  * and if so will go ahead and claim it.
1846  */
1847 static void
1848 vbdev_compress_examine(struct spdk_bdev *bdev)
1849 {
1850 	struct spdk_bdev_desc *bdev_desc = NULL;
1851 	struct vbdev_compress *meta_ctx;
1852 	int rc;
1853 
1854 	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
1855 		spdk_bdev_module_examine_done(&compress_if);
1856 		return;
1857 	}
1858 
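	/* Open the base bdev so the reduce library can probe it for our
	 * volume metadata.
	 */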
1859 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
1860 				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
1861 	if (rc) {
1862 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
1863 		spdk_bdev_module_examine_done(&compress_if);
1864 		return;
1865 	}
1866 
1867 	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
1868 	if (meta_ctx == NULL) {
1869 		spdk_bdev_close(bdev_desc);
1870 		spdk_bdev_module_examine_done(&compress_if);
1871 		return;
1872 	}
1873 
1874 	/* Save the thread where the base device is opened */
1875 	meta_ctx->thread = spdk_get_thread();
1876 
1877 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1878 	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
1879 }
1880 
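/* Select which compression PMD should be used (exposed via vbdev_compress.h
 * for the RPC layer); the choice is recorded globally and consulted when a
 * compress vbdev selects its driver.
 */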
1881 int
1882 compress_set_pmd(enum compress_pmd *opts)
1883 {
1884 	g_opts = *opts;
1885 
1886 	return 0;
1887 }
1888 
1889 SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)
1890