/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <stdlib.h>

#include <rte_malloc.h>
#include <rte_eal.h>
#include <rte_log.h>
#include <rte_compressdev.h>

#include "comp_perf_test_verify.h"
#include "comp_perf_test_common.h"

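/*
 * Free the memory resources attached to a verify-test context and the
 * context itself. Safe to call with a NULL argument.
 */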
void
cperf_verify_test_destructor(void *arg)
{
	if (arg) {
		comp_perf_free_memory(
				((struct cperf_verify_ctx *)arg)->options,
				&((struct cperf_verify_ctx *)arg)->mem);
		rte_free(arg);
	}
}

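/*
 * Allocate a verify-test context for one device/queue-pair and prepare
 * its buffers. Returns NULL (after cleaning up) if any step fails.
 */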
void *
cperf_verify_test_constructor(uint8_t dev_id, uint16_t qp_id,
		struct comp_test_data *options)
{
	struct cperf_verify_ctx *ctx = NULL;

	ctx = rte_malloc(NULL, sizeof(struct cperf_verify_ctx), 0);

	if (ctx == NULL)
		return NULL;

	ctx->mem.dev_id = dev_id;
	ctx->mem.qp_id = qp_id;
	ctx->options = options;

	if (!comp_perf_allocate_memory(ctx->options, &ctx->mem) &&
			!prepare_bufs(ctx->options, &ctx->mem))
		return ctx;

	cperf_verify_test_destructor(ctx);
	return NULL;
}

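/*
 * Run one direction of the verify test (compression or decompression):
 * enqueue every test buffer once, dequeue the results and copy the
 * produced output into a flat buffer for later verification.
 */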
static int
main_loop(struct cperf_verify_ctx *ctx, enum rte_comp_xform_type type)
{
	struct comp_test_data *test_data = ctx->options;
	uint8_t *output_data_ptr = NULL;
	size_t *output_data_sz = NULL;
	struct cperf_mem_resources *mem = &ctx->mem;

	uint8_t dev_id = mem->dev_id;
	uint32_t i, iter, num_iter;
	struct rte_comp_op **ops, **deq_ops;
	void *priv_xform = NULL;
	struct rte_comp_xform xform;
	size_t output_size = 0;
	struct rte_mbuf **input_bufs, **output_bufs;
	int res = 0;
	int allocated = 0;
	uint32_t out_seg_sz;

	if (test_data == NULL || !test_data->burst_sz) {
		RTE_LOG(ERR, USER1,
			"Unknown burst size\n");
		return -1;
	}

	ops = rte_zmalloc_socket(NULL,
		2 * mem->total_bufs * sizeof(struct rte_comp_op *),
		0, rte_socket_id());

	if (ops == NULL) {
		RTE_LOG(ERR, USER1,
			"Can't allocate memory for ops structures\n");
		return -1;
	}

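	/*
	 * The single allocation above holds two equally sized arrays:
	 * the lower half is used for enqueue, the upper half collects
	 * dequeued operations.
	 */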
	deq_ops = &ops[mem->total_bufs];

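	/*
	 * Build the transform for the requested direction and select the
	 * matching input/output mbuf sets and flat output area.
	 */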
	if (type == RTE_COMP_COMPRESS) {
		xform = (struct rte_comp_xform) {
			.type = RTE_COMP_COMPRESS,
			.compress = {
				.algo = test_data->test_algo,
				.level = test_data->level,
				.window_size = test_data->window_sz,
				.chksum = RTE_COMP_CHECKSUM_NONE,
				.hash_algo = RTE_COMP_HASH_ALGO_NONE
			}
		};
		if (test_data->test_algo == RTE_COMP_ALGO_DEFLATE)
			xform.compress.deflate.huffman = test_data->huffman_enc;
		else if (test_data->test_algo == RTE_COMP_ALGO_LZ4)
			xform.compress.lz4.flags = test_data->lz4_flags;
		output_data_ptr = ctx->mem.compressed_data;
		output_data_sz = &ctx->comp_data_sz;
		input_bufs = mem->decomp_bufs;
		output_bufs = mem->comp_bufs;
		out_seg_sz = test_data->out_seg_sz;
	} else {
		xform = (struct rte_comp_xform) {
			.type = RTE_COMP_DECOMPRESS,
			.decompress = {
				.algo = test_data->test_algo,
				.chksum = RTE_COMP_CHECKSUM_NONE,
				.window_size = test_data->window_sz,
				.hash_algo = RTE_COMP_HASH_ALGO_NONE
			}
		};
		if (test_data->test_algo == RTE_COMP_ALGO_LZ4)
			xform.decompress.lz4.flags = test_data->lz4_flags;
		output_data_ptr = ctx->mem.decompressed_data;
		output_data_sz = &ctx->decomp_data_sz;
		input_bufs = mem->comp_bufs;
		output_bufs = mem->decomp_bufs;
		out_seg_sz = (test_data->test_op & COMPRESS) ?
			     test_data->seg_sz : test_data->out_seg_sz;
	}

	/* Create private xform */
	if (rte_compressdev_private_xform_create(dev_id, &xform,
			&priv_xform) < 0) {
		RTE_LOG(ERR, USER1, "Private xform could not be created\n");
		res = -1;
		goto end;
	}

	num_iter = 1;

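	/*
	 * Main loop: enqueue bursts of operations until all buffers have
	 * been submitted, dequeuing completed operations along the way.
	 */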
	for (iter = 0; iter < num_iter; iter++) {
		uint32_t total_ops = mem->total_bufs;
		uint32_t remaining_ops = mem->total_bufs;
		uint32_t total_deq_ops = 0;
		uint32_t total_enq_ops = 0;
		uint16_t ops_unused = 0;
		uint16_t num_enq = 0;
		uint16_t num_deq = 0;

		output_size = 0;

		while (remaining_ops > 0) {
			uint16_t num_ops = RTE_MIN(remaining_ops,
						   test_data->burst_sz);
			uint16_t ops_needed = num_ops - ops_unused;

			/*
			 * Move the unused operations from the previous
			 * enqueue_burst call to the front, to maintain order
			 */
			if ((ops_unused > 0) && (num_enq > 0)) {
				size_t nb_b_to_mov =
				      ops_unused * sizeof(struct rte_comp_op *);

				memmove(ops, &ops[num_enq], nb_b_to_mov);
			}

			/* Allocate compression operations */
			if (ops_needed && !rte_comp_op_bulk_alloc(
						mem->op_pool,
						&ops[ops_unused],
						ops_needed)) {
				RTE_LOG(ERR, USER1,
				      "Could not allocate enough operations\n");
				res = -1;
				goto end;
			}
			allocated += ops_needed;

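			/*
			 * Attach the next source/destination buffers to the
			 * freshly allocated operations. Destination chains
			 * are restored to full capacity first, and the
			 * buffer index is stashed in the otherwise unused
			 * input_chksum field.
			 */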
			for (i = 0; i < ops_needed; i++) {
				/*
				 * Calculate next buffer to attach to operation
				 */
				uint32_t buf_id = total_enq_ops + i +
						ops_unused;
				uint16_t op_id = ops_unused + i;
				/* Reset all data in output buffers */
				struct rte_mbuf *m = output_bufs[buf_id];

				m->pkt_len = out_seg_sz * m->nb_segs;
				while (m) {
					m->data_len = m->buf_len - m->data_off;
					m = m->next;
				}
				ops[op_id]->m_src = input_bufs[buf_id];
				ops[op_id]->m_dst = output_bufs[buf_id];
				ops[op_id]->src.offset = 0;
				ops[op_id]->src.length =
					rte_pktmbuf_pkt_len(input_bufs[buf_id]);
				ops[op_id]->dst.offset = 0;
				ops[op_id]->flush_flag = RTE_COMP_FLUSH_FINAL;
				ops[op_id]->input_chksum = buf_id;
				ops[op_id]->private_xform = priv_xform;
			}

			if (unlikely(test_data->perf_comp_force_stop))
				goto end;

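			/*
			 * An enqueue returning 0 may only mean the queue is
			 * full; treat it as fatal only if the device reports
			 * enqueue errors in its stats.
			 */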
			num_enq = rte_compressdev_enqueue_burst(dev_id,
								mem->qp_id, ops,
								num_ops);
			if (num_enq == 0) {
				struct rte_compressdev_stats stats;

				rte_compressdev_stats_get(dev_id, &stats);
				if (stats.enqueue_err_count) {
					res = -1;
					goto end;
				}
			}

			ops_unused = num_ops - num_enq;
			remaining_ops -= num_enq;
			total_enq_ops += num_enq;

			num_deq = rte_compressdev_dequeue_burst(dev_id,
							   mem->qp_id,
							   deq_ops,
							   test_data->burst_sz);
			total_deq_ops += num_deq;

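			/*
			 * Check each completed operation and append its
			 * produced data to the flat output buffer.
			 * rte_pktmbuf_read() copies into output_data_ptr
			 * only when the data is not contiguous in the mbuf;
			 * otherwise it returns a pointer into the mbuf and
			 * the copy is done explicitly below.
			 */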
			for (i = 0; i < num_deq; i++) {
				struct rte_comp_op *op = deq_ops[i];

				if (op->status ==
				  RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED ||
				  op->status ==
				  RTE_COMP_OP_STATUS_OUT_OF_SPACE_RECOVERABLE) {
					RTE_LOG(ERR, USER1,
"Out of space error occurred due to incompressible input data expanding to larger than destination buffer. Increase the EXPANSE_RATIO constant to use this data.\n");
					res = -1;
					goto end;
				} else if (op->status !=
						RTE_COMP_OP_STATUS_SUCCESS) {
					RTE_LOG(ERR, USER1,
						"Some operations were not successful\n");
					res = -1;
					goto end;
				}

				const void *read_data_addr =
						rte_pktmbuf_read(op->m_dst, 0,
						op->produced, output_data_ptr);
				if (read_data_addr == NULL) {
					RTE_LOG(ERR, USER1,
						"Could not copy buffer in destination\n");
					res = -1;
					goto end;
				}

				if (read_data_addr != output_data_ptr)
					rte_memcpy(output_data_ptr,
						   rte_pktmbuf_mtod(op->m_dst,
								    uint8_t *),
						   op->produced);
				output_data_ptr += op->produced;
				output_size += op->produced;
			}

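			/*
			 * On the last iteration, trim each destination chain
			 * so pkt_len/data_len match what the device actually
			 * produced; these lengths are read back when the
			 * buffers are used as input for the next stage.
			 */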
			if (iter == num_iter - 1) {
				for (i = 0; i < num_deq; i++) {
					struct rte_comp_op *op = deq_ops[i];
					struct rte_mbuf *m = op->m_dst;

					m->pkt_len = op->produced;
					uint32_t remaining_data = op->produced;
					uint16_t data_to_append;

					while (remaining_data > 0) {
						data_to_append =
							RTE_MIN(remaining_data,
							out_seg_sz);
						m->data_len = data_to_append;
						remaining_data -=
								data_to_append;
						m = m->next;
					}
				}
			}
			rte_mempool_put_bulk(mem->op_pool,
					     (void **)deq_ops, num_deq);
			allocated -= num_deq;
		}

		/* Dequeue the last operations */
		while (total_deq_ops < total_ops) {
			if (unlikely(test_data->perf_comp_force_stop))
				goto end;

			num_deq = rte_compressdev_dequeue_burst(dev_id,
							mem->qp_id,
							deq_ops,
							test_data->burst_sz);
			if (num_deq == 0) {
				struct rte_compressdev_stats stats;

				rte_compressdev_stats_get(dev_id, &stats);
				if (stats.dequeue_err_count) {
					res = -1;
					goto end;
				}
			}

			total_deq_ops += num_deq;

			for (i = 0; i < num_deq; i++) {
				struct rte_comp_op *op = deq_ops[i];

				if (op->status ==
				  RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED ||
				  op->status ==
				  RTE_COMP_OP_STATUS_OUT_OF_SPACE_RECOVERABLE) {
					RTE_LOG(ERR, USER1,
"Out of space error occurred due to incompressible input data expanding to larger than destination buffer. Increase the EXPANSE_RATIO constant to use this data.\n");
					res = -1;
					goto end;
				} else if (op->status !=
						RTE_COMP_OP_STATUS_SUCCESS) {
					RTE_LOG(ERR, USER1,
						"Some operations were not successful\n");
					res = -1;
					goto end;
				}
				const void *read_data_addr =
						rte_pktmbuf_read(op->m_dst,
								 op->dst.offset,
						op->produced, output_data_ptr);
				if (read_data_addr == NULL) {
					RTE_LOG(ERR, USER1,
						"Could not copy buffer in destination\n");
					res = -1;
					goto end;
				}

				if (read_data_addr != output_data_ptr)
					rte_memcpy(output_data_ptr,
						   rte_pktmbuf_mtod(
							op->m_dst, uint8_t *),
						   op->produced);
				output_data_ptr += op->produced;
				output_size += op->produced;
			}

			if (iter == num_iter - 1) {
				for (i = 0; i < num_deq; i++) {
					struct rte_comp_op *op = deq_ops[i];
					struct rte_mbuf *m = op->m_dst;

					m->pkt_len = op->produced;
					uint32_t remaining_data = op->produced;
					uint16_t data_to_append;

					while (remaining_data > 0) {
						data_to_append =
						RTE_MIN(remaining_data,
							out_seg_sz);
						m->data_len = data_to_append;
						remaining_data -=
								data_to_append;
						m = m->next;
					}
				}
			}
			rte_mempool_put_bulk(mem->op_pool,
					     (void **)deq_ops, num_deq);
			allocated -= num_deq;
		}
	}

	if (output_data_sz)
		*output_data_sz = output_size;
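	/*
	 * Common cleanup: return any operations still held ('allocated'
	 * tracks them) to the pool before freeing the ops array.
	 */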
end:
	rte_mempool_put_bulk(mem->op_pool, (void **)ops, allocated);
	rte_compressdev_private_xform_free(dev_id, priv_xform);
	rte_free(ops);

	if (test_data->perf_comp_force_stop) {
		RTE_LOG(ERR, USER1,
		      "lcore: %d Perf. test has been aborted by user\n",
			mem->lcore_id);
		res = -1;
	}

	return res;
}

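/*
 * Per-lcore entry point: run the configured compress/decompress loops,
 * verify the decompressed data against the original input and print a
 * one-line summary per lcore.
 */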
int
cperf_verify_test_runner(void *test_ctx)
{
	struct cperf_verify_ctx *ctx = test_ctx;
	struct comp_test_data *test_data = ctx->options;
	int ret = EXIT_SUCCESS;
	static RTE_ATOMIC(uint16_t) display_once;
	uint32_t lcore = rte_lcore_id();
	uint16_t exp = 0;

	ctx->mem.lcore_id = lcore;

	test_data->ratio = 0;

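	/*
	 * Run compression first (when requested) so that its output feeds
	 * the decompression stage below.
	 */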
	if (test_data->test_op & COMPRESS) {
		if (main_loop(ctx, RTE_COMP_COMPRESS) < 0) {
			ret = EXIT_FAILURE;
			goto end;
		}
	}

	if (test_data->test_op & DECOMPRESS) {
		if (main_loop(ctx, RTE_COMP_DECOMPRESS) < 0) {
			ret = EXIT_FAILURE;
			goto end;
		}

		if (!(test_data->test_op & COMPRESS)) {
			/*
			 * In DECOMPRESS_ONLY mode there are no further
			 * verifications; reset the 'ratio' and 'comp_data_sz'
			 * fields for the report shared with other tests.
			 */
			ctx->comp_data_sz = 0;
			ctx->ratio = 0;
			goto end;
		}

		if (ctx->decomp_data_sz != test_data->input_data_sz) {
			RTE_LOG(ERR, USER1,
				"Decompressed data length not equal to input data length\n");
			RTE_LOG(ERR, USER1,
				"Decompressed size = %zu, expected = %zu\n",
				ctx->decomp_data_sz, test_data->input_data_sz);
			ret = EXIT_FAILURE;
			goto end;
		} else {
			if (memcmp(ctx->mem.decompressed_data,
					test_data->input_data,
					test_data->input_data_sz) != 0) {
				RTE_LOG(ERR, USER1,
					"Decompressed data is not the same as file data\n");
				ret = EXIT_FAILURE;
				goto end;
			}
		}
	}

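	/* Compression ratio as a percentage of the original input size. */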
	ctx->ratio = (double) ctx->comp_data_sz /
			test_data->input_data_sz * 100;

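	/* Print the table header only once across all lcores. */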
	if (!ctx->silent) {
		if (rte_atomic_compare_exchange_strong_explicit(&display_once, &exp, 1,
				rte_memory_order_relaxed, rte_memory_order_relaxed)) {
			printf("%12s%6s%12s%17s\n",
			    "lcore id", "Level", "Comp size", "Comp ratio [%]");
		}
		printf("%12u%6u%12zu%17.2f\n",
		       ctx->mem.lcore_id,
		       test_data->level, ctx->comp_data_sz, ctx->ratio);
	}

end:
	return ret;
}
468