/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021, 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "accel_dpdk_compressdev.h"
#include "spdk/accel_module.h"

#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_accel_comp",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
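/*
 * A usage recap (these are the exact calls made later in this file, in
 * accel_init_compress_drivers(), _setup_compress_mbuf() and comp_dev_poller()):
 * rte_mbuf_dynfield_register() returns a byte offset, saved in g_mbuf_offset
 * below, and RTE_MBUF_DYNFIELD() turns that offset into a typed pointer so the
 * accel task can ride along with each hardware op:
 *
 *   g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
 *   *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)task;
 *   task = (struct spdk_accel_task *)
 *	    *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *);
 */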
static int g_mbuf_offset;
static enum compress_pmd g_opts;
static bool g_compressdev_enable = false;
static bool g_compressdev_initialized = false;

#define NUM_MAX_XFORMS		2
#define NUM_MAX_INFLIGHT_OPS	128
#define DEFAULT_WINDOW_SIZE	15
#define MBUF_SPLIT		(1UL << DEFAULT_WINDOW_SIZE)
#define QAT_PMD			"compress_qat"
#define MLX5_PMD		"mlx5_pci"
#define NUM_MBUFS		65536
#define POOL_CACHE_SIZE		256

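/*
 * Sizing sketch for the constants above: MBUF_SPLIT is (1UL << 15) = 32 KiB,
 * the largest chunk attached to any single mbuf.  _compress_operation() below
 * pre-computes per-iovec mbuf counts with spdk_divide_round_up(), so e.g. a
 * 40960-byte iovec needs spdk_divide_round_up(40960, 32768) = 2 mbufs.
 */
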
/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	bool				sgl_in;
	bool				sgl_out;
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct compress_io_channel	*chan;
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

struct compress_io_channel {
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_poller		*poller;
	struct rte_mbuf			**src_mbufs;
	struct rte_mbuf			**dst_mbufs;
	TAILQ_HEAD(, spdk_accel_task)	queued_tasks;
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_mlx5_pci_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};

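/*
 * Both templates use a DEFLATE window_size of DEFAULT_WINDOW_SIZE (15), i.e. a
 * 2^15 = 32 KiB history window, which is also what MBUF_SPLIT is derived from
 * above.  create_compress_dev() below instantiates these templates per device
 * with rte_compressdev_private_xform_create().
 */
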
/* Dummy function used by DPDK to free ext attached buffers
 * to mbufs; we free them ourselves, but this callback has to
 * be provided.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by accel_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so cap the queue pair count at MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %d "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %d\n", cdev_id, rc);
				rc = -EINVAL;
				goto err;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		goto err;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->chan = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}

	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	return 0;

err:
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	free(device);
	return rc;
}

/* Called from driver init entry point, accel_compress_init() */
static int
accel_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	int rc;

	/* If we have no compression devices, there's no reason to continue. */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return 0;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}
	/* rte_mempool_free() is a no-op on NULL, so this is safe on all paths. */
	rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

int
accel_compressdev_enable_probe(enum compress_pmd *opts)
{
	g_opts = *opts;
	g_compressdev_enable = true;

	return 0;
}

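/*
 * This probe is driven over JSON-RPC by the compressdev_scan_accel_module
 * method (the name serialized in accel_compress_write_config_json() below).
 * A hypothetical invocation via SPDK's rpc.py helper, with the option name
 * assumed here purely for illustration, might look like:
 *
 *   ./scripts/rpc.py compressdev_scan_accel_module --pmd 0
 *
 * where the integer maps onto enum compress_pmd (0 assumed to mean
 * COMPRESS_PMD_AUTO).
 */
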
static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, struct spdk_accel_task *task)
{
	uint64_t iovec_length, updated_length, phys_addr;
	uint64_t processed, mbuf_length, remainder;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		processed = 0;
		iovec_length = iovs[iov_index].iov_len;

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovec_length;
		}

		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

		do {
			/* new length is min of remaining left or max mbuf size of MBUF_SPLIT */
			mbuf_length = updated_length = spdk_min(MBUF_SPLIT, iovec_length - processed);

			phys_addr = spdk_vtophys((void *)current_base, &updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  updated_length,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
			remainder = mbuf_length - updated_length;

			/* although the mbufs were preallocated, we still need to chain them */
			if (mbuf_index > 0) {
				rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
			}

			/* keep track of the total we've put into the mbuf chain */
			processed += updated_length;
			/* bump the base past what was just attached */
			current_base += updated_length;

			/* If we crossed a 2MB boundary we need another mbuf for the remainder */
			if (remainder > 0) {

				assert(remainder <= MBUF_SPLIT);

				/* allocate an mbuf at the end of the array */
				rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
							    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
				if (rc) {
					SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
					return -1;
				}
				(*mbuf_total)++;
				mbuf_index++;
				*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

				/* current_base already points at the remainder, map it */
				updated_length = remainder;
				phys_addr = spdk_vtophys((void *)current_base, &updated_length);

				/* assert we don't cross another */
				assert(remainder == updated_length);

				rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
							  current_base,
							  phys_addr,
							  remainder,
							  &g_shinfo);
				rte_pktmbuf_append(mbufs[mbuf_index], remainder);
				rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);

				/* keep track of the total we've put into the mbuf chain,
				 * and bump the base past the remainder as well
				 */
				processed += remainder;
				current_base += remainder;
			}

			mbuf_index++;

		} while (processed < iovec_length);

		assert(processed == iovec_length);
		iov_index++;
	}

	return 0;
}

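/*
 * Worked trace of the loop above: a physically contiguous 40 KiB iovec yields
 * two mbufs, 32 KiB (MBUF_SPLIT) plus 8 KiB, with mbufs[1] chained to
 * mbufs[0].  When spdk_vtophys() shortens a segment at a hugepage boundary
 * instead, the remainder branch pulls one extra mbuf from g_mbuf_mp and
 * attaches the tail at current_base, which by then points just past the
 * shortened segment.
 */
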
static int
_compress_operation(struct compress_io_channel *chan, struct spdk_accel_task *task)
{
	int dst_iovcnt = task->d.iovcnt;
	struct iovec *dst_iovs = task->d.iovs;
	int src_iovcnt = task->s.iovcnt;
	struct iovec *src_iovs = task->s.iovs;
	struct rte_comp_op *comp_op;
	uint8_t cdev_id;
	uint64_t total_length = 0;
	int rc = 0, i;
	int src_mbuf_total = 0;
	int dst_mbuf_total = 0;
	bool device_error = false;
	bool compress = (task->op_code == SPDK_ACCEL_OPC_COMPRESS);

	assert(chan->device_qp->device != NULL);
	cdev_id = chan->device_qp->device->cdev_id;

	/* calc our mbuf totals based on max MBUF size allowed so we can pre-alloc mbufs in bulk */
	for (i = 0; i < src_iovcnt; i++) {
		src_mbuf_total += spdk_divide_round_up(src_iovs[i].iov_len, MBUF_SPLIT);
	}
	for (i = 0; i < dst_iovcnt; i++) {
		dst_mbuf_total += spdk_divide_round_up(dst_iovs[i].iov_len, MBUF_SPLIT);
	}

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("ERROR trying to get a comp op!\n");
		rc = -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->src_mbufs, src_mbuf_total);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;
	}
	assert(chan->src_mbufs[0]);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->dst_mbufs, dst_mbuf_total);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}
	assert(chan->dst_mbufs[0]);

	rc = _setup_compress_mbuf(chan->src_mbufs, &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, task);

	if (rc < 0) {
		goto error_src_dst;
	}
	if (!chan->device_qp->device->sgl_in && src_mbuf_total > 1) {
		SPDK_ERRLOG("Src buffer uses chained mbufs but driver %s doesn't support SGL input\n",
			    chan->drv_name);
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_src = chan->src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	rc = _setup_compress_mbuf(chan->dst_mbufs, &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, task);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!chan->device_qp->device->sgl_out && dst_mbuf_total > 1) {
		SPDK_ERRLOG("Dst buffer uses chained mbufs but driver %s doesn't support SGL output\n",
			    chan->drv_name);
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_dst = chan->dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = chan->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = chan->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, chan->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 got queued, if 0 then we need to queue it up. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		rc = -EAGAIN;
	} else {
		/* There was an error sending the op to the device, most likely
		 * with the parameters. Log the status now, before the op is
		 * returned to its pool below.
		 */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
	}

	/* Error cleanup paths. */
error_src_dst:
	rte_pktmbuf_free_bulk(chan->dst_mbufs, dst_iovcnt);
error_get_dst:
	rte_pktmbuf_free_bulk(chan->src_mbufs, src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		return -EINVAL;
	}
	if (rc != -ENOMEM && rc != -EAGAIN) {
		return rc;
	}

	TAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
	return 0;
}

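/*
 * To recap the submission path above: each request becomes a single stateless
 * DEFLATE op where m_src/m_dst reference the (possibly chained) mbufs,
 * src.length covers the full source, private_xform selects compress vs.
 * decompress, and RTE_COMP_FLUSH_FINAL marks the op as a complete,
 * self-contained stream before the one-op rte_compressdev_enqueue_burst().
 */
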
/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct compress_io_channel *chan = args;
	uint8_t cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_accel_task *task, *task_to_resubmit;
	int rc, i, status;

	assert(chan->device_qp->device != NULL);
	cdev_id = chan->device_qp->device->cdev_id;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, chan->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {

		/* We store this off regardless of success/error so we know how to construct the
		 * next task
		 */
		task = (struct spdk_accel_task *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		status = deq_ops[i]->status;

		if (spdk_likely(status == RTE_COMP_OP_STATUS_SUCCESS)) {
			if (task->output_size != NULL) {
				*task->output_size = deq_ops[i]->produced;
			}
		} else {
			SPDK_NOTICELOG("Dequeue status %u\n", status);
		}

		spdk_accel_task_complete(task, status);

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a time
		 * here; however, it would be rare to ever have more than one at a time
		 * anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process, only pull one
		 * at a time off as _compress_operation() may re-queue the op.
		 */
		if (!TAILQ_EMPTY(&chan->queued_tasks)) {
			task_to_resubmit = TAILQ_FIRST(&chan->queued_tasks);
			rc = _compress_operation(chan, task_to_resubmit);
			if (rc == 0) {
				TAILQ_REMOVE(&chan->queued_tasks, task_to_resubmit, link);
			}
		}
	}

	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}
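
/*
 * On the return value above: SPDK_POLLER_BUSY whenever at least one op was
 * reaped tells the SPDK scheduler that this poller did useful work on this
 * pass; SPDK_POLLER_IDLE lets the pass be accounted as idle.
 */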

static int
_process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task)
{
	struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
	int rc;

	rc = _compress_operation(chan, task);
	if (rc) {
		SPDK_ERRLOG("Error (%d) in compress operation\n", rc);
		assert(false);
	}

	return rc;
}

static int
compress_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task)
{
	struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
	struct spdk_accel_task *task, *tmp;
	int rc = 0;

	task = first_task;

	if (!TAILQ_EMPTY(&chan->queued_tasks)) {
		goto queue_tasks;
	}

	/* The caller will either submit a single task or a group of tasks that are
	 * linked together, but they cannot be on a list. For example, see the poller,
	 * where a list of queued tasks is resubmitted: the list they were on is
	 * reinitialized after saving off the first task, which is then passed in
	 * here. The accel framework does the same thing.
	 */
	while (task) {
		tmp = TAILQ_NEXT(task, link);
		rc = _process_single_task(ch, task);

		if (rc == -EBUSY) {
			goto queue_tasks;
		} else if (rc) {
			spdk_accel_task_complete(task, rc);
		}
		task = tmp;
	}

	return 0;

queue_tasks:
	while (task != NULL) {
		tmp = TAILQ_NEXT(task, link);
		TAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
		task = tmp;
	}
	return 0;
}

static bool
_set_pmd(struct compress_io_channel *chan)
{
	/* Note: the compress_isal PMD is not supported as accel_fw supports native ISAL
	 * using the accel_sw module. */
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			chan->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			chan->drv_name = MLX5_PMD;
		} else {
			SPDK_ERRLOG("No PMD is available.\n");
			return false;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		chan->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		chan->drv_name = MLX5_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("Channel %p PMD being used: %s\n", chan, chan->drv_name);
	return true;
}

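/*
 * Example of the selection order above: with g_opts == COMPRESS_PMD_AUTO and
 * both PMDs probed, QAT wins because it is checked first; mlx5_pci is chosen
 * automatically only when QAT is absent.
 */
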
static int compress_create_cb(void *io_device, void *ctx_buf);
static void compress_destroy_cb(void *io_device, void *ctx_buf);
static struct spdk_accel_module_if g_compress_module;
static int
accel_compress_init(void)
{
	int rc;

	if (!g_compressdev_enable) {
		return -EINVAL;
	}

	rc = accel_init_compress_drivers();
	if (rc) {
		assert(TAILQ_EMPTY(&g_compress_devs));
		SPDK_NOTICELOG("no available compression devices\n");
		return -EINVAL;
	}

	g_compressdev_initialized = true;
	spdk_io_device_register(&g_compress_module, compress_create_cb, compress_destroy_cb,
				sizeof(struct compress_io_channel), "compressdev_accel_module");
	return 0;
}

static int
compress_create_cb(void *io_device, void *ctx_buf)
{
	struct compress_io_channel *chan = ctx_buf;
	const struct rte_compressdev_capabilities *capab;
	struct comp_device_qp *device_qp;
	size_t length;

	if (_set_pmd(chan) == false) {
		assert(false);
		return -ENODEV;
	}

	/* The following variable length arrays of mbuf pointers are required to submit to compressdev */
	length = NUM_MBUFS * sizeof(void *);
	chan->src_mbufs = spdk_zmalloc(length, 0x40, NULL,
				       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->src_mbufs == NULL) {
		return -ENOMEM;
	}
	chan->dst_mbufs = spdk_zmalloc(length, 0x40, NULL,
				       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->dst_mbufs == NULL) {
		/* src_mbufs came from spdk_zmalloc(), so release it with spdk_free() */
		spdk_free(chan->src_mbufs);
		return -ENOMEM;
	}

	chan->poller = SPDK_POLLER_REGISTER(comp_dev_poller, chan, 0);
	TAILQ_INIT(&chan->queued_tasks);

	pthread_mutex_lock(&g_comp_device_qp_lock);
	TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
		if (strcmp(device_qp->device->cdev_info.driver_name, chan->drv_name) == 0) {
			if (device_qp->chan == NULL) {
				chan->device_qp = device_qp;
				device_qp->chan = chan;
				break;
			}
		}
	}
	pthread_mutex_unlock(&g_comp_device_qp_lock);

	if (chan->device_qp == NULL) {
		SPDK_ERRLOG("out of qpairs, cannot assign one\n");
		assert(false);
		return -ENOMEM;
	} else {
		capab = rte_compressdev_capability_get(0, RTE_COMP_ALGO_DEFLATE);

		if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
			chan->device_qp->device->sgl_in = true;
		}

		if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
			chan->device_qp->device->sgl_out = true;
		}
	}

	return 0;
}

static void
accel_compress_write_config_json(struct spdk_json_write_ctx *w)
{
	if (g_compressdev_enable) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "compressdev_scan_accel_module");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_uint32(w, "pmd", g_opts);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
}

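/*
 * The writer calls above serialize to roughly the following in the saved
 * configuration (a pmd value of 0 is shown purely as an example):
 *
 *   {
 *     "method": "compressdev_scan_accel_module",
 *     "params": {
 *       "pmd": 0
 *     }
 *   }
 */
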
static void
compress_destroy_cb(void *io_device, void *ctx_buf)
{
	struct compress_io_channel *chan = ctx_buf;
	struct comp_device_qp *device_qp = chan->device_qp;

	spdk_free(chan->src_mbufs);
	spdk_free(chan->dst_mbufs);

	spdk_poller_unregister(&chan->poller);

	pthread_mutex_lock(&g_comp_device_qp_lock);
	chan->device_qp = NULL;
	device_qp->chan = NULL;
	pthread_mutex_unlock(&g_comp_device_qp_lock);
}

static size_t
accel_compress_get_ctx_size(void)
{
	return 0;
}

static bool
compress_supports_opcode(enum spdk_accel_opcode opc)
{
	if (g_mlx5_pci_available || g_qat_available) {
		switch (opc) {
		case SPDK_ACCEL_OPC_COMPRESS:
		case SPDK_ACCEL_OPC_DECOMPRESS:
			return true;
		default:
			break;
		}
	}

	return false;
}

static struct spdk_io_channel *
compress_get_io_channel(void)
{
	return spdk_get_io_channel(&g_compress_module);
}

static void accel_compress_exit(void *ctx);
static struct spdk_accel_module_if g_compress_module = {
	.module_init		= accel_compress_init,
	.module_fini		= accel_compress_exit,
	.write_config_json	= accel_compress_write_config_json,
	.get_ctx_size		= accel_compress_get_ctx_size,
	.name			= "dpdk_compressdev",
	.supports_opcode	= compress_supports_opcode,
	.get_io_channel		= compress_get_io_channel,
	.submit_tasks		= compress_submit_tasks
};

void
accel_dpdk_compressdev_enable(void)
{
	spdk_accel_module_list_add(&g_compress_module);
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct comp_device_qp *dev_qp;
	struct compress_dev *device;

	while ((device = TAILQ_FIRST(&g_compress_devs))) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		free(device);
	}

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}

	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);

	spdk_accel_module_finish();
}

static void
accel_compress_exit(void *ctx)
{
	if (g_compressdev_initialized) {
		spdk_io_device_unregister(&g_compress_module, _device_unregister_cb);
		g_compressdev_initialized = false;
	} else {
		spdk_accel_module_finish();
	}
}
891