xref: /dpdk/drivers/compress/zlib/zlib_pmd.c (revision f665790a5dbad7b645ff46f31d65e977324e7bfc)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Cavium Networks
3  */
4 
5 #include <bus_vdev_driver.h>
6 #include <rte_common.h>
7 
8 #include "zlib_pmd_private.h"
9 
10 /** Compute next mbuf in the list, assign data buffer and length,
11  *  returns 0 if mbuf is NULL
12  */
13 #define COMPUTE_BUF(mbuf, data, len)		\
14 		((mbuf = mbuf->next) ?		\
15 		(data = rte_pktmbuf_mtod(mbuf, uint8_t *)),	\
16 		(len = rte_pktmbuf_data_len(mbuf)) : 0)
17 
18 static void
19 process_zlib_deflate(struct rte_comp_op *op, z_stream *strm)
20 {
21 	int ret, flush, fin_flush;
22 	struct rte_mbuf *mbuf_src = op->m_src;
23 	struct rte_mbuf *mbuf_dst = op->m_dst;
24 
25 	switch (op->flush_flag) {
26 	case RTE_COMP_FLUSH_FULL:
27 	case RTE_COMP_FLUSH_FINAL:
28 		fin_flush = Z_FINISH;
29 		break;
30 	default:
31 		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
32 		ZLIB_PMD_ERR("Invalid flush value");
33 		return;
34 	}
35 
36 	if (unlikely(!strm)) {
37 		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
38 		ZLIB_PMD_ERR("Invalid z_stream");
39 		return;
40 	}
41 	/* Update z_stream with the inputs provided by application */
42 	strm->next_in = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *,
43 			op->src.offset);
44 
45 	strm->avail_in = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
46 
47 	strm->next_out = rte_pktmbuf_mtod_offset(mbuf_dst, uint8_t *,
48 			op->dst.offset);
49 
50 	strm->avail_out = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
51 
52 	/* Set flush value to NO_FLUSH unless it is last mbuf */
53 	flush = Z_NO_FLUSH;
54 	/* Initialize status to SUCCESS */
55 	op->status = RTE_COMP_OP_STATUS_SUCCESS;
56 
57 	do {
58 		/* Set flush value to Z_FINISH for last block */
59 		if ((op->src.length - strm->total_in) <= strm->avail_in) {
60 			strm->avail_in = (op->src.length - strm->total_in);
61 			flush = fin_flush;
62 		}
63 		do {
64 			ret = deflate(strm, flush);
65 			if (unlikely(ret == Z_STREAM_ERROR)) {
66 				/* error return, do not process further */
67 				op->status =  RTE_COMP_OP_STATUS_ERROR;
68 				goto def_end;
69 			}
70 			/* Break if Z_STREAM_END is encountered */
71 			if (ret == Z_STREAM_END)
72 				goto def_end;
73 
74 		/* Keep looping until input mbuf is consumed.
75 		 * Exit if destination mbuf gets exhausted.
76 		 */
77 		} while ((strm->avail_out == 0) &&
78 			COMPUTE_BUF(mbuf_dst, strm->next_out, strm->avail_out));
79 
80 		if (!strm->avail_out) {
81 			/* there is no space for compressed output */
82 			op->status = RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED;
83 			break;
84 		}
85 
86 	/* Update source buffer to next mbuf
87 	 * Exit if input buffers are fully consumed
88 	 */
89 	} while (COMPUTE_BUF(mbuf_src, strm->next_in, strm->avail_in));
90 
91 def_end:
92 	/* Update op stats */
93 	switch (op->status) {
94 	case RTE_COMP_OP_STATUS_SUCCESS:
95 		op->consumed += strm->total_in;
96 	/* Fall-through */
97 	case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
98 		op->produced += strm->total_out;
99 		break;
100 	default:
101 		ZLIB_PMD_ERR("stats not updated for status:%d",
102 				op->status);
103 	}
104 
105 	deflateReset(strm);
106 }
107 
108 static void
109 process_zlib_inflate(struct rte_comp_op *op, z_stream *strm)
110 {
111 	int ret, flush;
112 	struct rte_mbuf *mbuf_src = op->m_src;
113 	struct rte_mbuf *mbuf_dst = op->m_dst;
114 
115 	if (unlikely(!strm)) {
116 		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
117 		ZLIB_PMD_ERR("Invalid z_stream");
118 		return;
119 	}
120 	strm->next_in = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *,
121 			op->src.offset);
122 
123 	strm->avail_in = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
124 
125 	strm->next_out = rte_pktmbuf_mtod_offset(mbuf_dst, uint8_t *,
126 			op->dst.offset);
127 
128 	strm->avail_out = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
129 
130 	/** Ignoring flush value provided from application for decompression */
131 	flush = Z_NO_FLUSH;
132 	/* initialize status to SUCCESS */
133 	op->status = RTE_COMP_OP_STATUS_SUCCESS;
134 
135 	do {
136 		do {
137 			ret = inflate(strm, flush);
138 
139 			switch (ret) {
140 			/* Fall-through */
141 			case Z_NEED_DICT:
142 				ret = Z_DATA_ERROR;
143 			/* Fall-through */
144 			case Z_DATA_ERROR:
145 			/* Fall-through */
146 			case Z_MEM_ERROR:
147 			/* Fall-through */
148 			case Z_STREAM_ERROR:
149 				op->status = RTE_COMP_OP_STATUS_ERROR;
150 			/* Fall-through */
151 			case Z_STREAM_END:
152 				/* no further computation needed if
153 				 * Z_STREAM_END is encountered
154 				 */
155 				goto inf_end;
156 			default:
157 				/* success */
158 				break;
159 
160 			}
161 		/* Keep looping until input mbuf is consumed.
162 		 * Exit if destination mbuf gets exhausted.
163 		 */
164 		} while ((strm->avail_out == 0) &&
165 			COMPUTE_BUF(mbuf_dst, strm->next_out, strm->avail_out));
166 
167 		if (!strm->avail_out) {
168 			/* there is no more space for decompressed output */
169 			op->status = RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED;
170 			break;
171 		}
172 	/* Read next input buffer to be processed, exit if compressed
173 	 * blocks are fully read
174 	 */
175 	} while (COMPUTE_BUF(mbuf_src, strm->next_in, strm->avail_in));
176 
177 inf_end:
178 	/* Update op stats */
179 	switch (op->status) {
180 	case RTE_COMP_OP_STATUS_SUCCESS:
181 		op->consumed += strm->total_in;
182 	/* Fall-through */
183 	case RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED:
184 		op->produced += strm->total_out;
185 		break;
186 	default:
187 		ZLIB_PMD_ERR("stats not produced for status:%d",
188 				op->status);
189 	}
190 
191 	inflateReset(strm);
192 }
193 
194 /** Process comp operation for mbuf */
195 static inline int
196 process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op)
197 {
198 	struct zlib_stream *stream;
199 	struct zlib_priv_xform *private_xform;
200 
201 	if ((op->op_type == RTE_COMP_OP_STATEFUL) ||
202 			(op->src.offset > rte_pktmbuf_data_len(op->m_src)) ||
203 			(op->dst.offset > rte_pktmbuf_data_len(op->m_dst))) {
204 		op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
205 		ZLIB_PMD_ERR("Invalid source or destination buffers or "
206 			     "invalid Operation requested");
207 	} else {
208 		private_xform = (struct zlib_priv_xform *)op->private_xform;
209 		stream = &private_xform->stream;
210 		stream->comp(op, &stream->strm);
211 	}
212 	/* whatever is out of op, put it into completion queue with
213 	 * its status
214 	 */
215 	return rte_ring_enqueue(qp->processed_pkts, (void *)op);
216 }
217 
218 /** Parse comp xform and set private xform/Stream parameters */
219 int
220 zlib_set_stream_parameters(const struct rte_comp_xform *xform,
221 		struct zlib_stream *stream)
222 {
223 	int strategy, level, wbits;
224 	z_stream *strm = &stream->strm;
225 
226 	/* allocate deflate state */
227 	strm->zalloc = Z_NULL;
228 	strm->zfree = Z_NULL;
229 	strm->opaque = Z_NULL;
230 
231 	switch (xform->type) {
232 	case RTE_COMP_COMPRESS:
233 		stream->comp = process_zlib_deflate;
234 		stream->free = deflateEnd;
235 		/** Compression window bits */
236 		switch (xform->compress.algo) {
237 		case RTE_COMP_ALGO_DEFLATE:
238 			wbits = -(xform->compress.window_size);
239 			break;
240 		default:
241 			ZLIB_PMD_ERR("Compression algorithm not supported");
242 			return -1;
243 		}
244 		/** Compression Level */
245 		switch (xform->compress.level) {
246 		case RTE_COMP_LEVEL_PMD_DEFAULT:
247 			level = Z_DEFAULT_COMPRESSION;
248 			break;
249 		case RTE_COMP_LEVEL_NONE:
250 			level = Z_NO_COMPRESSION;
251 			break;
252 		case RTE_COMP_LEVEL_MIN:
253 			level = Z_BEST_SPEED;
254 			break;
255 		case RTE_COMP_LEVEL_MAX:
256 			level = Z_BEST_COMPRESSION;
257 			break;
258 		default:
259 			level = xform->compress.level;
260 			if (level < RTE_COMP_LEVEL_MIN ||
261 					level > RTE_COMP_LEVEL_MAX) {
262 				ZLIB_PMD_ERR("Compression level %d "
263 						"not supported",
264 						level);
265 				return -1;
266 			}
267 			break;
268 		}
269 		/** Compression strategy */
270 		switch (xform->compress.deflate.huffman) {
271 		case RTE_COMP_HUFFMAN_DEFAULT:
272 			strategy = Z_DEFAULT_STRATEGY;
273 			break;
274 		case RTE_COMP_HUFFMAN_FIXED:
275 			strategy = Z_FIXED;
276 			break;
277 		case RTE_COMP_HUFFMAN_DYNAMIC:
278 			strategy = Z_DEFAULT_STRATEGY;
279 			break;
280 		default:
281 			ZLIB_PMD_ERR("Compression strategy not supported");
282 			return -1;
283 		}
284 		if (deflateInit2(strm, level,
285 					Z_DEFLATED, wbits,
286 					DEF_MEM_LEVEL, strategy) != Z_OK) {
287 			ZLIB_PMD_ERR("Deflate init failed");
288 			return -1;
289 		}
290 		break;
291 
292 	case RTE_COMP_DECOMPRESS:
293 		stream->comp = process_zlib_inflate;
294 		stream->free = inflateEnd;
295 		/** window bits */
296 		switch (xform->decompress.algo) {
297 		case RTE_COMP_ALGO_DEFLATE:
298 			wbits = -(xform->decompress.window_size);
299 			break;
300 		default:
301 			ZLIB_PMD_ERR("Compression algorithm not supported");
302 			return -1;
303 		}
304 
305 		if (inflateInit2(strm, wbits) != Z_OK) {
306 			ZLIB_PMD_ERR("Inflate init failed");
307 			return -1;
308 		}
309 		break;
310 	default:
311 		return -1;
312 	}
313 	return 0;
314 }
315 
316 static uint16_t
317 zlib_pmd_enqueue_burst(void *queue_pair,
318 			struct rte_comp_op **ops, uint16_t nb_ops)
319 {
320 	struct zlib_qp *qp = queue_pair;
321 	int ret;
322 	uint16_t i;
323 	uint16_t enqd = 0;
324 	for (i = 0; i < nb_ops; i++) {
325 		ret = process_zlib_op(qp, ops[i]);
326 		if (unlikely(ret < 0)) {
327 			/* increment count if failed to push to completion
328 			 * queue
329 			 */
330 			qp->qp_stats.enqueue_err_count++;
331 		} else {
332 			qp->qp_stats.enqueued_count++;
333 			enqd++;
334 		}
335 	}
336 	return enqd;
337 }
338 
339 static uint16_t
340 zlib_pmd_dequeue_burst(void *queue_pair,
341 			struct rte_comp_op **ops, uint16_t nb_ops)
342 {
343 	struct zlib_qp *qp = queue_pair;
344 
345 	unsigned int nb_dequeued = 0;
346 
347 	nb_dequeued = rte_ring_dequeue_burst(qp->processed_pkts,
348 			(void **)ops, nb_ops, NULL);
349 	qp->qp_stats.dequeued_count += nb_dequeued;
350 
351 	return nb_dequeued;
352 }
353 
354 static int
355 zlib_create(const char *name,
356 		struct rte_vdev_device *vdev,
357 		struct rte_compressdev_pmd_init_params *init_params)
358 {
359 	struct rte_compressdev *dev;
360 
361 	dev = rte_compressdev_pmd_create(name, &vdev->device,
362 			sizeof(struct zlib_private), init_params);
363 	if (dev == NULL) {
364 		ZLIB_PMD_ERR("driver %s: create failed", init_params->name);
365 		return -ENODEV;
366 	}
367 
368 	dev->dev_ops = rte_zlib_pmd_ops;
369 
370 	/* register rx/tx burst functions for data path */
371 	dev->dequeue_burst = zlib_pmd_dequeue_burst;
372 	dev->enqueue_burst = zlib_pmd_enqueue_burst;
373 
374 	return 0;
375 }
376 
377 static int
378 zlib_probe(struct rte_vdev_device *vdev)
379 {
380 	struct rte_compressdev_pmd_init_params init_params = {
381 		"",
382 		rte_socket_id()
383 	};
384 	const char *name;
385 	const char *input_args;
386 	int retval;
387 
388 	name = rte_vdev_device_name(vdev);
389 
390 	if (name == NULL)
391 		return -EINVAL;
392 
393 	input_args = rte_vdev_device_args(vdev);
394 
395 	retval = rte_compressdev_pmd_parse_input_args(&init_params, input_args);
396 	if (retval < 0) {
397 		ZLIB_PMD_LOG(ERR,
398 			"Failed to parse initialisation arguments[%s]",
399 			input_args);
400 		return -EINVAL;
401 	}
402 
403 	return zlib_create(name, vdev, &init_params);
404 }
405 
406 static int
407 zlib_remove(struct rte_vdev_device *vdev)
408 {
409 	struct rte_compressdev *compressdev;
410 	const char *name;
411 
412 	name = rte_vdev_device_name(vdev);
413 	if (name == NULL)
414 		return -EINVAL;
415 
416 	compressdev = rte_compressdev_pmd_get_named_dev(name);
417 	if (compressdev == NULL)
418 		return -ENODEV;
419 
420 	return rte_compressdev_pmd_destroy(compressdev);
421 }
422 
423 static struct rte_vdev_driver zlib_pmd_drv = {
424 	.probe = zlib_probe,
425 	.remove = zlib_remove
426 };
427 
428 RTE_PMD_REGISTER_VDEV(COMPRESSDEV_NAME_ZLIB_PMD, zlib_pmd_drv);
429 RTE_LOG_REGISTER_DEFAULT(zlib_logtype_driver, INFO);
430