xref: /spdk/module/bdev/compress/vbdev_compress.c (revision 7506a7aa53d239f533af3bc768f0d2af55e735fe)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "vbdev_compress.h"
36 
37 #include "spdk/reduce.h"
38 #include "spdk/stdinc.h"
39 #include "spdk/rpc.h"
40 #include "spdk/env.h"
41 #include "spdk/endian.h"
42 #include "spdk/string.h"
43 #include "spdk/thread.h"
44 #include "spdk/util.h"
45 #include "spdk/bdev_module.h"
46 #include "spdk/likely.h"
47 
48 #include "spdk/log.h"
49 
50 #include <rte_config.h>
51 #include <rte_bus_vdev.h>
52 #include <rte_compressdev.h>
53 #include <rte_comp.h>
54 #include <rte_mbuf_dyn.h>
55 
56 /* Used to store IO context in mbuf */
57 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
58 	.name = "context_reduce",
59 	.size = sizeof(uint64_t),
60 	.align = __alignof__(uint64_t),
61 	.flags = 0,
62 };
63 static int g_mbuf_offset;
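/* The dynfield is registered once in vbdev_init_compress_drivers() and then used to
 * carry the reduce callback argument through the device, e.g. on submission:
 *
 *   *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
 *
 * and read back the same way in comp_dev_poller().
 */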
64 
65 #define NUM_MAX_XFORMS 2
66 #define NUM_MAX_INFLIGHT_OPS 128
67 #define DEFAULT_WINDOW_SIZE 15
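/* window_size is the base-2 log of the history window, so 15 => 32 KB,
 * the largest window that deflate supports.
 */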
68 /* We need extra mbufs per operation to accommodate host buffers that
69  *  span a physical page boundary.
70  */
71 #define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
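/* Worst case, each of the REDUCE_MAX_IOVECS iovecs crosses one physical page boundary
 * (_setup_compress_mbuf() asserts that a single iovec never crosses two), so each iovec
 * may need one extra mbuf, hence the factor of 2 above.
 */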
72 #define CHUNK_SIZE (1024 * 16)
73 #define COMP_BDEV_NAME "compress"
74 #define BACKING_IO_SZ (4 * 1024)
75 
76 #define ISAL_PMD "compress_isal"
77 #define QAT_PMD "compress_qat"
78 #define MLX5_PMD "mlx5_pci"
79 #define NUM_MBUFS		8192
80 #define POOL_CACHE_SIZE		256
81 
82 static enum compress_pmd g_opts;
83 
84 /* Global list of available compression devices. */
85 struct compress_dev {
86 	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
87 	uint8_t				cdev_id;	/* identifier for the device */
88 	void				*comp_xform;	/* shared private xform for comp on this PMD */
89 	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
90 	TAILQ_ENTRY(compress_dev)	link;
91 };
92 static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
93 
94 /* Although the ISAL PMD reports 'unlimited' qpairs, it has an unintended limit of 99:
95  * the internal ring name it creates exceeds a length limit in the generic ring code,
96  * which breaks qpair initialization beyond that point.
97  * FIXME: Reduced the number of qpairs to 48 due to issue #2338.
98  */
99 #define MAX_NUM_QP 48
100 /* Global list and lock for unique device/queue pair combos */
101 struct comp_device_qp {
102 	struct compress_dev		*device;	/* ptr to compression device */
103 	uint8_t				qp;		/* queue pair for this node */
104 	struct spdk_thread		*thread;	/* thread that this qp is assigned to */
105 	TAILQ_ENTRY(comp_device_qp)	link;
106 };
107 static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
108 static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
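/* A qpair is bound to the first SPDK thread that needs it in comp_bdev_ch_create_cb();
 * comp_bdevs on the same thread then share that device/qp (see _channel_cleanup()).
 */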
109 
110 /* For queueing up compression operations that we can't submit for some reason */
111 struct vbdev_comp_op {
112 	struct spdk_reduce_backing_dev	*backing_dev;
113 	struct iovec			*src_iovs;
114 	int				src_iovcnt;
115 	struct iovec			*dst_iovs;
116 	int				dst_iovcnt;
117 	bool				compress;
118 	void				*cb_arg;
119 	TAILQ_ENTRY(vbdev_comp_op)	link;
120 };
121 
122 struct vbdev_comp_delete_ctx {
123 	spdk_delete_compress_complete	cb_fn;
124 	void				*cb_arg;
125 	int				cb_rc;
126 	struct spdk_thread		*orig_thread;
127 };
128 
129 /* List of virtual bdevs and associated info for each. */
130 struct vbdev_compress {
131 	struct spdk_bdev		*base_bdev;	/* the thing we're attaching to */
132 	struct spdk_bdev_desc		*base_desc;	/* its descriptor we get from open */
133 	struct spdk_io_channel		*base_ch;	/* IO channel of base device */
134 	struct spdk_bdev		comp_bdev;	/* the compression virtual bdev */
135 	struct comp_io_channel		*comp_ch;	/* channel associated with this bdev */
136 	char				*drv_name;	/* name of the compression device driver */
137 	struct comp_device_qp		*device_qp;
138 	struct spdk_thread		*reduce_thread;
139 	pthread_mutex_t			reduce_lock;
140 	uint32_t			ch_count;
141 	TAILQ_HEAD(, spdk_bdev_io)	pending_comp_ios;	/* outstanding operations to a comp library */
142 	struct spdk_poller		*poller;	/* completion poller */
143 	struct spdk_reduce_vol_params	params;		/* params for the reduce volume */
144 	struct spdk_reduce_backing_dev	backing_dev;	/* backing device info for the reduce volume */
145 	struct spdk_reduce_vol		*vol;		/* the reduce volume */
146 	struct vbdev_comp_delete_ctx	*delete_ctx;
147 	bool				orphaned;	/* base bdev claimed but comp_bdev not registered */
148 	int				reduce_errno;
149 	TAILQ_HEAD(, vbdev_comp_op)	queued_comp_ops;
150 	TAILQ_ENTRY(vbdev_compress)	link;
151 	struct spdk_thread		*thread;	/* thread where base device is opened */
152 };
153 static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
154 
155 /* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
156  */
157 struct comp_io_channel {
158 	struct spdk_io_channel_iter	*iter;	/* used with for_each_channel in reset */
159 };
160 
161 /* Per I/O context for the compression vbdev. */
162 struct comp_bdev_io {
163 	struct comp_io_channel		*comp_ch;		/* used in completion handling */
164 	struct vbdev_compress		*comp_bdev;		/* vbdev associated with this IO */
165 	struct spdk_bdev_io_wait_entry	bdev_io_wait;		/* for bdev_io_wait */
166 	struct spdk_bdev_io		*orig_io;		/* the original IO */
167 	struct spdk_io_channel		*ch;			/* for resubmission */
168 	int				status;			/* save for completion on orig thread */
169 };
170 
171 /* Shared mempools between all devices on this system */
172 static struct rte_mempool *g_mbuf_mp = NULL;			/* mbuf mempool */
173 static struct rte_mempool *g_comp_op_mp = NULL;			/* comp operations, must be rte* mempool */
174 static struct rte_mbuf_ext_shared_info g_shinfo = {};		/* used by DPDK mbuf macros */
175 static bool g_qat_available = false;
176 static bool g_isal_available = false;
177 static bool g_mlx5_pci_available = false;
178 
179 /* Create shared (between all ops per PMD) compress xforms. */
180 static struct rte_comp_xform g_comp_xform = {
181 	.type = RTE_COMP_COMPRESS,
182 	.compress = {
183 		.algo = RTE_COMP_ALGO_DEFLATE,
184 		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
185 		.level = RTE_COMP_LEVEL_MAX,
186 		.window_size = DEFAULT_WINDOW_SIZE,
187 		.chksum = RTE_COMP_CHECKSUM_NONE,
188 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
189 	}
190 };
191 /* Create shared (between all ops per PMD) decompress xforms. */
192 static struct rte_comp_xform g_decomp_xform = {
193 	.type = RTE_COMP_DECOMPRESS,
194 	.decompress = {
195 		.algo = RTE_COMP_ALGO_DEFLATE,
196 		.chksum = RTE_COMP_CHECKSUM_NONE,
197 		.window_size = DEFAULT_WINDOW_SIZE,
198 		.hash_algo = RTE_COMP_HASH_ALGO_NONE
199 	}
200 };
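/* Each of these is instantiated per device as a shared private xform in
 * create_compress_dev(); PMDs lacking RTE_COMP_FF_SHAREABLE_PRIV_XFORM are rejected there.
 */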
201 
202 static void vbdev_compress_examine(struct spdk_bdev *bdev);
203 static int vbdev_compress_claim(struct vbdev_compress *comp_bdev);
204 static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
205 struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size);
206 static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
207 static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
208 static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
209 
210 /* Dummy function used by DPDK to free ext attached buffers
211  * to mbufs, we free them ourselves but this callback has to
212  * be here.
213  */
214 static void
215 shinfo_free_cb(void *arg1, void *arg2)
216 {
217 }
218 
219 /* Called by vbdev_init_compress_drivers() to init each discovered compression device */
220 static int
221 create_compress_dev(uint8_t index)
222 {
223 	struct compress_dev *device;
224 	uint16_t q_pairs;
225 	uint8_t cdev_id;
226 	int rc, i;
227 	struct comp_device_qp *dev_qp;
228 	struct comp_device_qp *tmp_qp;
229 
230 	device = calloc(1, sizeof(struct compress_dev));
231 	if (!device) {
232 		return -ENOMEM;
233 	}
234 
235 	/* Get details about this device. */
236 	rte_compressdev_info_get(index, &device->cdev_info);
237 
238 	cdev_id = device->cdev_id = index;
239 
240 	/* Zero means no limit, so cap at MAX_NUM_QP. */
241 	if (device->cdev_info.max_nb_queue_pairs == 0) {
242 		q_pairs = MAX_NUM_QP;
243 	} else {
244 		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
245 	}
246 
247 	/* Configure the compression device. */
248 	struct rte_compressdev_config config = {
249 		.socket_id = rte_socket_id(),
250 		.nb_queue_pairs = q_pairs,
251 		.max_nb_priv_xforms = NUM_MAX_XFORMS,
252 		.max_nb_streams = 0
253 	};
254 	rc = rte_compressdev_configure(cdev_id, &config);
255 	if (rc < 0) {
256 		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
257 		goto err;
258 	}
259 
260 	/* Pre-setup all potential qpairs now and assign them in the channel
261 	 * callback.
262 	 */
263 	for (i = 0; i < q_pairs; i++) {
264 		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
265 						      NUM_MAX_INFLIGHT_OPS,
266 						      rte_socket_id());
267 		if (rc) {
268 			if (i > 0) {
269 				q_pairs = i;
270 				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
271 					       "compressdev %u with error %d "
272 					       "so limiting to %u qpairs\n",
273 					       cdev_id, rc, q_pairs);
274 				break;
275 			} else {
276 				SPDK_ERRLOG("Failed to setup queue pair on "
277 					    "compressdev %u with error %d\n", cdev_id, rc);
278 				rc = -EINVAL;
279 				goto err;
280 			}
281 		}
282 	}
283 
284 	rc = rte_compressdev_start(cdev_id);
285 	if (rc < 0) {
286 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
287 			    cdev_id, rc);
288 		goto err;
289 	}
290 
291 	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
292 		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
293 				&device->comp_xform);
294 		if (rc < 0) {
295 			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
296 				    cdev_id, rc);
297 			goto err;
298 		}
299 
300 		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
301 				&device->decomp_xform);
302 		if (rc) {
303 			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
304 				    cdev_id, rc);
305 			goto err;
306 		}
307 	} else {
308 		SPDK_ERRLOG("PMD does not support shared transforms\n");
309 		goto err;
310 	}
311 
312 	/* Build up list of device/qp combinations */
313 	for (i = 0; i < q_pairs; i++) {
314 		dev_qp = calloc(1, sizeof(struct comp_device_qp));
315 		if (!dev_qp) {
316 			rc = -ENOMEM;
317 			goto err;
318 		}
319 		dev_qp->device = device;
320 		dev_qp->qp = i;
321 		dev_qp->thread = NULL;
322 		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
323 	}
324 
325 	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
326 
327 	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
328 		g_qat_available = true;
329 	}
330 	if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
331 		g_isal_available = true;
332 	}
333 	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
334 		g_mlx5_pci_available = true;
335 	}
336 
337 	return 0;
338 
339 err:
340 	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
341 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
342 		free(dev_qp);
343 	}
344 	free(device);
345 	return rc;
346 }
347 
348 /* Called from driver init entry point, vbdev_compress_init() */
349 static int
350 vbdev_init_compress_drivers(void)
351 {
352 	uint8_t cdev_count, i;
353 	struct compress_dev *tmp_dev;
354 	struct compress_dev *device;
355 	int rc;
356 
357 	/* We always init the compress_isal PMD */
358 	rc = rte_vdev_init(ISAL_PMD, NULL);
359 	if (rc == 0) {
360 		SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
361 	} else if (rc == -EEXIST) {
362 		SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
363 	} else {
364 		SPDK_ERRLOG("error creating virtual PMD %s\n", ISAL_PMD);
365 		return -EINVAL;
366 	}
367 
368 	/* If we have no compression devices, there's no reason to continue. */
369 	cdev_count = rte_compressdev_count();
370 	if (cdev_count == 0) {
371 		return 0;
372 	}
373 	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
374 		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
375 		return -EINVAL;
376 	}
377 
378 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
379 	if (g_mbuf_offset < 0) {
380 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
381 		return -EINVAL;
382 	}
383 
384 	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
385 					    sizeof(struct rte_mbuf), 0, rte_socket_id());
386 	if (g_mbuf_mp == NULL) {
387 		SPDK_ERRLOG("Cannot create mbuf pool\n");
388 		rc = -ENOMEM;
389 		goto error_create_mbuf;
390 	}
391 
392 	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
393 					       0, rte_socket_id());
394 	if (g_comp_op_mp == NULL) {
395 		SPDK_ERRLOG("Cannot create comp op pool\n");
396 		rc = -ENOMEM;
397 		goto error_create_op;
398 	}
399 
400 	/* Init all devices */
401 	for (i = 0; i < cdev_count; i++) {
402 		rc = create_compress_dev(i);
403 		if (rc != 0) {
404 			goto error_create_compress_devs;
405 		}
406 	}
407 
408 	if (g_qat_available == true) {
409 		SPDK_NOTICELOG("initialized QAT PMD\n");
410 	}
411 
412 	g_shinfo.free_cb = shinfo_free_cb;
413 
414 	return 0;
415 
416 	/* Error cleanup paths. */
417 error_create_compress_devs:
418 	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
419 		TAILQ_REMOVE(&g_compress_devs, device, link);
420 		free(device);
421 	}
	rte_mempool_free(g_comp_op_mp);	/* also free the comp op pool created above */
422 error_create_op:
423 error_create_mbuf:
424 	rte_mempool_free(g_mbuf_mp);
425 
426 	return rc;
427 }
428 
429 /* for completing rw requests on the orig IO thread. */
430 static void
431 _reduce_rw_blocks_cb(void *arg)
432 {
433 	struct comp_bdev_io *io_ctx = arg;
434 
435 	if (spdk_likely(io_ctx->status == 0)) {
436 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
437 	} else if (io_ctx->status == -ENOMEM) {
438 		vbdev_compress_queue_io(spdk_bdev_io_from_ctx(io_ctx));
439 	} else {
440 		SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
441 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
442 	}
443 }
444 
445 /* Completion callback for r/w that were issued via reducelib. */
446 static void
447 reduce_rw_blocks_cb(void *arg, int reduce_errno)
448 {
449 	struct spdk_bdev_io *bdev_io = arg;
450 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
451 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
452 	struct spdk_thread *orig_thread;
453 
454 	/* TODO: need to decide which error codes are bdev_io success vs failure;
455 	 * example examine calls reading metadata */
456 
457 	io_ctx->status = reduce_errno;
458 
459 	/* Send this request to the orig IO thread. */
460 	orig_thread = spdk_io_channel_get_thread(ch);
461 
462 	spdk_thread_exec_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
463 }
464 
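/* Build an mbuf chain over the supplied iovecs: each iovec becomes one mbuf attached
 * as an external buffer, and when spdk_vtophys() reports that an iovec spans a physical
 * page boundary, the remainder is carried in an extra mbuf chained onto mbufs[0].
 */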
465 static int
466 _setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
467 		     struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
468 {
469 	uint64_t updated_length, remainder, phys_addr;
470 	uint8_t *current_base = NULL;
471 	int iov_index, mbuf_index;
472 	int rc = 0;
473 
474 	/* Setup mbufs */
475 	iov_index = mbuf_index = 0;
476 	while (iov_index < iovcnt) {
477 
478 		current_base = iovs[iov_index].iov_base;
479 		if (total_length) {
480 			*total_length += iovs[iov_index].iov_len;
481 		}
482 		assert(mbufs[mbuf_index] != NULL);
483 		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
484 		updated_length = iovs[iov_index].iov_len;
485 		phys_addr = spdk_vtophys((void *)current_base, &updated_length);
486 
487 		rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
488 					  current_base,
489 					  phys_addr,
490 					  updated_length,
491 					  &g_shinfo);
492 		rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
493 		remainder = iovs[iov_index].iov_len - updated_length;
494 
495 		if (mbuf_index > 0) {
496 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
497 		}
498 
499 		/* If we crossed 2 physical pages boundary we need another mbuf for the remainder */
500 		if (remainder > 0) {
501 			/* allocate an mbuf at the end of the array */
502 			rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
503 						    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
504 			if (rc) {
505 				SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
506 				return -1;
507 			}
508 			(*mbuf_total)++;
509 			mbuf_index++;
510 			*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)reduce_cb_arg;
511 			current_base += updated_length;
512 			phys_addr = spdk_vtophys((void *)current_base, &remainder);
513 			/* assert that we don't cross another page boundary */
514 			assert(remainder == iovs[iov_index].iov_len - updated_length);
515 
516 			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
517 						  current_base,
518 						  phys_addr,
519 						  remainder,
520 						  &g_shinfo);
521 			rte_pktmbuf_append(mbufs[mbuf_index], remainder);
522 			rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
523 		}
524 		iov_index++;
525 		mbuf_index++;
526 	}
527 
528 	return 0;
529 }
530 
531 static int
532 _compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
533 		    int src_iovcnt, struct iovec *dst_iovs,
534 		    int dst_iovcnt, bool compress, void *cb_arg)
535 {
536 	void *reduce_cb_arg = cb_arg;
537 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
538 					   backing_dev);
539 	struct rte_comp_op *comp_op;
540 	struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
541 	struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
542 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
543 	uint64_t total_length = 0;
544 	int rc = 0;
545 	struct vbdev_comp_op *op_to_queue;
546 	int src_mbuf_total = src_iovcnt;
547 	int dst_mbuf_total = dst_iovcnt;
548 	bool device_error = false;
549 
550 	assert(src_iovcnt < MAX_MBUFS_PER_OP);
551 
552 #ifdef DEBUG
553 	memset(src_mbufs, 0, sizeof(src_mbufs));
554 	memset(dst_mbufs, 0, sizeof(dst_mbufs));
555 #endif
556 
557 	comp_op = rte_comp_op_alloc(g_comp_op_mp);
558 	if (!comp_op) {
559 		SPDK_ERRLOG("ERROR trying to get a comp op!\n");
560 		rc = -ENOMEM;
561 		goto error_get_op;
562 	}
563 
564 	/* get an mbuf per iov, src and dst */
565 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
566 	if (rc) {
567 		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
568 		rc = -ENOMEM;
569 		goto error_get_src;
570 	}
571 	assert(src_mbufs[0]);
572 
573 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
574 	if (rc) {
575 		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
576 		rc = -ENOMEM;
577 		goto error_get_dst;
578 	}
579 	assert(dst_mbufs[0]);
580 
581 	/* There is a 1:1 mapping between a bdev_io and a compression operation.
582 	 * Some PMDs that SPDK uses don't support chaining, but the reduce library
583 	 * should provide correctly laid-out buffers in that case.
584 	 * Build our mbuf chain and associate it with our single comp_op.
585 	 */
586 	rc = _setup_compress_mbuf(src_mbufs, &src_mbuf_total, &total_length,
587 				  src_iovs, src_iovcnt, reduce_cb_arg);
588 	if (rc < 0) {
589 		goto error_src_dst;
590 	}
591 	if (!comp_bdev->backing_dev.sgl_in && src_mbufs[0]->next != NULL) {
592 		if (src_iovcnt == 1) {
593 			SPDK_ERRLOG("Src buffer crosses physical page boundary but driver %s doesn't support SGL input\n",
594 				    comp_bdev->drv_name);
595 		} else {
596 			SPDK_ERRLOG("Driver %s doesn't support SGL input\n", comp_bdev->drv_name);
597 		}
598 		rc = -EINVAL;
599 		goto error_src_dst;
600 	}
601 
602 	comp_op->m_src = src_mbufs[0];
603 	comp_op->src.offset = 0;
604 	comp_op->src.length = total_length;
605 
606 	rc = _setup_compress_mbuf(dst_mbufs, &dst_mbuf_total, NULL,
607 				  dst_iovs, dst_iovcnt, reduce_cb_arg);
608 	if (rc < 0) {
609 		goto error_src_dst;
610 	}
611 	if (!comp_bdev->backing_dev.sgl_out && dst_mbufs[0]->next != NULL) {
612 		if (dst_iovcnt == 1) {
613 			SPDK_ERRLOG("Dst buffer crosses physical page boundary but driver %s doesn't support SGL output\n",
614 				    comp_bdev->drv_name);
615 		} else {
616 			SPDK_ERRLOG("Driver %s doesn't support SGL output\n", comp_bdev->drv_name);
617 		}
618 		rc = -EINVAL;
619 		goto error_src_dst;
620 	}
621 
622 	comp_op->m_dst = dst_mbufs[0];
623 	comp_op->dst.offset = 0;
624 
625 	if (compress == true) {
626 		comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
627 	} else {
628 		comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
629 	}
630 
631 	comp_op->op_type = RTE_COMP_OP_STATELESS;
632 	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
633 
634 	rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
635 	assert(rc <= 1);
636 
637 	/* We always expect 1 op to be enqueued; if 0 were, we need to queue it up ourselves. */
638 	if (rc == 1) {
639 		return 0;
640 	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
641 		rc = -EAGAIN;
642 	} else {
		/* Log the status now, before comp_op is freed in the cleanup path below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
643 		device_error = true;
644 	}
645 
646 	/* Error cleanup paths. */
647 error_src_dst:
648 	rte_pktmbuf_free_bulk(dst_mbufs, dst_iovcnt);
649 error_get_dst:
650 	rte_pktmbuf_free_bulk(src_mbufs, src_iovcnt);
651 error_get_src:
652 	rte_comp_op_free(comp_op);
653 error_get_op:
654 
655 	if (device_error == true) {
656 		/* There was an error sending the op to the device, most
657 		 * likely with the parameters. The status was logged above,
658 		 * before comp_op was freed.
659 		 */
660 		return -EINVAL;
661 	}
662 	if (rc != -ENOMEM && rc != -EAGAIN) {
663 		return rc;
664 	}
665 
666 	op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
667 	if (op_to_queue == NULL) {
668 		SPDK_ERRLOG("unable to allocate operation for queueing.\n");
669 		return -ENOMEM;
670 	}
671 	op_to_queue->backing_dev = backing_dev;
672 	op_to_queue->src_iovs = src_iovs;
673 	op_to_queue->src_iovcnt = src_iovcnt;
674 	op_to_queue->dst_iovs = dst_iovs;
675 	op_to_queue->dst_iovcnt = dst_iovcnt;
676 	op_to_queue->compress = compress;
677 	op_to_queue->cb_arg = cb_arg;
678 	TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
679 			  op_to_queue,
680 			  link);
681 	return 0;
682 }
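/* Ops queued above on -ENOMEM/-EAGAIN are resubmitted, one per completion, from
 * comp_dev_poller() once earlier operations finish and free up resources.
 */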
683 
684 /* Poller for the DPDK compression driver. */
685 static int
686 comp_dev_poller(void *args)
687 {
688 	struct vbdev_compress *comp_bdev = args;
689 	uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
690 	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
691 	uint16_t num_deq;
692 	struct spdk_reduce_vol_cb_args *reduce_args;
693 	struct vbdev_comp_op *op_to_resubmit;
694 	int rc, i;
695 
696 	num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
697 						NUM_MAX_INFLIGHT_OPS);
698 	for (i = 0; i < num_deq; i++) {
699 		reduce_args = (struct spdk_reduce_vol_cb_args *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
700 				uint64_t *);
701 		if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
702 
703 			/* tell reduce this is done and what the bytecount was */
704 			reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
705 		} else {
706 			SPDK_NOTICELOG("FYI storing data uncompressed due to dequeue status %u\n",
707 				       deq_ops[i]->status);
708 
709 			/* Reduce will simply store uncompressed on neg errno value. */
710 			reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
711 		}
712 
713 		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
714 		 * call takes care of freeing all of the mbufs in the chain back to their
715 		 * original pool.
716 		 */
717 		rte_pktmbuf_free(deq_ops[i]->m_src);
718 		rte_pktmbuf_free(deq_ops[i]->m_dst);
719 
720 		/* There is no bulk free for comp ops, so we have to free them one at a time
721 		 * here; however, it would be rare that we'd ever have more than 1 at a time
722 		 * anyway.
723 		 */
724 		rte_comp_op_free(deq_ops[i]);
725 
726 		/* Check if there are any pending comp ops to process, only pull one
727 		 * at a time off as _compress_operation() may re-queue the op.
728 		 */
729 		if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
730 			op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
731 			rc = _compress_operation(op_to_resubmit->backing_dev,
732 						 op_to_resubmit->src_iovs,
733 						 op_to_resubmit->src_iovcnt,
734 						 op_to_resubmit->dst_iovs,
735 						 op_to_resubmit->dst_iovcnt,
736 						 op_to_resubmit->compress,
737 						 op_to_resubmit->cb_arg);
738 			if (rc == 0) {
739 				TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
740 				free(op_to_resubmit);
741 			}
742 		}
743 	}
744 	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
745 }
746 
747 /* Entry point for reduce lib to issue a compress operation. */
748 static void
749 _comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
750 		      struct iovec *src_iovs, int src_iovcnt,
751 		      struct iovec *dst_iovs, int dst_iovcnt,
752 		      struct spdk_reduce_vol_cb_args *cb_arg)
753 {
754 	int rc;
755 
756 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
757 	if (rc) {
758 		SPDK_ERRLOG("error with compress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
759 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
760 	}
761 }
762 
763 /* Entry point for reduce lib to issue a decompress operation. */
764 static void
765 _comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
766 			struct iovec *src_iovs, int src_iovcnt,
767 			struct iovec *dst_iovs, int dst_iovcnt,
768 			struct spdk_reduce_vol_cb_args *cb_arg)
769 {
770 	int rc;
771 
772 	rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
773 	if (rc) {
774 		SPDK_ERRLOG("error with decompress operation, code %d (%s)\n", rc, spdk_strerror(-rc));
775 		cb_arg->cb_fn(cb_arg->cb_arg, rc);
776 	}
777 }
778 
779 /* Callback for getting a buf from the bdev pool in the event that the caller passed
780  * in NULL. We need to own the buffer so it doesn't get freed by another vbdev module
781  * beneath us before we're done with it.
782  */
783 static void
784 comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
785 {
786 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
787 					   comp_bdev);
788 
789 	if (spdk_unlikely(!success)) {
790 		SPDK_ERRLOG("Failed to get data buffer\n");
791 		reduce_rw_blocks_cb(bdev_io, -ENOMEM);
792 		return;
793 	}
794 
795 	spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
796 			      bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
797 			      reduce_rw_blocks_cb, bdev_io);
798 }
799 
800 /* scheduled for completion on IO thread */
801 static void
802 _complete_other_io(void *arg)
803 {
804 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
805 	if (io_ctx->status == 0) {
806 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
807 	} else {
808 		spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
809 	}
810 }
811 
812 /* scheduled for submission on reduce thread */
813 static void
814 _comp_bdev_io_submit(void *arg)
815 {
816 	struct spdk_bdev_io *bdev_io = arg;
817 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
818 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
819 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
820 					   comp_bdev);
821 	struct spdk_thread *orig_thread;
822 
823 	switch (bdev_io->type) {
824 	case SPDK_BDEV_IO_TYPE_READ:
825 		spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
826 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
827 		return;
828 	case SPDK_BDEV_IO_TYPE_WRITE:
829 		spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
830 				       bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
831 				       reduce_rw_blocks_cb, bdev_io);
832 		return;
833 	/* TODO support RESET in future patch in the series */
834 	case SPDK_BDEV_IO_TYPE_RESET:
835 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
836 	case SPDK_BDEV_IO_TYPE_UNMAP:
837 	case SPDK_BDEV_IO_TYPE_FLUSH:
838 	default:
839 		SPDK_ERRLOG("Unsupported I/O type %d\n", bdev_io->type);
840 		io_ctx->status = -ENOTSUP;
841 
842 		/* Complete this on the orig IO thread. */
843 		orig_thread = spdk_io_channel_get_thread(ch);
844 		if (orig_thread != spdk_get_thread()) {
845 			spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
846 		} else {
847 			_complete_other_io(io_ctx);
848 		}
849 	}
850 }
851 
852 /* Called when someone above submits IO to this vbdev. */
853 static void
854 vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
855 {
856 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
857 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
858 					   comp_bdev);
859 	struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
860 
861 	memset(io_ctx, 0, sizeof(struct comp_bdev_io));
862 	io_ctx->comp_bdev = comp_bdev;
863 	io_ctx->comp_ch = comp_ch;
864 	io_ctx->orig_io = bdev_io;
865 
866 	/* Send this request to the reduce_thread if that's not what we're on. */
867 	if (spdk_get_thread() != comp_bdev->reduce_thread) {
868 		spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
869 	} else {
870 		_comp_bdev_io_submit(bdev_io);
871 	}
872 }
873 
874 static bool
875 vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
876 {
877 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
878 
879 	switch (io_type) {
880 	case SPDK_BDEV_IO_TYPE_READ:
881 	case SPDK_BDEV_IO_TYPE_WRITE:
882 		return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
883 	case SPDK_BDEV_IO_TYPE_UNMAP:
884 	case SPDK_BDEV_IO_TYPE_RESET:
885 	case SPDK_BDEV_IO_TYPE_FLUSH:
886 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
887 	default:
888 		return false;
889 	}
890 }
891 
892 /* Resubmission function used by the bdev layer when a queued IO is ready to be
893  * submitted.
894  */
895 static void
896 vbdev_compress_resubmit_io(void *arg)
897 {
898 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
899 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
900 
901 	vbdev_compress_submit_request(io_ctx->ch, bdev_io);
902 }
903 
904 /* Used to queue an IO in the event of resource issues. */
905 static void
906 vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
907 {
908 	struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
909 	int rc;
910 
911 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
912 	io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
913 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
914 
915 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
916 	if (rc) {
917 		SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
918 		assert(false);
919 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
920 	}
921 }
922 
923 /* Callback for unregistering the IO device. */
924 static void
925 _device_unregister_cb(void *io_device)
926 {
927 	struct vbdev_compress *comp_bdev = io_device;
928 
929 	/* Done with this comp_bdev. */
930 	pthread_mutex_destroy(&comp_bdev->reduce_lock);
931 	free(comp_bdev->comp_bdev.name);
932 	free(comp_bdev);
933 }
934 
935 static void
936 _vbdev_compress_destruct_cb(void *ctx)
937 {
938 	struct vbdev_compress *comp_bdev = ctx;
939 
940 	TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
941 	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
942 	/* Close the underlying bdev on its same opened thread. */
943 	spdk_bdev_close(comp_bdev->base_desc);
944 	comp_bdev->vol = NULL;
945 	if (comp_bdev->orphaned == false) {
946 		spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
947 	} else {
948 		vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
949 		_device_unregister_cb(comp_bdev);
950 	}
951 }
952 
953 static void
954 vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
955 {
956 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
957 
958 	if (reduce_errno) {
959 		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
960 	} else {
961 		if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
962 			spdk_thread_send_msg(comp_bdev->thread,
963 					     _vbdev_compress_destruct_cb, comp_bdev);
964 		} else {
965 			_vbdev_compress_destruct_cb(comp_bdev);
966 		}
967 	}
968 }
969 
970 static void
971 _reduce_destroy_cb(void *ctx, int reduce_errno)
972 {
973 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
974 
975 	if (reduce_errno) {
976 		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
977 	}
978 
979 	comp_bdev->vol = NULL;
980 	spdk_put_io_channel(comp_bdev->base_ch);
981 	if (comp_bdev->orphaned == false) {
982 		spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
983 				     comp_bdev->delete_ctx);
984 	} else {
985 		vbdev_compress_destruct_cb((void *)comp_bdev, 0);
986 	}
987 
988 }
989 
990 static void
991 _delete_vol_unload_cb(void *ctx)
992 {
993 	struct vbdev_compress *comp_bdev = ctx;
994 
995 	/* FIXME: for now, just assert that these conditions hold. */
996 	assert(!comp_bdev->reduce_thread ||
997 	       comp_bdev->reduce_thread == spdk_get_thread());
998 
999 	/* reducelib needs a channel to communicate with the backing device */
1000 	comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1001 
1002 	/* Clean the device before we free our resources. */
1003 	spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
1004 }
1005 
1006 /* Called by reduceLib after performing unload vol actions */
1007 static void
1008 delete_vol_unload_cb(void *cb_arg, int reduce_errno)
1009 {
1010 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1011 
1012 	if (reduce_errno) {
1013 		SPDK_ERRLOG("reduce errno %d\n", reduce_errno);
1014 		/* FIXME: the delete callback should still be executed on this error path. */
1015 		return;
1016 	}
1017 
1018 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1019 	if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
1020 		spdk_thread_send_msg(comp_bdev->reduce_thread,
1021 				     _delete_vol_unload_cb, comp_bdev);
1022 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
1023 	} else {
1024 		pthread_mutex_unlock(&comp_bdev->reduce_lock);
1025 
1026 		_delete_vol_unload_cb(comp_bdev);
1027 	}
1028 }
1029 
1030 const char *
1031 compress_get_name(const struct vbdev_compress *comp_bdev)
1032 {
1033 	return comp_bdev->comp_bdev.name;
1034 }
1035 
1036 struct vbdev_compress *
1037 compress_bdev_first(void)
1038 {
1039 	struct vbdev_compress *comp_bdev;
1040 
1041 	comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
1042 
1043 	return comp_bdev;
1044 }
1045 
1046 struct vbdev_compress *
1047 compress_bdev_next(struct vbdev_compress *prev)
1048 {
1049 	struct vbdev_compress *comp_bdev;
1050 
1051 	comp_bdev = TAILQ_NEXT(prev, link);
1052 
1053 	return comp_bdev;
1054 }
1055 
1056 bool
1057 compress_has_orphan(const char *name)
1058 {
1059 	struct vbdev_compress *comp_bdev;
1060 
1061 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1062 		if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
1063 			return true;
1064 		}
1065 	}
1066 	return false;
1067 }
1068 
1069 /* Called after we've unregistered following a hot remove callback.
1070  * Our finish entry point will be called next.
1071  */
1072 static int
1073 vbdev_compress_destruct(void *ctx)
1074 {
1075 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1076 
1077 	if (comp_bdev->vol != NULL) {
1078 		/* Tell reducelib that we're done with this volume. */
1079 		spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
1080 	} else {
1081 		vbdev_compress_destruct_cb(comp_bdev, 0);
1082 	}
1083 
1084 	return 0;
1085 }
1086 
1087 /* We supplied this as an entry point for upper layers that want to communicate with this
1088  * bdev.  This is how they get a channel.
1089  */
1090 static struct spdk_io_channel *
1091 vbdev_compress_get_io_channel(void *ctx)
1092 {
1093 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1094 
1095 	/* The IO channel code will allocate a channel for us which consists of
1096 	 * the SPDK channel structure plus the size of our comp_io_channel struct
1097 	 * that we passed in when we registered our IO device. It will then call
1098 	 * our channel create callback to populate any elements that we need to
1099 	 * update.
1100 	 */
1101 	return spdk_get_io_channel(comp_bdev);
1102 }
1103 
1104 /* This is the output for bdev_get_bdevs() for this vbdev */
1105 static int
1106 vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1107 {
1108 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
1109 
1110 	spdk_json_write_name(w, "compress");
1111 	spdk_json_write_object_begin(w);
1112 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1113 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1114 	spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1115 	spdk_json_write_object_end(w);
1116 
1117 	return 0;
1118 }
1119 
1120 /* This is used to generate JSON that can configure this module to its current state. */
1121 static int
1122 vbdev_compress_config_json(struct spdk_json_write_ctx *w)
1123 {
1124 	struct vbdev_compress *comp_bdev;
1125 
1126 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1127 		spdk_json_write_object_begin(w);
1128 		spdk_json_write_named_string(w, "method", "bdev_compress_create");
1129 		spdk_json_write_named_object_begin(w, "params");
1130 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
1131 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
1132 		spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
1133 		spdk_json_write_object_end(w);
1134 		spdk_json_write_object_end(w);
1135 	}
1136 	return 0;
1137 }
1138 
1139 static void
1140 _vbdev_reduce_init_cb(void *ctx)
1141 {
1142 	struct vbdev_compress *meta_ctx = ctx;
1143 	int rc;
1144 
1145 	assert(meta_ctx->base_desc != NULL);
1146 
1147 	/* We're done with metadata operations */
1148 	spdk_put_io_channel(meta_ctx->base_ch);
1149 
1150 	if (meta_ctx->vol) {
1151 		rc = vbdev_compress_claim(meta_ctx);
1152 		if (rc == 0) {
1153 			return;
1154 		}
1155 	}
1156 
1157 	/* Close the underlying bdev on its same opened thread. */
1158 	spdk_bdev_close(meta_ctx->base_desc);
1159 	free(meta_ctx);
1160 }
1161 
1162 /* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
1163  * used for initial metadata operations to vbdev_compress_claim(), where it is
1164  * further filled out and added to the global list.
1165  */
1166 static void
1167 vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
1168 {
1169 	struct vbdev_compress *meta_ctx = cb_arg;
1170 
1171 	if (reduce_errno == 0) {
1172 		meta_ctx->vol = vol;
1173 	} else {
1174 		SPDK_ERRLOG("error initializing vol for bdev %s: %d\n",
1175 			    spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
1176 	}
1177 
1178 	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
1179 		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
1180 	} else {
1181 		_vbdev_reduce_init_cb(meta_ctx);
1182 	}
1183 }
1184 
1185 /* Callback for the function used by reduceLib to perform IO to/from the backing device. We
1186  * free the bdev_io and then invoke the callback that reduceLib provided when it issued the
1187  * read/write/unmap.
1188  */
1189 static void
1190 comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1191 {
1192 	struct spdk_reduce_vol_cb_args *cb_args = arg;
1193 	int reduce_errno;
1194 
1195 	if (success) {
1196 		reduce_errno = 0;
1197 	} else {
1198 		reduce_errno = -EIO;
1199 	}
1200 	spdk_bdev_free_io(bdev_io);
1201 	cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
1202 }
1203 
1204 /* This is the function provided to the reduceLib for sending reads directly to
1205  * the backing device.
1206  */
1207 static void
1208 _comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1209 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1210 {
1211 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1212 					   backing_dev);
1213 	int rc;
1214 
1215 	rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1216 				    iov, iovcnt, lba, lba_count,
1217 				    comp_reduce_io_cb,
1218 				    args);
1219 	if (rc) {
1220 		if (rc == -ENOMEM) {
1221 			SPDK_ERRLOG("No memory; unable to queue io, completing with error.\n");
1222 			/* TODO: there's no bdev_io to queue */
1223 		} else {
1224 			SPDK_ERRLOG("error submitting readv request\n");
1225 		}
1226 		args->cb_fn(args->cb_arg, rc);
1227 	}
1228 }
1229 
1230 /* This is the function provided to the reduceLib for sending writes directly to
1231  * the backing device.
1232  */
1233 static void
1234 _comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
1235 		    uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1236 {
1237 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1238 					   backing_dev);
1239 	int rc;
1240 
1241 	rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1242 				     iov, iovcnt, lba, lba_count,
1243 				     comp_reduce_io_cb,
1244 				     args);
1245 	if (rc) {
1246 		if (rc == -ENOMEM) {
1247 			SPDK_ERRLOG("No memory; unable to queue io, completing with error.\n");
1248 			/* TODO: there's no bdev_io to queue */
1249 		} else {
1250 			SPDK_ERRLOG("error submitting writev request\n");
1251 		}
1252 		args->cb_fn(args->cb_arg, rc);
1253 	}
1254 }
1255 
1256 /* This is the function provided to the reduceLib for sending unmaps directly to
1257  * the backing device.
1258  */
1259 static void
1260 _comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
1261 		   uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
1262 {
1263 	struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
1264 					   backing_dev);
1265 	int rc;
1266 
1267 	rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
1268 				    lba, lba_count,
1269 				    comp_reduce_io_cb,
1270 				    args);
1271 
1272 	if (rc) {
1273 		if (rc == -ENOMEM) {
1274 			SPDK_ERRLOG("No memory; unable to queue io, completing with error.\n");
1275 			/* TODO: there's no bdev_io to queue */
1276 		} else {
1277 			SPDK_ERRLOG("error submitting unmap request\n");
1278 		}
1279 		args->cb_fn(args->cb_arg, rc);
1280 	}
1281 }
1282 
1283 /* Called by reduceLib after performing unload vol actions following base bdev hotremove */
1284 static void
1285 bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
1286 {
1287 	struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
1288 
1289 	if (reduce_errno) {
1290 		SPDK_ERRLOG("number %d\n", reduce_errno);
1291 	}
1292 
1293 	comp_bdev->vol = NULL;
1294 	spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
1295 }
1296 
1297 static void
1298 vbdev_compress_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
1299 {
1300 	struct vbdev_compress *comp_bdev, *tmp;
1301 
1302 	TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
1303 		if (bdev_find == comp_bdev->base_bdev) {
1304 			/* Tell reduceLib that we're done with this volume. */
1305 			spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
1306 		}
1307 	}
1308 }
1309 
1310 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
1311 static void
1312 vbdev_compress_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1313 				  void *event_ctx)
1314 {
1315 	switch (type) {
1316 	case SPDK_BDEV_EVENT_REMOVE:
1317 		vbdev_compress_base_bdev_hotremove_cb(bdev);
1318 		break;
1319 	default:
1320 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1321 		break;
1322 	}
1323 }
1324 
1325 /* TODO: determine which params we want user configurable; hard-coded for now:
1326  * params.vol_size
1327  * params.chunk_size
1328  * compression PMD, algorithm, window size, comp level, etc.
1329  * DEV_MD_PATH
1330  */
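/* The hard-coded values currently in use: chunk_size = CHUNK_SIZE (16 KB),
 * backing_io_unit_size = BACKING_IO_SZ (4 KB); the logical block size defaults to
 * the base bdev's blocklen when the caller passes 0.
 */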
1331 
1332 /* Common function for init and load to allocate and populate the minimal
1333  * information for reducelib to init or load.
1334  */
1335 struct vbdev_compress *
1336 _prepare_for_load_init(struct spdk_bdev_desc *bdev_desc, uint32_t lb_size)
1337 {
1338 	struct vbdev_compress *meta_ctx;
1339 	struct spdk_bdev *bdev;
1340 
1341 	meta_ctx = calloc(1, sizeof(struct vbdev_compress));
1342 	if (meta_ctx == NULL) {
1343 		SPDK_ERRLOG("failed to alloc init context\n");
1344 		return NULL;
1345 	}
1346 
1347 	meta_ctx->drv_name = "None";
1348 	meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
1349 	meta_ctx->backing_dev.readv = _comp_reduce_readv;
1350 	meta_ctx->backing_dev.writev = _comp_reduce_writev;
1351 	meta_ctx->backing_dev.compress = _comp_reduce_compress;
1352 	meta_ctx->backing_dev.decompress = _comp_reduce_decompress;
1353 
1354 	meta_ctx->base_desc = bdev_desc;
1355 	bdev = spdk_bdev_desc_get_bdev(bdev_desc);
1356 	meta_ctx->base_bdev = bdev;
1357 
1358 	meta_ctx->backing_dev.blocklen = bdev->blocklen;
1359 	meta_ctx->backing_dev.blockcnt = bdev->blockcnt;
1360 
1361 	meta_ctx->params.chunk_size = CHUNK_SIZE;
1362 	if (lb_size == 0) {
1363 		meta_ctx->params.logical_block_size = bdev->blocklen;
1364 	} else {
1365 		meta_ctx->params.logical_block_size = lb_size;
1366 	}
1367 
1368 	meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
1369 	return meta_ctx;
1370 }
1371 
1372 static bool
1373 _set_pmd(struct vbdev_compress *comp_dev)
1374 {
1375 	if (g_opts == COMPRESS_PMD_AUTO) {
1376 		if (g_qat_available) {
1377 			comp_dev->drv_name = QAT_PMD;
1378 		} else if (g_mlx5_pci_available) {
1379 			comp_dev->drv_name = MLX5_PMD;
1380 		} else {
1381 			comp_dev->drv_name = ISAL_PMD;
1382 		}
1383 	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
1384 		comp_dev->drv_name = QAT_PMD;
1385 	} else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
1386 		comp_dev->drv_name = ISAL_PMD;
1387 	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
1388 		comp_dev->drv_name = MLX5_PMD;
1389 	} else {
1390 		SPDK_ERRLOG("Requested PMD is not available.\n");
1391 		return false;
1392 	}
1393 	SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
1394 	return true;
1395 }
1396 
1397 /* Call reducelib to initialize a new volume */
1398 static int
1399 vbdev_init_reduce(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1400 {
1401 	struct spdk_bdev_desc *bdev_desc = NULL;
1402 	struct vbdev_compress *meta_ctx;
1403 	int rc;
1404 
1405 	rc = spdk_bdev_open_ext(bdev_name, true, vbdev_compress_base_bdev_event_cb,
1406 				NULL, &bdev_desc);
1407 	if (rc) {
1408 		SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
1409 		return rc;
1410 	}
1411 
1412 	meta_ctx = _prepare_for_load_init(bdev_desc, lb_size);
1413 	if (meta_ctx == NULL) {
1414 		spdk_bdev_close(bdev_desc);
1415 		return -EINVAL;
1416 	}
1417 
1418 	if (_set_pmd(meta_ctx) == false) {
1419 		SPDK_ERRLOG("could not find required pmd\n");
1420 		free(meta_ctx);
1421 		spdk_bdev_close(bdev_desc);
1422 		return -EINVAL;
1423 	}
1424 
1425 	/* Save the thread where the base device is opened */
1426 	meta_ctx->thread = spdk_get_thread();
1427 
1428 	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
1429 
1430 	spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
1431 			     pm_path,
1432 			     vbdev_reduce_init_cb,
1433 			     meta_ctx);
1434 	return 0;
1435 }
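/* Flow from here: spdk_reduce_vol_init() invokes vbdev_reduce_init_cb() when the vol
 * is ready, which hops back to the thread that opened the base bdev and then claims
 * and registers the compress bdev via vbdev_compress_claim().
 */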
1436 
1437 /* We provide this callback for the SPDK channel code to create a channel using
1438  * the channel struct we provided in our module get_io_channel() entry point. Here
1439  * we get and save off an underlying base channel of the device below us so that
1440  * we can communicate with the base bdev on a per channel basis.  If we needed
1441  * our own poller for this vbdev, we'd register it here.
1442  */
1443 static int
1444 comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1445 {
1446 	struct vbdev_compress *comp_bdev = io_device;
1447 	struct comp_device_qp *device_qp;
1448 
1449 	/* Now set the reduce channel if it's not already set. */
1450 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1451 	if (comp_bdev->ch_count == 0) {
1452 		/* We use this queue to track outstanding IO in our layer. */
1453 		TAILQ_INIT(&comp_bdev->pending_comp_ios);
1454 
1455 		/* We use this to queue up compression operations as needed. */
1456 		TAILQ_INIT(&comp_bdev->queued_comp_ops);
1457 
1458 		comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
1459 		comp_bdev->reduce_thread = spdk_get_thread();
1460 		comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
1461 		/* Now assign a q pair */
1462 		pthread_mutex_lock(&g_comp_device_qp_lock);
1463 		TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
1464 			if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
1465 				if (device_qp->thread == spdk_get_thread()) {
1466 					comp_bdev->device_qp = device_qp;
1467 					break;
1468 				}
1469 				if (device_qp->thread == NULL) {
1470 					comp_bdev->device_qp = device_qp;
1471 					device_qp->thread = spdk_get_thread();
1472 					break;
1473 				}
1474 			}
1475 		}
1476 		pthread_mutex_unlock(&g_comp_device_qp_lock);
1477 	}
1478 	comp_bdev->ch_count++;
1479 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1480 
1481 	if (comp_bdev->device_qp != NULL) {
1482 		uint64_t comp_feature_flags =
1483 			comp_bdev->device_qp->device->cdev_info.capabilities[RTE_COMP_ALGO_DEFLATE].comp_feature_flags;
1484 
1485 		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
1486 			comp_bdev->backing_dev.sgl_in = true;
1487 		}
1488 		if (comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
1489 			comp_bdev->backing_dev.sgl_out = true;
1490 		}
1491 		return 0;
1492 	} else {
1493 		SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
1494 		assert(false);
1495 		return -ENOMEM;
1496 	}
1497 }
1498 
1499 static void
1500 _channel_cleanup(struct vbdev_compress *comp_bdev)
1501 {
1502 	/* Note: comp_bdevs can share a device_qp if they are
1503 	 * on the same thread so we leave the device_qp element
1504 	 * alone for this comp_bdev and just clear the reduce thread.
1505 	 */
1506 	spdk_put_io_channel(comp_bdev->base_ch);
1507 	comp_bdev->reduce_thread = NULL;
1508 	spdk_poller_unregister(&comp_bdev->poller);
1509 }
1510 
1511 /* Used to reroute destroy_ch to the correct thread */
1512 static void
1513 _comp_bdev_ch_destroy_cb(void *arg)
1514 {
1515 	struct vbdev_compress *comp_bdev = arg;
1516 
1517 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1518 	_channel_cleanup(comp_bdev);
1519 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1520 }
1521 
1522 /* We provide this callback for the SPDK channel code to destroy a channel
1523  * created with our create callback. We just need to undo anything we did
1524  * when we created. If this bdev used its own poller, we'd unregister it here.
1525  */
1526 static void
1527 comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1528 {
1529 	struct vbdev_compress *comp_bdev = io_device;
1530 
1531 	pthread_mutex_lock(&comp_bdev->reduce_lock);
1532 	comp_bdev->ch_count--;
1533 	if (comp_bdev->ch_count == 0) {
1534 		/* Send this request to the thread where the channel was created. */
1535 		if (comp_bdev->reduce_thread != spdk_get_thread()) {
1536 			spdk_thread_send_msg(comp_bdev->reduce_thread,
1537 					     _comp_bdev_ch_destroy_cb, comp_bdev);
1538 		} else {
1539 			_channel_cleanup(comp_bdev);
1540 		}
1541 	}
1542 	pthread_mutex_unlock(&comp_bdev->reduce_lock);
1543 }
1544 
1545 /* RPC entry point for compression vbdev creation. */
1546 int
1547 create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
1548 {
1549 	struct vbdev_compress *comp_bdev = NULL;
1550 
1551 	if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
1552 		SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
1553 		return -EINVAL;
1554 	}
1555 
1556 	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
1557 		if (strcmp(bdev_name, comp_bdev->base_bdev->name) == 0) {
1558 			SPDK_ERRLOG("Base bdev %s is already in use by a compress bdev\n", bdev_name);
1559 			return -EBUSY;
1560 		}
1561 	}
1562 	return vbdev_init_reduce(bdev_name, pm_path, lb_size);
1563 }
1564 
1565 /* On init, just init the compress drivers. All metadata is stored on disk. */
1566 static int
1567 vbdev_compress_init(void)
1568 {
1569 	if (vbdev_init_compress_drivers()) {
1570 		SPDK_ERRLOG("Error setting up compression devices\n");
1571 		return -EINVAL;
1572 	}
1573 
1574 	return 0;
1575 }
1576 
1577 /* Called when the entire module is being torn down. */
1578 static void
1579 vbdev_compress_finish(void)
1580 {
1581 	struct comp_device_qp *dev_qp;
1582 	/* TODO: unload vol in a future patch */
1583 
1584 	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
1585 		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
1586 		free(dev_qp);
1587 	}
1588 	pthread_mutex_destroy(&g_comp_device_qp_lock);
1589 
1590 	rte_mempool_free(g_comp_op_mp);
1591 	rte_mempool_free(g_mbuf_mp);
1592 }
1593 
1594 /* During init we'll be asked how much memory we'd like passed to us
1595  * in bdev_io structures as context. Here's where we specify how
1596  * much context we want per IO.
1597  */
static int
vbdev_compress_get_ctx_size(void)
{
	return sizeof(struct comp_bdev_io);
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
	.destruct		= vbdev_compress_destruct,
	.submit_request		= vbdev_compress_submit_request,
	.io_type_supported	= vbdev_compress_io_type_supported,
	.get_io_channel		= vbdev_compress_get_io_channel,
	.dump_info_json		= vbdev_compress_dump_info_json,
	.write_config_json	= NULL,
};

static struct spdk_bdev_module compress_if = {
	.name = "compress",
	.module_init = vbdev_compress_init,
	.get_ctx_size = vbdev_compress_get_ctx_size,
	.examine_disk = vbdev_compress_examine,
	.module_fini = vbdev_compress_finish,
	.config_json = vbdev_compress_config_json
};

SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)

static int
_set_compbdev_name(struct vbdev_compress *comp_bdev)
{
	struct spdk_bdev_alias *aliases;

	if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
		aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias.name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
			return -ENOMEM;
		}
	} else {
		comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
		if (!comp_bdev->comp_bdev.name) {
			SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
			return -ENOMEM;
		}
	}
	return 0;
}

static int
vbdev_compress_claim(struct vbdev_compress *comp_bdev)
{
	int rc;

	if (_set_compbdev_name(comp_bdev)) {
		return -EINVAL;
	}

	/* Note: some of the fields below will change in the future - for example,
	 * blockcnt specifically will not match (the compressed volume size will
	 * be slightly less than the base bdev size)
	 */
	comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
	comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;

	if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
		comp_bdev->comp_bdev.required_alignment =
			spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
				 comp_bdev->base_bdev->required_alignment);
		SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
			       comp_bdev->comp_bdev.required_alignment);
	} else {
		comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
	}
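	/* Example: with the default 16 KiB chunk size and a 4 KiB logical
	 * block size, the boundary below works out to 4 blocks, so the bdev
	 * layer will split any IO that would straddle a compressed chunk.
	 */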
	comp_bdev->comp_bdev.optimal_io_boundary =
		comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;

	comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;

	comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
	comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
	assert(comp_bdev->comp_bdev.blockcnt > 0);

	/* This is the context that is passed to us when the bdev
	 * layer calls in so we'll save our comp_bdev node here.
	 */
	comp_bdev->comp_bdev.ctxt = comp_bdev;
	comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
	comp_bdev->comp_bdev.module = &compress_if;

	pthread_mutex_init(&comp_bdev->reduce_lock, NULL);

	/* Save the thread where the base device is opened */
	comp_bdev->thread = spdk_get_thread();

	spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
				sizeof(struct comp_io_channel),
				comp_bdev->comp_bdev.name);

	rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
					 comp_bdev->comp_bdev.module);
	if (rc) {
		SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
		goto error_claim;
	}

	rc = spdk_bdev_register(&comp_bdev->comp_bdev);
	if (rc < 0) {
		SPDK_ERRLOG("failed to register bdev\n");
		goto error_bdev_register;
	}

	TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);

	SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);

	return 0;

	/* Error cleanup paths. */
error_bdev_register:
	spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
error_claim:
	spdk_io_device_unregister(comp_bdev, NULL);
	free(comp_bdev->comp_bdev.name);
	return rc;
}

static void
_vbdev_compress_delete_done(void *_ctx)
{
	struct vbdev_comp_delete_ctx *ctx = _ctx;

	ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);

	free(ctx);
}

static void
vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
{
	struct vbdev_comp_delete_ctx *ctx = cb_arg;

	ctx->cb_rc = bdeverrno;

	if (ctx->orig_thread != spdk_get_thread()) {
		spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
	} else {
		_vbdev_compress_delete_done(ctx);
	}
}

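/* RPC entry point for compression vbdev deletion. A hypothetical invocation
 * via the matching RPC (the RPC name is assumed from the companion rpc file):
 *   scripts/rpc.py bdev_compress_delete COMP_Nvme0n1
 */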
void
bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
{
	struct vbdev_compress *comp_bdev = NULL;
	struct vbdev_comp_delete_ctx *ctx;

	TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
		if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
			break;
		}
	}

	if (comp_bdev == NULL) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		SPDK_ERRLOG("Failed to allocate delete context\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	/* Save these for after the vol is destroyed. */
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->orig_thread = spdk_get_thread();

	comp_bdev->delete_ctx = ctx;

	/* Tell reducelib that we're done with this volume. */
	if (comp_bdev->orphaned == false) {
		spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
	} else {
		delete_vol_unload_cb(comp_bdev, 0);
	}
}

static void
_vbdev_reduce_load_cb(void *ctx)
{
	struct vbdev_compress *meta_ctx = ctx;
	int rc;

	assert(meta_ctx->base_desc != NULL);

	/* Done with metadata operations */
	spdk_put_io_channel(meta_ctx->base_ch);

	if (meta_ctx->reduce_errno == 0) {
		if (_set_pmd(meta_ctx) == false) {
			SPDK_ERRLOG("could not find required pmd\n");
			goto err;
		}

		rc = vbdev_compress_claim(meta_ctx);
		if (rc != 0) {
			goto err;
		}
	} else if (meta_ctx->reduce_errno == -ENOENT) {
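		/* -ENOENT means the metadata identified this bdev as one of
		 * ours but the vol could not be fully loaded (e.g. its
		 * persistent memory file is missing), so claim the base bdev
		 * as orphaned; it can then be deleted via RPC but not used.
		 */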
		if (_set_compbdev_name(meta_ctx)) {
			goto err;
		}

		/* Save the thread where the base device is opened */
		meta_ctx->thread = spdk_get_thread();

		meta_ctx->comp_bdev.module = &compress_if;
		pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
		rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
						 meta_ctx->comp_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
			free(meta_ctx->comp_bdev.name);
			goto err;
		}

		meta_ctx->orphaned = true;
		TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
	} else {
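		/* -EILSEQ is the expected result of probing a base bdev that
		 * does not contain a reduce volume, so don't log it as an error.
		 */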
		if (meta_ctx->reduce_errno != -EILSEQ) {
			SPDK_ERRLOG("for vol %s, error %d\n",
				    spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
		}
		goto err;
	}

	spdk_bdev_module_examine_done(&compress_if);
	return;

err:
	/* Close the underlying bdev on its same opened thread. */
	spdk_bdev_close(meta_ctx->base_desc);
	free(meta_ctx);
	spdk_bdev_module_examine_done(&compress_if);
}

/* Callback from the reduce lib when the load is complete. We'll pass the
 * vbdev_comp struct used for initial metadata operations to claim, where it
 * will be further filled out and added to the global list.
 */
static void
vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
{
	struct vbdev_compress *meta_ctx = cb_arg;

	if (reduce_errno == 0) {
		/* Update information following volume load. */
		meta_ctx->vol = vol;
		memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
		       sizeof(struct spdk_reduce_vol_params));
	}

	meta_ctx->reduce_errno = reduce_errno;

	if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
		spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
	} else {
		_vbdev_reduce_load_cb(meta_ctx);
	}
}

/* Examine_disk entry point: we'll do a metadata load to see if this bdev is
 * ours, and if so we'll go ahead and claim it.
 */
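/* Note that every exit path from examine must eventually result in a call to
 * spdk_bdev_module_examine_done(), otherwise bdev layer initialization
 * stalls; the load callback above completes it on the asynchronous path.
 */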
static void
vbdev_compress_examine(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc *bdev_desc = NULL;
	struct vbdev_compress *meta_ctx;
	int rc;

	if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false,
				vbdev_compress_base_bdev_event_cb, NULL, &bdev_desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	meta_ctx = _prepare_for_load_init(bdev_desc, 0);
	if (meta_ctx == NULL) {
		spdk_bdev_close(bdev_desc);
		spdk_bdev_module_examine_done(&compress_if);
		return;
	}

	/* Save the thread where the base device is opened */
	meta_ctx->thread = spdk_get_thread();

	meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
	spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
}

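/* Selects which compression PMD the module should use. A hypothetical
 * invocation via the matching RPC (the RPC name and flag are assumed from
 * the companion rpc file, not defined here):
 *   scripts/rpc.py bdev_compress_set_pmd -p 0
 */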
int
compress_set_pmd(enum compress_pmd *opts)
{
	g_opts = *opts;

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_compress)