xref: /spdk/module/accel/dpdk_compressdev/accel_dpdk_compressdev.c (revision 698b2423d5f98e56c36dcf8484205bb034d0f6f5)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "accel_dpdk_compressdev.h"
#include "spdk/accel_module.h"

#include "spdk/stdinc.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/log.h"

#include <rte_config.h>
#include <rte_bus_vdev.h>
#include <rte_compressdev.h>
#include <rte_comp.h>
#include <rte_mbuf_dyn.h>

/* Used to store IO context in mbuf */
static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
	.name = "context_accel_comp",
	.size = sizeof(uint64_t),
	.align = __alignof__(uint64_t),
	.flags = 0,
};
static int g_mbuf_offset;
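/* g_mbuf_offset is filled in by rte_mbuf_dynfield_register() at init time and
 * lets us round-trip the spdk_accel_task pointer through the device: it is
 * stamped into each source mbuf at submission and read back from op->m_src at
 * completion:
 *
 *   *RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)task;
 *   task = (struct spdk_accel_task *)
 *          *RTE_MBUF_DYNFIELD(op->m_src, g_mbuf_offset, uint64_t *);
 */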
static enum compress_pmd g_opts;
static bool g_compressdev_enable = false;
static bool g_compressdev_initialized = false;

#define NUM_MAX_XFORMS		2
#define NUM_MAX_INFLIGHT_OPS	128
#define DEFAULT_WINDOW_SIZE	15
#define MBUF_SPLIT		(1UL << DEFAULT_WINDOW_SIZE)
#define QAT_PMD			"compress_qat"
#define MLX5_PMD		"mlx5_pci"
#define UADK_PMD		"compress_uadk"
#define NUM_MBUFS		65536
#define POOL_CACHE_SIZE		256
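
/* MBUF_SPLIT is (1UL << 15) == 32 KiB: the largest contiguous chunk attached
 * to a single mbuf. For example, a 100 KiB iovec is pre-counted as
 * spdk_divide_round_up(102400, 32768) == 4 mbufs before any hugepage-boundary
 * splits.
 */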

/* Global list of available compression devices. */
struct compress_dev {
	struct rte_compressdev_info	cdev_info;	/* includes device friendly name */
	uint8_t				cdev_id;	/* identifier for the device */
	void				*comp_xform;	/* shared private xform for comp on this PMD */
	void				*decomp_xform;	/* shared private xform for decomp on this PMD */
	bool				sgl_in;
	bool				sgl_out;
	TAILQ_ENTRY(compress_dev)	link;
};
static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);

#define MAX_NUM_QP 48
/* Global list and lock for unique device/queue pair combos */
struct comp_device_qp {
	struct compress_dev		*device;	/* ptr to compression device */
	uint8_t				qp;		/* queue pair for this node */
	struct compress_io_channel	*chan;
	TAILQ_ENTRY(comp_device_qp)	link;
};
static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;

struct compress_io_channel {
	char				*drv_name;	/* name of the compression device driver */
	struct comp_device_qp		*device_qp;
	struct spdk_poller		*poller;
	struct rte_mbuf			**src_mbufs;
	struct rte_mbuf			**dst_mbufs;
	STAILQ_HEAD(, spdk_accel_task)	queued_tasks;
};

/* Shared mempools between all devices on this system */
static struct rte_mempool *g_mbuf_mp = NULL;		/* mbuf mempool */
static struct rte_mempool *g_comp_op_mp = NULL;		/* comp operations, must be rte* mempool */
static struct rte_mbuf_ext_shared_info g_shinfo = {};	/* used by DPDK mbuf macros */
static bool g_qat_available = false;
static bool g_mlx5_pci_available = false;
static bool g_uadk_available = false;

/* Create shared (between all ops per PMD) compress xforms. */
static struct rte_comp_xform g_comp_xform = {
	.type = RTE_COMP_COMPRESS,
	.compress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
		.level = RTE_COMP_LEVEL_MAX,
		.window_size = DEFAULT_WINDOW_SIZE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
/* Create shared (between all ops per PMD) decompress xforms. */
static struct rte_comp_xform g_decomp_xform = {
	.type = RTE_COMP_DECOMPRESS,
	.decompress = {
		.algo = RTE_COMP_ALGO_DEFLATE,
		.chksum = RTE_COMP_CHECKSUM_NONE,
		.window_size = DEFAULT_WINDOW_SIZE,
		.hash_algo = RTE_COMP_HASH_ALGO_NONE
	}
};
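
/* Note: window_size is the base-2 log of the sliding window (see rte_comp.h),
 * so DEFAULT_WINDOW_SIZE (15) requests a 2^15 = 32 KiB deflate window; this is
 * the same constant MBUF_SPLIT is derived from.
 */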

/* Dummy function used by DPDK to free buffers attached externally
 * to mbufs. We free those buffers ourselves, but
 * rte_pktmbuf_attach_extbuf() requires a valid free callback.
 */
static void
shinfo_free_cb(void *arg1, void *arg2)
{
}

/* Called by accel_init_compress_drivers() to init each discovered compression device */
static int
create_compress_dev(uint8_t index)
{
	struct compress_dev *device;
	uint16_t q_pairs;
	uint8_t cdev_id;
	int rc, i;
	struct comp_device_qp *dev_qp;
	struct comp_device_qp *tmp_qp;

	device = calloc(1, sizeof(struct compress_dev));
	if (!device) {
		return -ENOMEM;
	}

	/* Get details about this device. */
	rte_compressdev_info_get(index, &device->cdev_info);

	cdev_id = device->cdev_id = index;

	/* Zero means no limit, so clamp to MAX_NUM_QP. */
	if (device->cdev_info.max_nb_queue_pairs == 0) {
		q_pairs = MAX_NUM_QP;
	} else {
		q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
	}

	/* Configure the compression device. */
	struct rte_compressdev_config config = {
		.socket_id = rte_socket_id(),
		.nb_queue_pairs = q_pairs,
		.max_nb_priv_xforms = NUM_MAX_XFORMS,
		.max_nb_streams = 0
	};
	rc = rte_compressdev_configure(cdev_id, &config);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
		goto err_close;
	}

	/* Pre-setup all potential qpairs now and assign them in the channel
	 * callback.
	 */
	for (i = 0; i < q_pairs; i++) {
		rc = rte_compressdev_queue_pair_setup(cdev_id, i,
						      NUM_MAX_INFLIGHT_OPS,
						      rte_socket_id());
		if (rc) {
			if (i > 0) {
				q_pairs = i;
				SPDK_NOTICELOG("FYI failed to setup a queue pair on "
					       "compressdev %u with error %u "
					       "so limiting to %u qpairs\n",
					       cdev_id, rc, q_pairs);
				break;
			} else {
				SPDK_ERRLOG("Failed to setup queue pair on "
					    "compressdev %u with error %u\n", cdev_id, rc);
				rc = -EINVAL;
				goto err_close;
			}
		}
	}

	rc = rte_compressdev_start(cdev_id);
	if (rc < 0) {
		SPDK_ERRLOG("Failed to start device %u: error %d\n",
			    cdev_id, rc);
		goto err_close;
	}

	if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
		rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
				&device->comp_xform);
		if (rc < 0) {
			SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err_stop;
		}

		rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
				&device->decomp_xform);
		if (rc) {
			SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
				    cdev_id, rc);
			goto err_stop;
		}
	} else {
		SPDK_ERRLOG("PMD does not support shared transforms\n");
		rc = -EINVAL;
		goto err_stop;
	}

	/* Build up list of device/qp combinations */
	for (i = 0; i < q_pairs; i++) {
		dev_qp = calloc(1, sizeof(struct comp_device_qp));
		if (!dev_qp) {
			rc = -ENOMEM;
			goto err_qp;
		}
		dev_qp->device = device;
		dev_qp->qp = i;
		dev_qp->chan = NULL;
		TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
	}

	TAILQ_INSERT_TAIL(&g_compress_devs, device, link);

	if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
		g_qat_available = true;
	}

	if (strcmp(device->cdev_info.driver_name, MLX5_PMD) == 0) {
		g_mlx5_pci_available = true;
	}

	if (strcmp(device->cdev_info.driver_name, UADK_PMD) == 0) {
		g_uadk_available = true;
	}

	return 0;

err_qp:
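	/* Note: this drains the entire global qp list, not just this device's
	 * entries. That is safe because on any create failure the caller tears
	 * down every previously created device as well.
	 */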
	TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
err_stop:
	rte_compressdev_stop(cdev_id);
err_close:
	rte_compressdev_close(cdev_id);
	free(device);
	return rc;
}
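
/* Example of the resulting bookkeeping, assuming a hypothetical device 0 that
 * exposes two queue pairs: g_compress_devs gains one compress_dev for cdev 0,
 * and g_comp_device_qp gains entries (dev 0, qp 0) and (dev 0, qp 1). Each
 * entry's chan pointer stays NULL until an I/O channel claims it in
 * compress_create_cb().
 */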

/* Called from driver init entry point, accel_compress_init() */
static int
accel_init_compress_drivers(void)
{
	uint8_t cdev_count, i;
	struct compress_dev *tmp_dev;
	struct compress_dev *device;
	struct comp_device_qp *dev_qp;
	int rc;

	/* If we have no compression devices, return an error so the framework
	 * can fall back to other modules.
	 */
	cdev_count = rte_compressdev_count();
	if (cdev_count == 0) {
		return -ENODEV;
	}
	if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
		SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
		return -EINVAL;
	}

	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
	if (g_mbuf_offset < 0) {
		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
		return -EINVAL;
	}

	g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
					    sizeof(struct rte_mbuf), 0, rte_socket_id());
	if (g_mbuf_mp == NULL) {
		SPDK_ERRLOG("Cannot create mbuf pool\n");
		rc = -ENOMEM;
		goto error_create_mbuf;
	}

	g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
					       0, rte_socket_id());
	if (g_comp_op_mp == NULL) {
		SPDK_ERRLOG("Cannot create comp op pool\n");
		rc = -ENOMEM;
		goto error_create_op;
	}

	/* Init all devices */
	for (i = 0; i < cdev_count; i++) {
		rc = create_compress_dev(i);
		if (rc != 0) {
			goto error_create_compress_devs;
		}
	}

	if (g_qat_available == true) {
		SPDK_NOTICELOG("initialized QAT PMD\n");
	}

	g_shinfo.free_cb = shinfo_free_cb;

	return 0;

	/* Error cleanup paths. */
error_create_compress_devs:
	TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		rte_compressdev_stop(device->cdev_id);
		rte_compressdev_close(device->cdev_id);
		free(device);
	}
	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}
	rte_mempool_free(g_comp_op_mp);
error_create_op:
error_create_mbuf:
	rte_mempool_free(g_mbuf_mp);

	return rc;
}

int
accel_compressdev_enable_probe(enum compress_pmd *opts)
{
	g_opts = *opts;
	g_compressdev_enable = true;

	return 0;
}
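
/* A minimal usage sketch, assuming the standard rpc.py option name: this probe
 * is reached via the compressdev_scan_accel_module RPC before the accel
 * framework starts, e.g.
 *
 *   ./scripts/rpc.py compressdev_scan_accel_module -p 0
 *
 * where the pmd value maps to the compress_pmd enum (0 assumed to be
 * COMPRESS_PMD_AUTO).
 */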

static int
_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
		     struct iovec *iovs, int iovcnt, struct spdk_accel_task *task)
{
	uint64_t iovec_length, updated_length, phys_addr;
	uint64_t processed, mbuf_length, remainder;
	uint8_t *current_base = NULL;
	int iov_index, mbuf_index;
	int rc = 0;

	/* Setup mbufs */
	iov_index = mbuf_index = 0;
	while (iov_index < iovcnt) {

		processed = 0;
		iovec_length = iovs[iov_index].iov_len;

		current_base = iovs[iov_index].iov_base;
		if (total_length) {
			*total_length += iovec_length;
		}

		assert(mbufs[mbuf_index] != NULL);
		*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

		do {
			/* new length is min of remaining left or max mbuf size of MBUF_SPLIT */
			mbuf_length = updated_length = spdk_min(MBUF_SPLIT, iovec_length - processed);

			phys_addr = spdk_vtophys((void *)current_base, &updated_length);

			rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
						  current_base,
						  phys_addr,
						  updated_length,
						  &g_shinfo);
			rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
			remainder = mbuf_length - updated_length;

			/* although the mbufs were preallocated, we still need to chain them */
			if (mbuf_index > 0) {
				rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
			}

			/* keep track of the total we've put into the mbuf chain */
			processed += updated_length;
			/* bump the base by what was previously added */
			current_base += updated_length;

			/* If we crossed 2MB boundary we need another mbuf for the remainder */
			if (remainder > 0) {

				assert(remainder <= MBUF_SPLIT);

				/* allocate an mbuf at the end of the array */
				rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
							    (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
				if (rc) {
					SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
					return -1;
				}
				(*mbuf_total)++;
				mbuf_index++;
				*RTE_MBUF_DYNFIELD(mbufs[mbuf_index], g_mbuf_offset, uint64_t *) = (uint64_t)task;

				/* current_base was already advanced past the bytes just
				 * attached, so it now points at the start of the remainder.
				 */
				updated_length = remainder;
				phys_addr = spdk_vtophys((void *)current_base, &updated_length);

				/* assert we don't cross another */
				assert(remainder == updated_length);

				rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
							  current_base,
							  phys_addr,
							  remainder,
							  &g_shinfo);
				rte_pktmbuf_append(mbufs[mbuf_index], remainder);
				rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);

				/* keep track of the total we've put into the mbuf chain
				 * and bump the base past the remainder for the next pass
				 */
				processed += remainder;
				current_base += remainder;
			}

			mbuf_index++;

		} while (processed < iovec_length);

		assert(processed == iovec_length);
		iov_index++;
	}

	return 0;
}
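
/* Worked example for the splitting above, with hypothetical addresses: take a
 * 32 KiB iovec whose first 12 KiB sit at the end of a 2MB hugepage.
 * spdk_vtophys() shrinks updated_length from 32768 to 12288, so the first mbuf
 * gets 12 KiB and remainder == 20480. One extra mbuf is pulled from g_mbuf_mp,
 * the remaining 20 KiB are attached to the next mbuf and chained to mbufs[0];
 * processed then equals 12288 + 20480 == 32768 and the loop exits.
 */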

static int
_compress_operation(struct compress_io_channel *chan, struct spdk_accel_task *task)
{
	int dst_iovcnt = task->d.iovcnt;
	struct iovec *dst_iovs = task->d.iovs;
	int src_iovcnt = task->s.iovcnt;
	struct iovec *src_iovs = task->s.iovs;
	struct rte_comp_op *comp_op;
	uint8_t cdev_id;
	uint64_t total_length = 0;
	int rc = 0, i;
	int src_mbuf_total = 0;
	int dst_mbuf_total = 0;
	bool device_error = false;
	bool compress = (task->op_code == SPDK_ACCEL_OPC_COMPRESS);

	assert(chan->device_qp->device != NULL);
	cdev_id = chan->device_qp->device->cdev_id;

	/* calc our mbuf totals based on the max mbuf size allowed so we can pre-alloc mbufs in bulk */
	for (i = 0; i < src_iovcnt; i++) {
		src_mbuf_total += spdk_divide_round_up(src_iovs[i].iov_len, MBUF_SPLIT);
	}
	for (i = 0; i < dst_iovcnt; i++) {
		dst_mbuf_total += spdk_divide_round_up(dst_iovs[i].iov_len, MBUF_SPLIT);
	}

	comp_op = rte_comp_op_alloc(g_comp_op_mp);
	if (!comp_op) {
		SPDK_ERRLOG("ERROR trying to get a comp op!\n");
		rc = -ENOMEM;
		goto error_get_op;
	}

	/* get an mbuf per iov, src and dst */
	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->src_mbufs, src_mbuf_total);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_src;
	}
	assert(chan->src_mbufs[0]);

	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, chan->dst_mbufs, dst_mbuf_total);
	if (rc) {
		SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
		rc = -ENOMEM;
		goto error_get_dst;
	}
	assert(chan->dst_mbufs[0]);

	rc = _setup_compress_mbuf(chan->src_mbufs, &src_mbuf_total, &total_length,
				  src_iovs, src_iovcnt, task);

	if (rc < 0) {
		goto error_src_dst;
	}
	if (!chan->device_qp->device->sgl_in && src_mbuf_total > 1) {
		SPDK_ERRLOG("Src buffer uses chained mbufs but driver %s doesn't support SGL input\n",
			    chan->drv_name);
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_src = chan->src_mbufs[0];
	comp_op->src.offset = 0;
	comp_op->src.length = total_length;

	rc = _setup_compress_mbuf(chan->dst_mbufs, &dst_mbuf_total, NULL,
				  dst_iovs, dst_iovcnt, task);
	if (rc < 0) {
		goto error_src_dst;
	}
	if (!chan->device_qp->device->sgl_out && dst_mbuf_total > 1) {
		SPDK_ERRLOG("Dst buffer uses chained mbufs but driver %s doesn't support SGL output\n",
			    chan->drv_name);
		rc = -EINVAL;
		goto error_src_dst;
	}

	comp_op->m_dst = chan->dst_mbufs[0];
	comp_op->dst.offset = 0;

	if (compress == true) {
		comp_op->private_xform = chan->device_qp->device->comp_xform;
	} else {
		comp_op->private_xform = chan->device_qp->device->decomp_xform;
	}

	comp_op->op_type = RTE_COMP_OP_STATELESS;
	comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;

	rc = rte_compressdev_enqueue_burst(cdev_id, chan->device_qp->qp, &comp_op, 1);
	assert(rc <= 1);

	/* We always expect 1 op to get queued; if 0 then we need to queue it up ourselves. */
	if (rc == 1) {
		return 0;
	} else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
		rc = -EAGAIN;
	} else {
		/* Log the status now, before the op is returned to its pool below. */
		SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
		device_error = true;
	}

	/* Error cleanup paths. */
error_src_dst:
	rte_pktmbuf_free_bulk(chan->dst_mbufs, dst_iovcnt);
error_get_dst:
	rte_pktmbuf_free_bulk(chan->src_mbufs, src_iovcnt);
error_get_src:
	rte_comp_op_free(comp_op);
error_get_op:

	if (device_error == true) {
		/* There was an error sending the op to the device, most
		 * likely with the parameters. The status was logged above,
		 * before the op was freed.
		 */
		return -EINVAL;
	}
	if (rc != -ENOMEM && rc != -EAGAIN) {
		return rc;
	}

	STAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
	return 0;
}
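
/* Ownership note: on a successful enqueue the mbuf chains and the comp op are
 * freed by comp_dev_poller() once the device completes them; on any failure
 * they are released on the error paths above before the task is re-queued or
 * failed.
 */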

/* Poller for the DPDK compression driver. */
static int
comp_dev_poller(void *args)
{
	struct compress_io_channel *chan = args;
	uint8_t cdev_id;
	struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
	uint16_t num_deq;
	struct spdk_accel_task *task, *task_to_resubmit;
	int rc, i, status;

	assert(chan->device_qp->device != NULL);
	cdev_id = chan->device_qp->device->cdev_id;

	num_deq = rte_compressdev_dequeue_burst(cdev_id, chan->device_qp->qp, deq_ops,
						NUM_MAX_INFLIGHT_OPS);
	for (i = 0; i < num_deq; i++) {

		/* We fetch this regardless of success/error so we know how to
		 * complete the task.
		 */
		task = (struct spdk_accel_task *)*RTE_MBUF_DYNFIELD(deq_ops[i]->m_src, g_mbuf_offset,
				uint64_t *);
		status = deq_ops[i]->status;

		if (spdk_likely(status == RTE_COMP_OP_STATUS_SUCCESS)) {
			if (task->output_size != NULL) {
				*task->output_size = deq_ops[i]->produced;
			}
		} else {
			SPDK_NOTICELOG("Dequeue status %u\n", status);
		}

		spdk_accel_task_complete(task, status);

		/* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
		 * call takes care of freeing all of the mbufs in the chain back to their
		 * original pool.
		 */
		rte_pktmbuf_free(deq_ops[i]->m_src);
		rte_pktmbuf_free(deq_ops[i]->m_dst);

		/* There is no bulk free for comp ops so we have to free them one at a
		 * time here; however, it would be rare to ever have more than one at a
		 * time anyway.
		 */
		rte_comp_op_free(deq_ops[i]);

		/* Check if there are any pending comp ops to process; only pull one
		 * off at a time as _compress_operation() may re-queue the op.
		 */
		if (!STAILQ_EMPTY(&chan->queued_tasks)) {
			task_to_resubmit = STAILQ_FIRST(&chan->queued_tasks);
			rc = _compress_operation(chan, task_to_resubmit);
			if (rc == 0) {
				STAILQ_REMOVE_HEAD(&chan->queued_tasks, link);
			}
		}
	}

	return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

static int
_process_single_task(struct spdk_io_channel *ch, struct spdk_accel_task *task)
{
	struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
	int rc;

	rc = _compress_operation(chan, task);
	if (rc) {
		SPDK_ERRLOG("Error (%d) in compress operation\n", rc);
		assert(false);
	}

	return rc;
}

static int
compress_submit_tasks(struct spdk_io_channel *ch, struct spdk_accel_task *first_task)
{
	struct compress_io_channel *chan = spdk_io_channel_get_ctx(ch);
	struct spdk_accel_task *task, *tmp;
	int rc = 0;

	task = first_task;

	if (!STAILQ_EMPTY(&chan->queued_tasks)) {
		goto queue_tasks;
	}

	/* The caller will either submit a single task or a chain of tasks that
	 * are linked together but are not on any list. For example, when the
	 * poller resubmits a list of queued tasks, the list is reinitialized
	 * after saving off its first task, and that task is then passed in
	 * here. The accel framework does the same thing.
	 */
	while (task) {
		tmp = STAILQ_NEXT(task, link);
		rc = _process_single_task(ch, task);

		if (rc == -EBUSY) {
			goto queue_tasks;
		} else if (rc) {
			spdk_accel_task_complete(task, rc);
		}
		task = tmp;
	}

	return 0;

queue_tasks:
	while (task != NULL) {
		tmp = STAILQ_NEXT(task, link);
		STAILQ_INSERT_TAIL(&chan->queued_tasks, task, link);
		task = tmp;
	}
	return 0;
}

static bool
_set_pmd(struct compress_io_channel *chan)
{
	/* Note: the compress_isal PMD is not supported because the accel
	 * framework already provides native ISA-L via the accel_sw module.
	 */
	if (g_opts == COMPRESS_PMD_AUTO) {
		if (g_qat_available) {
			chan->drv_name = QAT_PMD;
		} else if (g_mlx5_pci_available) {
			chan->drv_name = MLX5_PMD;
		} else if (g_uadk_available) {
			chan->drv_name = UADK_PMD;
		} else {
			SPDK_ERRLOG("No supported PMD is available.\n");
			return false;
		}
	} else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
		chan->drv_name = QAT_PMD;
	} else if (g_opts == COMPRESS_PMD_MLX5_PCI_ONLY && g_mlx5_pci_available) {
		chan->drv_name = MLX5_PMD;
	} else if (g_opts == COMPRESS_PMD_UADK_ONLY && g_uadk_available) {
		chan->drv_name = UADK_PMD;
	} else {
		SPDK_ERRLOG("Requested PMD is not available.\n");
		return false;
	}
	SPDK_NOTICELOG("Channel %p PMD being used: %s\n", chan, chan->drv_name);
	return true;
}

static int compress_create_cb(void *io_device, void *ctx_buf);
static void compress_destroy_cb(void *io_device, void *ctx_buf);
static struct spdk_accel_module_if g_compress_module;
static int
accel_compress_init(void)
{
	int rc;

	if (!g_compressdev_enable) {
		return -EINVAL;
	}

	if (g_opts == COMPRESS_PMD_UADK_ONLY) {
		char *driver_name = UADK_PMD;

		rc = rte_vdev_init(driver_name, NULL);
		if (rc) {
			SPDK_NOTICELOG("Failed to create virtual PMD %s: error %d. "
				       "Possibly %s is not supported by the DPDK library. "
				       "Keep going...\n", driver_name, rc, driver_name);
		}
	}

	rc = accel_init_compress_drivers();
	if (rc) {
		assert(TAILQ_EMPTY(&g_compress_devs));
		return rc;
	}

	g_compressdev_initialized = true;
	spdk_io_device_register(&g_compress_module, compress_create_cb, compress_destroy_cb,
				sizeof(struct compress_io_channel), "compressdev_accel_module");
	return 0;
}

static int
compress_create_cb(void *io_device, void *ctx_buf)
{
	struct compress_io_channel *chan = ctx_buf;
	const struct rte_compressdev_capabilities *capab;
	struct comp_device_qp *device_qp;
	size_t length;

	if (_set_pmd(chan) == false) {
		assert(false);
		return -ENODEV;
	}

	/* The following variable length arrays of mbuf pointers are required to submit to compressdev */
	length = NUM_MBUFS * sizeof(void *);
	chan->src_mbufs = spdk_zmalloc(length, 0x40, NULL,
				       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->src_mbufs == NULL) {
		return -ENOMEM;
	}
	chan->dst_mbufs = spdk_zmalloc(length, 0x40, NULL,
				       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->dst_mbufs == NULL) {
		spdk_free(chan->src_mbufs);
		return -ENOMEM;
	}

	chan->poller = SPDK_POLLER_REGISTER(comp_dev_poller, chan, 0);
	STAILQ_INIT(&chan->queued_tasks);

	pthread_mutex_lock(&g_comp_device_qp_lock);
	TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
		if (strcmp(device_qp->device->cdev_info.driver_name, chan->drv_name) == 0) {
			if (device_qp->chan == NULL) {
				chan->device_qp = device_qp;
				device_qp->chan = chan;
				break;
			}
		}
	}
	pthread_mutex_unlock(&g_comp_device_qp_lock);

	if (chan->device_qp == NULL) {
		SPDK_ERRLOG("out of qpairs, cannot assign one\n");
		assert(false);
		spdk_poller_unregister(&chan->poller);
		spdk_free(chan->dst_mbufs);
		spdk_free(chan->src_mbufs);
		return -ENOMEM;
	} else {
		capab = rte_compressdev_capability_get(chan->device_qp->device->cdev_id,
						       RTE_COMP_ALGO_DEFLATE);

		if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_SGL_IN_LB_OUT)) {
			chan->device_qp->device->sgl_in = true;
		}

		if (capab->comp_feature_flags & (RTE_COMP_FF_OOP_SGL_IN_SGL_OUT | RTE_COMP_FF_OOP_LB_IN_SGL_OUT)) {
			chan->device_qp->device->sgl_out = true;
		}
	}

	return 0;
}

static void
accel_compress_write_config_json(struct spdk_json_write_ctx *w)
{
	if (g_compressdev_enable) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "compressdev_scan_accel_module");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_uint32(w, "pmd", g_opts);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
}
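
/* The emitted config entry looks like the following (pmd reflects g_opts):
 *
 *   {
 *     "method": "compressdev_scan_accel_module",
 *     "params": { "pmd": 0 }
 *   }
 */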

static void
compress_destroy_cb(void *io_device, void *ctx_buf)
{
	struct compress_io_channel *chan = ctx_buf;
	struct comp_device_qp *device_qp = chan->device_qp;

	spdk_free(chan->src_mbufs);
	spdk_free(chan->dst_mbufs);

	spdk_poller_unregister(&chan->poller);

	pthread_mutex_lock(&g_comp_device_qp_lock);
	chan->device_qp = NULL;
	device_qp->chan = NULL;
	pthread_mutex_unlock(&g_comp_device_qp_lock);
}

static size_t
accel_compress_get_ctx_size(void)
{
	return 0;
}

static bool
compress_supports_opcode(enum spdk_accel_opcode opc)
{
	if (g_mlx5_pci_available || g_qat_available || g_uadk_available) {
		switch (opc) {
		case SPDK_ACCEL_OPC_COMPRESS:
		case SPDK_ACCEL_OPC_DECOMPRESS:
			return true;
		default:
			break;
		}
	}

	return false;
}

static bool
compress_supports_algo(enum spdk_accel_comp_algo algo)
{
	if (algo == SPDK_ACCEL_COMP_ALGO_DEFLATE) {
		return true;
	}

	return false;
}

static int
compress_get_level_range(enum spdk_accel_comp_algo algo,
			 uint32_t *min_level, uint32_t *max_level)
{
	switch (algo) {
	case SPDK_ACCEL_COMP_ALGO_DEFLATE:
		/* Hardware compression is set to the highest level by default
		 * and is not affected by the level requested with the
		 * operation, so the range is reported as [0, 0].
		 */
		*min_level = 0;
		*max_level = 0;

		return 0;
	default:
		return -EINVAL;
	}
}

static struct spdk_io_channel *
compress_get_io_channel(void)
{
	return spdk_get_io_channel(&g_compress_module);
}

static void accel_compress_exit(void *ctx);
static struct spdk_accel_module_if g_compress_module = {
	.module_init	             = accel_compress_init,
	.module_fini	             = accel_compress_exit,
	.write_config_json           = accel_compress_write_config_json,
	.get_ctx_size	             = accel_compress_get_ctx_size,
	.name		             = "dpdk_compressdev",
	.supports_opcode             = compress_supports_opcode,
	.get_io_channel	             = compress_get_io_channel,
	.submit_tasks                = compress_submit_tasks,
	.compress_supports_algo      = compress_supports_algo,
	.get_compress_level_range    = compress_get_level_range,
};

void
accel_dpdk_compressdev_enable(void)
{
	spdk_accel_module_list_add(&g_compress_module);
}

/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct comp_device_qp *dev_qp;
	struct compress_dev *device;

	while ((device = TAILQ_FIRST(&g_compress_devs))) {
		TAILQ_REMOVE(&g_compress_devs, device, link);
		rte_compressdev_stop(device->cdev_id);
		rte_compressdev_close(device->cdev_id);
		free(device);
	}

	while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
		TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
		free(dev_qp);
	}

	if (g_opts == COMPRESS_PMD_UADK_ONLY) {
		rte_vdev_uninit(UADK_PMD);
	}

	pthread_mutex_destroy(&g_comp_device_qp_lock);

	rte_mempool_free(g_comp_op_mp);
	rte_mempool_free(g_mbuf_mp);

	spdk_accel_module_finish();
}

static void
accel_compress_exit(void *ctx)
{
	if (g_compressdev_initialized) {
		spdk_io_device_unregister(&g_compress_module, _device_unregister_cb);
		g_compressdev_initialized = false;
	} else {
		spdk_accel_module_finish();
	}
}
953