xref: /spdk/examples/accel/perf/accel_perf.c (revision d4d015a572e1af7b2818e44218c1e661a61545ec)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 #include "spdk/thread.h"
9 #include "spdk/env.h"
10 #include "spdk/event.h"
11 #include "spdk/log.h"
12 #include "spdk/string.h"
13 #include "spdk/accel.h"
14 #include "spdk/crc32.h"
15 #include "spdk/util.h"
16 #include "spdk/xor.h"
17 #include "spdk/dif.h"
18 
#define DATA_PATTERN 0x5a	/* byte written into source buffers before submission */
#define ALIGN_4K 0x1000		/* destination alignment required by DSA dualcast */
#define COMP_BUF_PAD_PERCENTAGE 1.1L	/* headroom factor for compressed output buffers */

/* Timing state: TSC ticks/second and the tick at which the run ends. */
static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
/* Process-wide exit code; set non-zero on the first failure. */
static int g_rc;
/* Per-operation transfer size in bytes (-o). */
static int g_xfer_size_bytes = 4096;
/* Block and per-block metadata sizes used by the DIF/DIX workloads. */
static int g_block_size_bytes = 512;
static int g_md_size_bytes = 8;
/* Operations kept in flight per worker (-q). */
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;	/* -T */
static int g_time_in_sec = 5;		/* -t */
/* Seed for the crc32c/copy_crc32c workloads (-S). */
static uint32_t g_crc32c_seed = 0;
/* Number of iovec elements per operation for chained workloads (-C). */
static uint32_t g_chained_count = 1;
/* Percentage of compare operations that should intentionally miscompare (-P). */
static int g_fail_percent_goal = 0;
/* Byte value used by the fill workload (-f). */
static uint8_t g_fill_pattern = 255;
/* Number of source buffers for the xor workload (-x, minimum 2). */
static uint32_t g_xor_src_count = 2;
/* Verify results in software after each completion (-y). */
static bool g_verify = false;
/* Selected workload (-w): raw string plus the parsed accel opcode. */
static const char *g_workload_type = NULL;
static enum spdk_accel_opcode g_workload_selection = SPDK_ACCEL_OPC_LAST;
/* Optional accel module to pin the opcode to (-M). */
static const char *g_module_name = NULL;
/* Singly-linked list of per-thread workers; guarded by g_workers_lock. */
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
/* Input file for compress/decompress workloads (-l). */
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};
50 
/*
 * One segment of the compress/decompress input file: the uncompressed data
 * plus a compressed counterpart, each with an iovec view split into
 * g_chained_count elements.  Segments are built once and shared read-only
 * by all workers.
 */
struct ap_compress_seg {
	void		*uncompressed_data;
	uint32_t	uncompressed_len;
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	void		*compressed_data;
	uint32_t	compressed_len;
	/* presumably compressed_len plus COMP_BUF_PAD_PERCENTAGE headroom —
	 * set where segments are built (not in this view); confirm there. */
	uint32_t	compressed_len_padded;
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg)	link;
};

/* Global list of file segments for the compress/decompress workloads. */
static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);
67 
struct worker_thread;
/* Completion callback shared by every submitted operation (defined below). */
static void accel_done(void *ref, int status);

/* Core/thread pair identifying a worker in printed results. */
struct display_info {
	int core;
	int thread;
};
75 
/*
 * State for one outstanding accel operation.  Which members are used depends
 * on the selected workload; buffers are allocated once by
 * _get_task_data_bufs() and reused for every resubmission.
 */
struct ap_task {
	void			*src;		/* single source buffer (copy/fill/compare/dualcast) */
	struct iovec		*src_iovs;	/* chained source vectors (crc32c, DIF/DIX, ...) */
	uint32_t		src_iovcnt;
	void			**sources;	/* xor source array (g_xor_src_count entries) */
	struct iovec		*dst_iovs;	/* chained destination vectors */
	uint32_t		dst_iovcnt;
	struct iovec		md_iov;		/* separate metadata buffer for DIX workloads */
	void			*dst;
	void			*dst2;		/* 2nd destination (dualcast) / reference (xor verify) */
	uint32_t		*crc_dst;	/* crc32c result location */
	uint32_t		compressed_sz;	/* bytes produced by a compress operation */
	struct ap_compress_seg *cur_seg;	/* current file segment (compress/decompress) */
	struct worker_thread	*worker;	/* owning worker */
	int			expected_status; /* used for the compare operation */
	uint32_t		num_blocks; /* used for the DIF related operations */
	struct spdk_dif_ctx	dif_ctx;
	struct spdk_dif_error	dif_err;
	TAILQ_ENTRY(ap_task)	link;		/* linkage in the worker's free-task pool */
};
96 
/*
 * Per-SPDK-thread test state.  Workers are chained through 'next' under
 * g_workers and protected by g_workers_lock.
 */
struct worker_thread {
	struct spdk_io_channel		*ch;		/* accel framework channel */
	struct spdk_accel_opcode_stats	stats;		/* snapshot taken at teardown */
	uint64_t			xfer_failed;	/* verification miscompares observed */
	uint64_t			injected_miscompares;	/* intentional compare failures */
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks_pool;	/* free tasks ready to submit */
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;	/* stop submitting; wait for completions */
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;	/* backing allocation for the task pool */
	struct display_info		display;
	enum spdk_accel_opcode		workload;
};
114 
115 static void
116 dump_user_config(void)
117 {
118 	const char *module_name = NULL;
119 	int rc;
120 
121 	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
122 	if (rc) {
123 		printf("error getting module name (%d)\n", rc);
124 	}
125 
126 	printf("\nSPDK Configuration:\n");
127 	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
128 	printf("Accel Perf Configuration:\n");
129 	printf("Workload Type:  %s\n", g_workload_type);
130 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
131 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
132 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
133 	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
134 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
135 	} else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
136 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
137 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
138 		printf("Source buffers: %u\n", g_xor_src_count);
139 	}
140 	if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
141 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
142 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
143 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
144 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
145 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) {
146 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
147 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
148 	} else {
149 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
150 	}
151 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
152 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
153 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
154 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
155 		printf("Block size:     %u bytes\n", g_block_size_bytes);
156 		printf("Metadata size:  %u bytes\n", g_md_size_bytes);
157 	}
158 	printf("Vector count    %u\n", g_chained_count);
159 	printf("Module:         %s\n", module_name);
160 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
161 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
162 		printf("File Name:      %s\n", g_cd_file_in_name);
163 	}
164 	printf("Queue depth:    %u\n", g_queue_depth);
165 	printf("Allocate depth: %u\n", g_allocate_depth);
166 	printf("# threads/core: %u\n", g_threads_per_core);
167 	printf("Run time:       %u seconds\n", g_time_in_sec);
168 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
169 }
170 
/*
 * Print the command-line help text to stdout.
 *
 * Fix: several option lines were missing their closing ']', leaving the
 * help output with unbalanced brackets.
 */
static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor,\n");
	printf("\t[                                       dif_verify, dif_verify_copy, dif_generate, dif_generate_copy, dix_generate, dix_verify]\n");
	printf("\t[-M assign module to the operation, not compatible with accel_assign_opc RPC]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-S for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}
193 
194 static int
195 parse_args(int ch, char *arg)
196 {
197 	int argval = 0;
198 
199 	switch (ch) {
200 	case 'a':
201 	case 'C':
202 	case 'f':
203 	case 'T':
204 	case 'o':
205 	case 'P':
206 	case 'q':
207 	case 'S':
208 	case 't':
209 	case 'x':
210 		argval = spdk_strtol(optarg, 10);
211 		if (argval < 0) {
212 			fprintf(stderr, "-%c option must be non-negative.\n", ch);
213 			usage();
214 			return 1;
215 		}
216 		break;
217 	default:
218 		break;
219 	};
220 
221 	switch (ch) {
222 	case 'a':
223 		g_allocate_depth = argval;
224 		break;
225 	case 'C':
226 		g_chained_count = argval;
227 		break;
228 	case 'l':
229 		g_cd_file_in_name = optarg;
230 		break;
231 	case 'f':
232 		g_fill_pattern = (uint8_t)argval;
233 		break;
234 	case 'T':
235 		g_threads_per_core = argval;
236 		break;
237 	case 'o':
238 		g_xfer_size_bytes = argval;
239 		break;
240 	case 'P':
241 		g_fail_percent_goal = argval;
242 		break;
243 	case 'q':
244 		g_queue_depth = argval;
245 		break;
246 	case 'S':
247 		g_crc32c_seed = argval;
248 		break;
249 	case 't':
250 		g_time_in_sec = argval;
251 		break;
252 	case 'x':
253 		g_xor_src_count = argval;
254 		break;
255 	case 'y':
256 		g_verify = true;
257 		break;
258 	case 'w':
259 		g_workload_type = optarg;
260 		if (!strcmp(g_workload_type, "copy")) {
261 			g_workload_selection = SPDK_ACCEL_OPC_COPY;
262 		} else if (!strcmp(g_workload_type, "fill")) {
263 			g_workload_selection = SPDK_ACCEL_OPC_FILL;
264 		} else if (!strcmp(g_workload_type, "crc32c")) {
265 			g_workload_selection = SPDK_ACCEL_OPC_CRC32C;
266 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
267 			g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C;
268 		} else if (!strcmp(g_workload_type, "compare")) {
269 			g_workload_selection = SPDK_ACCEL_OPC_COMPARE;
270 		} else if (!strcmp(g_workload_type, "dualcast")) {
271 			g_workload_selection = SPDK_ACCEL_OPC_DUALCAST;
272 		} else if (!strcmp(g_workload_type, "compress")) {
273 			g_workload_selection = SPDK_ACCEL_OPC_COMPRESS;
274 		} else if (!strcmp(g_workload_type, "decompress")) {
275 			g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS;
276 		} else if (!strcmp(g_workload_type, "xor")) {
277 			g_workload_selection = SPDK_ACCEL_OPC_XOR;
278 		} else if (!strcmp(g_workload_type, "dif_verify")) {
279 			g_workload_selection = SPDK_ACCEL_OPC_DIF_VERIFY;
280 		} else if (!strcmp(g_workload_type, "dif_verify_copy")) {
281 			g_workload_selection = SPDK_ACCEL_OPC_DIF_VERIFY_COPY;
282 		} else if (!strcmp(g_workload_type, "dif_generate")) {
283 			g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE;
284 		} else if (!strcmp(g_workload_type, "dif_generate_copy")) {
285 			g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE_COPY;
286 		} else if (!strcmp(g_workload_type, "dix_verify")) {
287 			g_workload_selection = SPDK_ACCEL_OPC_DIX_VERIFY;
288 		} else if (!strcmp(g_workload_type, "dix_generate")) {
289 			g_workload_selection = SPDK_ACCEL_OPC_DIX_GENERATE;
290 		} else {
291 			fprintf(stderr, "Unsupported workload type: %s\n", optarg);
292 			usage();
293 			return 1;
294 		}
295 		break;
296 	case 'M':
297 		g_module_name = optarg;
298 		break;
299 
300 	default:
301 		usage();
302 		return 1;
303 	}
304 
305 	return 0;
306 }
307 
static int dump_result(void);

/*
 * Teardown for one worker, run on its own SPDK thread.  Snapshots the
 * opcode stats, releases the channel and task memory, and marks the thread
 * for exit.  The last worker to finish dumps the aggregate results and
 * stops the app.
 */
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	/* Capture stats before releasing the channel they belong to. */
	if (worker->ch) {
		spdk_accel_get_opcode_stats(worker->ch, worker->workload,
					    &worker->stats, sizeof(worker->stats));
		spdk_put_io_channel(worker->ch);
		worker->ch = NULL;
	}
	free(worker->task_base);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		/* Drop the lock before the potentially slow result dump. */
		pthread_mutex_unlock(&g_workers_lock);
		/* Only dump results on successful runs */
		if (g_rc == 0) {
			g_rc = dump_result();
		}
		spdk_app_stop(g_rc);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}
335 
/*
 * Split the buffer 'buf' of 'sz' bytes into 'iovcnt' contiguous iovec
 * elements.  Elements are ceil(sz / iovcnt) bytes each except the last,
 * which takes whatever remains; the whole buffer is always covered.
 */
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint8_t *cursor = buf;
	uint64_t chunk;
	uint32_t idx;

	/* Round up so iovcnt chunks of this size cover the buffer. */
	chunk = (sz + iovcnt - 1) / iovcnt;

	for (idx = 0; idx < iovcnt; idx++) {
		/* The final element(s) shrink to the bytes still remaining. */
		if (chunk > sz) {
			chunk = sz;
		}
		assert(chunk > 0);

		iovs[idx].iov_base = cursor;
		iovs[idx].iov_len = chunk;

		cursor += chunk;
		sz -= chunk;
	}
	assert(sz == 0);
}
358 
359 static int
360 _get_task_data_bufs(struct ap_task *task)
361 {
362 	uint32_t align = 0;
363 	uint32_t i = 0;
364 	int src_buff_len = g_xfer_size_bytes;
365 	int dst_buff_len = g_xfer_size_bytes;
366 	int md_buff_len;
367 	struct spdk_dif_ctx_init_ext_opts dif_opts;
368 	uint32_t num_blocks, transfer_size_with_md;
369 	int rc;
370 
371 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
372 	 * we do this for all modules to keep it simple.
373 	 */
374 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) {
375 		align = ALIGN_4K;
376 	}
377 
378 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
379 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
380 		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
381 
382 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
383 			dst_buff_len = task->cur_seg->compressed_len_padded;
384 		}
385 
386 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
387 		if (task->dst == NULL) {
388 			fprintf(stderr, "Unable to alloc dst buffer\n");
389 			return -ENOMEM;
390 		}
391 
392 		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
393 		if (!task->dst_iovs) {
394 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
395 			return -ENOMEM;
396 		}
397 		task->dst_iovcnt = g_chained_count;
398 		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
399 
400 		return 0;
401 	}
402 
403 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) {
404 		task->dst_iovcnt = g_chained_count;
405 		task->dst_iovs = calloc(task->dst_iovcnt, sizeof(struct iovec));
406 		if (!task->dst_iovs) {
407 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
408 			return -ENOMEM;
409 		}
410 
411 		num_blocks = g_xfer_size_bytes / g_block_size_bytes;
412 		/* Add bytes for each block for metadata */
413 		transfer_size_with_md = g_xfer_size_bytes + (num_blocks * g_md_size_bytes);
414 		task->num_blocks = num_blocks;
415 
416 		for (i = 0; i < task->dst_iovcnt; i++) {
417 			task->dst_iovs[i].iov_base = spdk_dma_zmalloc(transfer_size_with_md, 0, NULL);
418 			if (task->dst_iovs[i].iov_base == NULL) {
419 				return -ENOMEM;
420 			}
421 			task->dst_iovs[i].iov_len = transfer_size_with_md;
422 		}
423 
424 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
425 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
426 
427 		rc = spdk_dif_ctx_init(&task->dif_ctx,
428 				       g_block_size_bytes + g_md_size_bytes,
429 				       g_md_size_bytes, true, true,
430 				       SPDK_DIF_TYPE1,
431 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
432 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
433 		if (rc != 0) {
434 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
435 			return rc;
436 		}
437 	}
438 
439 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
440 		/* Allocate source buffers */
441 		task->src_iovcnt = g_chained_count;
442 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
443 		if (!task->src_iovs) {
444 			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
445 			return -ENOMEM;
446 		}
447 
448 		num_blocks = g_xfer_size_bytes / g_block_size_bytes;
449 		/* Add bytes for each block for metadata */
450 		transfer_size_with_md = g_xfer_size_bytes + (num_blocks * g_md_size_bytes);
451 		task->num_blocks = num_blocks;
452 
453 		for (i = 0; i < task->src_iovcnt; i++) {
454 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(transfer_size_with_md, 0, NULL);
455 			if (task->src_iovs[i].iov_base == NULL) {
456 				return -ENOMEM;
457 			}
458 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, transfer_size_with_md);
459 			task->src_iovs[i].iov_len = transfer_size_with_md;
460 		}
461 
462 		/* Allocate destination buffers */
463 		task->dst_iovcnt = g_chained_count;
464 		task->dst_iovs = calloc(task->dst_iovcnt, sizeof(struct iovec));
465 		if (!task->dst_iovs) {
466 			fprintf(stderr, "cannot allocated task->dst_iovs fot task=%p\n", task);
467 			return -ENOMEM;
468 		}
469 
470 		for (i = 0; i < task->dst_iovcnt; i++) {
471 			task->dst_iovs[i].iov_base = spdk_dma_zmalloc(dst_buff_len, 0, NULL);
472 			if (task->dst_iovs[i].iov_base == NULL) {
473 				return -ENOMEM;
474 			}
475 			task->dst_iovs[i].iov_len = dst_buff_len;
476 		}
477 
478 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
479 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
480 
481 		/* Init DIF ctx */
482 		rc = spdk_dif_ctx_init(&task->dif_ctx,
483 				       g_block_size_bytes + g_md_size_bytes,
484 				       g_md_size_bytes, true, true,
485 				       SPDK_DIF_TYPE1,
486 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
487 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
488 		if (rc != 0) {
489 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
490 			return rc;
491 		}
492 
493 		rc = spdk_dif_generate(task->src_iovs, task->src_iovcnt, task->num_blocks, &task->dif_ctx);
494 		if (rc != 0) {
495 			fprintf(stderr, "Generation of DIF failed, error (%d)\n", rc);
496 			return rc;
497 		}
498 	}
499 
500 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
501 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
502 		task->crc_dst = spdk_dma_zmalloc(sizeof(*task->crc_dst), 0, NULL);
503 	}
504 
505 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
506 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
507 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
508 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
509 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY ||
510 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
511 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE) {
512 		assert(g_chained_count > 0);
513 		task->src_iovcnt = g_chained_count;
514 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
515 		if (!task->src_iovs) {
516 			fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task);
517 			return -ENOMEM;
518 		}
519 
520 		if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
521 			dst_buff_len = g_xfer_size_bytes * g_chained_count;
522 		}
523 
524 		if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
525 		    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) {
526 			src_buff_len += (g_xfer_size_bytes / g_block_size_bytes) * g_md_size_bytes;
527 		}
528 
529 		for (i = 0; i < task->src_iovcnt; i++) {
530 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(src_buff_len, 0, NULL);
531 			if (task->src_iovs[i].iov_base == NULL) {
532 				return -ENOMEM;
533 			}
534 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, src_buff_len);
535 			task->src_iovs[i].iov_len = src_buff_len;
536 		}
537 		if (g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
538 		    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
539 			md_buff_len = (g_xfer_size_bytes / g_block_size_bytes) * g_md_size_bytes *
540 				      g_chained_count;
541 			task->md_iov.iov_base = spdk_dma_zmalloc(md_buff_len, 0, NULL);
542 			if (task->md_iov.iov_base == NULL) {
543 				return -ENOMEM;
544 			}
545 			task->md_iov.iov_len = md_buff_len;
546 		}
547 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
548 		assert(g_xor_src_count > 1);
549 		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
550 		if (!task->sources) {
551 			return -ENOMEM;
552 		}
553 
554 		for (i = 0; i < g_xor_src_count; i++) {
555 			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
556 			if (!task->sources[i]) {
557 				return -ENOMEM;
558 			}
559 			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
560 		}
561 	} else {
562 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
563 		if (task->src == NULL) {
564 			fprintf(stderr, "Unable to alloc src buffer\n");
565 			return -ENOMEM;
566 		}
567 
568 		/* For fill, set the entire src buffer so we can check if verify is enabled. */
569 		if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
570 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
571 		} else {
572 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
573 		}
574 	}
575 
576 	if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C &&
577 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_VERIFY &&
578 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE &&
579 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE_COPY &&
580 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_VERIFY_COPY &&
581 	    g_workload_selection != SPDK_ACCEL_OPC_DIX_VERIFY &&
582 	    g_workload_selection != SPDK_ACCEL_OPC_DIX_GENERATE) {
583 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
584 		if (task->dst == NULL) {
585 			fprintf(stderr, "Unable to alloc dst buffer\n");
586 			return -ENOMEM;
587 		}
588 
589 		/* For compare we want the buffers to match, otherwise not. */
590 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) {
591 			memset(task->dst, DATA_PATTERN, dst_buff_len);
592 		} else {
593 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
594 		}
595 	}
596 
597 	/* For dualcast 2 buffers are needed for the operation.  */
598 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST ||
599 	    (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) {
600 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
601 		if (task->dst2 == NULL) {
602 			fprintf(stderr, "Unable to alloc dst buffer\n");
603 			return -ENOMEM;
604 		}
605 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
606 	}
607 
608 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
609 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
610 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
611 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
612 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
613 
614 		task->num_blocks = (g_xfer_size_bytes * g_chained_count) / g_block_size_bytes;
615 
616 		rc = spdk_dif_ctx_init(&task->dif_ctx,
617 				       g_block_size_bytes + g_md_size_bytes,
618 				       g_md_size_bytes, true, true,
619 				       SPDK_DIF_TYPE1,
620 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
621 				       16, 0xFFFF, 10, 0, 0, &dif_opts);
622 		if (rc != 0) {
623 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
624 			return rc;
625 		}
626 
627 		if ((g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) ||
628 		    (g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY)) {
629 			rc = spdk_dif_generate(task->src_iovs, task->src_iovcnt, task->num_blocks, &task->dif_ctx);
630 			if (rc != 0) {
631 				fprintf(stderr, "Generation of DIF failed, error (%d)\n", rc);
632 				return rc;
633 			}
634 		}
635 	}
636 	if (g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
637 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
638 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
639 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
640 
641 		task->num_blocks = (g_xfer_size_bytes * g_chained_count) / g_block_size_bytes;
642 
643 		rc = spdk_dif_ctx_init(&task->dif_ctx,
644 				       g_block_size_bytes,
645 				       g_md_size_bytes, false, true,
646 				       SPDK_DIF_TYPE1,
647 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK |
648 				       SPDK_DIF_FLAGS_REFTAG_CHECK,
649 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
650 		if (rc != 0) {
651 			fprintf(stderr, "Initialization of DIX context failed, error (%d)\n", rc);
652 			return rc;
653 		}
654 		if (g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
655 			rc = spdk_dix_generate(task->src_iovs, task->src_iovcnt, &task->md_iov,
656 					       task->num_blocks, &task->dif_ctx);
657 			if (rc != 0) {
658 				fprintf(stderr, "Generation of DIX failed, error (%d)\n", rc);
659 				return rc;
660 			}
661 		}
662 
663 	}
664 
665 	return 0;
666 }
667 
668 inline static struct ap_task *
669 _get_task(struct worker_thread *worker)
670 {
671 	struct ap_task *task;
672 
673 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
674 		task = TAILQ_FIRST(&worker->tasks_pool);
675 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
676 	} else {
677 		fprintf(stderr, "Unable to get ap_task\n");
678 		return NULL;
679 	}
680 
681 	return task;
682 }
683 
/* Submit one operation using the same ap task that just completed.
 * Dispatches on the worker's workload; every submission completes via
 * accel_done().  On synchronous submit failure, accel_done() is invoked
 * directly with the error so queue-depth accounting stays balanced.
 */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;

	assert(worker);

	switch (worker->workload) {
	case SPDK_ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_FILL:
		/* Use the first byte of task->src as the fill pattern; the whole
		 * src buffer was pre-filled so verify can compare against it. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    task->crc_dst, g_crc32c_seed, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPARE:
		/* Inject a deliberate miscompare in g_fail_percent_goal percent of
		 * submissions by flipping the first destination byte. */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPRESS:
		/* Source is the current file segment's uncompressed data. */
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DECOMPRESS:
		/* Source is the pre-compressed copy of the current segment. */
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DIF_VERIFY:
		rc = spdk_accel_submit_dif_verify(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks,
						  &task->dif_ctx, &task->dif_err, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DIF_GENERATE:
		rc = spdk_accel_submit_dif_generate(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks,
						    &task->dif_ctx, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DIF_GENERATE_COPY:
		rc = spdk_accel_submit_dif_generate_copy(worker->ch, task->dst_iovs, task->dst_iovcnt,
				task->src_iovs, task->src_iovcnt,
				task->num_blocks, &task->dif_ctx, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DIF_VERIFY_COPY:
		rc = spdk_accel_submit_dif_verify_copy(worker->ch, task->dst_iovs, task->dst_iovcnt,
						       task->src_iovs, task->src_iovcnt, task->num_blocks,
						       &task->dif_ctx, &task->dif_err, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DIX_GENERATE:
		rc = spdk_accel_submit_dix_generate(worker->ch, task->src_iovs, task->src_iovcnt,
						    &task->md_iov, task->num_blocks,
						    &task->dif_ctx, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DIX_VERIFY:
		rc = spdk_accel_submit_dix_verify(worker->ch, task->src_iovs, task->src_iovcnt,
						  &task->md_iov, task->num_blocks,
						  &task->dif_ctx, &task->dif_err, accel_done, task);
		break;
	default:
		/* Workloads are validated at startup; reaching here is a bug. */
		assert(false);
		break;

	}

	worker->current_queue_depth++;
	if (rc) {
		/* Synchronous failure: complete the task inline with the error. */
		accel_done(task, rc);
	}
}
784 
/*
 * Release all buffers allocated for a task by _get_task_data_bufs().
 * The branch structure mirrors the allocation paths, keyed off the same
 * g_workload_selection.  Members never allocated for a given workload are
 * NULL/zero (tasks are calloc'd), so the guards below are safe.
 */
static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS ||
	    g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
		/* Only the iovec array is per-task; the segment data is shared
		 * via g_compress_segs and freed elsewhere. */
		free(task->dst_iovs);
	} else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
		   g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
		   g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
		   g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
		   g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY ||
		   g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY ||
		   g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
		   g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE) {
		if (task->crc_dst) {
			spdk_dma_free(task->crc_dst);
		}
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
		if (task->dst_iovs) {
			for (i = 0; i < task->dst_iovcnt; i++) {
				if (task->dst_iovs[i].iov_base) {
					spdk_dma_free(task->dst_iovs[i].iov_base);
				}
			}
			free(task->dst_iovs);
		}
		if (task->md_iov.iov_base) {
			spdk_dma_free(task->md_iov.iov_base);
		}
	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
		if (task->sources) {
			for (i = 0; i < g_xor_src_count; i++) {
				spdk_dma_free(task->sources[i]);
			}
			free(task->sources);
		}
	} else {
		spdk_dma_free(task->src);
	}

	/* task->dst may be NULL for workloads that never allocated it —
	 * the unconditional call here implies spdk_dma_free tolerates NULL. */
	spdk_dma_free(task->dst);
	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) {
		spdk_dma_free(task->dst2);
	}
}
839 
840 static int
841 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
842 {
843 	uint32_t i;
844 	uint32_t ttl_len = 0;
845 	uint8_t *dst = (uint8_t *)_dst;
846 
847 	for (i = 0; i < iovcnt; i++) {
848 		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
849 			return -1;
850 		}
851 		dst += src_src_iovs[i].iov_len;
852 		ttl_len += src_src_iovs[i].iov_len;
853 	}
854 
855 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
856 		return -1;
857 	}
858 
859 	return 0;
860 }
861 
862 static int _worker_stop(void *arg);
863 
864 static void
865 accel_done(void *arg1, int status)
866 {
867 	struct ap_task *task = arg1;
868 	struct worker_thread *worker = task->worker;
869 	uint32_t sw_crc32c;
870 	struct spdk_dif_error err_blk;
871 
872 	assert(worker);
873 	assert(worker->current_queue_depth > 0);
874 
875 	if (g_verify && status == 0) {
876 		switch (worker->workload) {
877 		case SPDK_ACCEL_OPC_COPY_CRC32C:
878 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
879 			if (*task->crc_dst != sw_crc32c) {
880 				SPDK_NOTICELOG("CRC-32C miscompare\n");
881 				worker->xfer_failed++;
882 			}
883 			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
884 				SPDK_NOTICELOG("Data miscompare\n");
885 				worker->xfer_failed++;
886 			}
887 			break;
888 		case SPDK_ACCEL_OPC_CRC32C:
889 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
890 			if (*task->crc_dst != sw_crc32c) {
891 				SPDK_NOTICELOG("CRC-32C miscompare\n");
892 				worker->xfer_failed++;
893 			}
894 			break;
895 		case SPDK_ACCEL_OPC_COPY:
896 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
897 				SPDK_NOTICELOG("Data miscompare\n");
898 				worker->xfer_failed++;
899 			}
900 			break;
901 		case SPDK_ACCEL_OPC_DUALCAST:
902 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
903 				SPDK_NOTICELOG("Data miscompare, first destination\n");
904 				worker->xfer_failed++;
905 			}
906 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
907 				SPDK_NOTICELOG("Data miscompare, second destination\n");
908 				worker->xfer_failed++;
909 			}
910 			break;
911 		case SPDK_ACCEL_OPC_FILL:
912 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
913 				SPDK_NOTICELOG("Data miscompare\n");
914 				worker->xfer_failed++;
915 			}
916 			break;
917 		case SPDK_ACCEL_OPC_COMPARE:
918 			break;
919 		case SPDK_ACCEL_OPC_COMPRESS:
920 			break;
921 		case SPDK_ACCEL_OPC_DECOMPRESS:
922 			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
923 				SPDK_NOTICELOG("Data miscompare on decompression\n");
924 				worker->xfer_failed++;
925 			}
926 			break;
927 		case SPDK_ACCEL_OPC_XOR:
928 			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
929 					 g_xfer_size_bytes) != 0) {
930 				SPDK_ERRLOG("Failed to generate xor for verification\n");
931 			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
932 				SPDK_NOTICELOG("Data miscompare\n");
933 				worker->xfer_failed++;
934 			}
935 			break;
936 		case SPDK_ACCEL_OPC_DIF_VERIFY:
937 			break;
938 		case SPDK_ACCEL_OPC_DIF_GENERATE:
939 			if (spdk_dif_verify(task->src_iovs, task->src_iovcnt, task->num_blocks,
940 					    &task->dif_ctx, &err_blk) != 0) {
941 				SPDK_NOTICELOG("Data miscompare, "
942 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
943 					       err_blk.err_type, err_blk.expected,
944 					       err_blk.actual, err_blk.err_offset);
945 				worker->xfer_failed++;
946 			}
947 			break;
948 		case SPDK_ACCEL_OPC_DIF_GENERATE_COPY:
949 			if (spdk_dif_verify(task->dst_iovs, task->dst_iovcnt, task->num_blocks,
950 					    &task->dif_ctx, &err_blk) != 0) {
951 				SPDK_NOTICELOG("Data miscompare, "
952 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
953 					       err_blk.err_type, err_blk.expected,
954 					       err_blk.actual, err_blk.err_offset);
955 				worker->xfer_failed++;
956 			}
957 			break;
958 		case SPDK_ACCEL_OPC_DIF_VERIFY_COPY:
959 			break;
960 		case SPDK_ACCEL_OPC_DIX_GENERATE:
961 			if (spdk_dix_verify(task->src_iovs, task->src_iovcnt, &task->md_iov,
962 					    task->num_blocks, &task->dif_ctx, &err_blk) != 0) {
963 				SPDK_NOTICELOG("Data miscompare, "
964 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
965 					       err_blk.err_type, err_blk.expected,
966 					       err_blk.actual, err_blk.err_offset);
967 				worker->xfer_failed++;
968 			}
969 			break;
970 		case SPDK_ACCEL_OPC_DIX_VERIFY:
971 			break;
972 		default:
973 			assert(false);
974 			break;
975 		}
976 	}
977 
978 	if (worker->workload == SPDK_ACCEL_OPC_COMPRESS ||
979 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
980 		/* Advance the task to the next segment */
981 		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
982 		if (task->cur_seg == NULL) {
983 			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
984 		}
985 	}
986 
987 	if (task->expected_status == -EILSEQ) {
988 		assert(status != 0);
989 		worker->injected_miscompares++;
990 		status = 0;
991 	} else if (status) {
992 		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
993 		worker->xfer_failed++;
994 	}
995 
996 	worker->current_queue_depth--;
997 
998 	if (!worker->is_draining && status == 0) {
999 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
1000 		task = _get_task(worker);
1001 		_submit_single(worker, task);
1002 	} else {
1003 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
1004 	}
1005 }
1006 
1007 static int
1008 dump_result(void)
1009 {
1010 	uint64_t total_completed = 0;
1011 	uint64_t total_failed = 0;
1012 	uint64_t total_miscompared = 0;
1013 	uint64_t total_xfer_per_sec, total_bw_in_MiBps = 0;
1014 	struct worker_thread *worker = g_workers;
1015 	char tmp[64];
1016 
1017 	printf("\n%-12s %20s %16s %16s %16s\n",
1018 	       "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares");
1019 	printf("------------------------------------------------------------------------------------\n");
1020 	while (worker != NULL) {
1021 
1022 		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
1023 		uint64_t bw_in_MiBps = worker->stats.num_bytes /
1024 				       (g_time_in_sec * 1024 * 1024);
1025 
1026 		total_completed += worker->stats.executed;
1027 		total_failed += worker->xfer_failed;
1028 		total_miscompared += worker->injected_miscompares;
1029 		total_bw_in_MiBps += bw_in_MiBps;
1030 
1031 		snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread);
1032 		if (xfer_per_sec) {
1033 			printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
1034 			       tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed,
1035 			       worker->injected_miscompares);
1036 		}
1037 
1038 		worker = worker->next;
1039 	}
1040 
1041 	total_xfer_per_sec = total_completed / g_time_in_sec;
1042 
1043 	printf("====================================================================================\n");
1044 	printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
1045 	       "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
1046 
1047 	return total_failed ? 1 : 0;
1048 }
1049 
1050 static inline void
1051 _free_task_buffers_in_pool(struct worker_thread *worker)
1052 {
1053 	struct ap_task *task;
1054 
1055 	assert(worker);
1056 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
1057 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
1058 		_free_task_buffers(task);
1059 	}
1060 }
1061 
1062 static int
1063 _check_draining(void *arg)
1064 {
1065 	struct worker_thread *worker = arg;
1066 
1067 	assert(worker);
1068 
1069 	if (worker->current_queue_depth == 0) {
1070 		_free_task_buffers_in_pool(worker);
1071 		spdk_poller_unregister(&worker->is_draining_poller);
1072 		unregister_worker(worker);
1073 	}
1074 
1075 	return SPDK_POLLER_BUSY;
1076 }
1077 
1078 static int
1079 _worker_stop(void *arg)
1080 {
1081 	struct worker_thread *worker = arg;
1082 
1083 	assert(worker);
1084 
1085 	spdk_poller_unregister(&worker->stop_poller);
1086 
1087 	/* now let the worker drain and check it's outstanding IO with a poller */
1088 	worker->is_draining = true;
1089 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
1090 
1091 	return SPDK_POLLER_BUSY;
1092 }
1093 
1094 static void shutdown_cb(void);
1095 
/*
 * Per-thread initializer, run via spdk_thread_send_msg() from
 * accel_perf_start().  Allocates a worker, links it into the global
 * g_workers list, allocates g_allocate_depth tasks plus their data buffers,
 * arms the run-time stop poller and primes g_queue_depth operations.
 * On any failure it triggers application shutdown and sets g_rc = -1.
 */
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		spdk_thread_exit(spdk_get_thread());
		goto no_worker;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	/* display's contents have been copied; this function owns and frees it. */
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	/* Link into the global list under lock; main() frees workers after
	 * spdk_app_start() returns, so errors below must not free 'worker'.
	 */
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	/* NOTE(review): worker->ch (when non-NULL) is not released on this
	 * path; presumably the shutdown triggered below reaches
	 * unregister_worker(), which would release it — confirm there is no
	 * channel leak here.
	 */
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	worker->task_base = NULL;
no_worker:
	shutdown_cb();
	g_rc = -1;
}
1171 
1172 static void
1173 accel_perf_start(void *arg1)
1174 {
1175 	struct spdk_cpuset tmp_cpumask = {};
1176 	char thread_name[32];
1177 	uint32_t i;
1178 	int j;
1179 	struct spdk_thread *thread;
1180 	struct display_info *display;
1181 
1182 	g_tsc_rate = spdk_get_ticks_hz();
1183 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
1184 
1185 	dump_user_config();
1186 
1187 	printf("Running for %d seconds...\n", g_time_in_sec);
1188 	fflush(stdout);
1189 
1190 	/* Create worker threads for each core that was specified. */
1191 	SPDK_ENV_FOREACH_CORE(i) {
1192 		for (j = 0; j < g_threads_per_core; j++) {
1193 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
1194 			spdk_cpuset_zero(&tmp_cpumask);
1195 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
1196 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
1197 			display = calloc(1, sizeof(*display));
1198 			if (display == NULL) {
1199 				fprintf(stderr, "Unable to allocate memory\n");
1200 				spdk_app_stop(-1);
1201 				return;
1202 			}
1203 			display->core = i;
1204 			display->thread = j;
1205 			spdk_thread_send_msg(thread, _init_thread, display);
1206 		}
1207 	}
1208 }
1209 
1210 static void
1211 accel_perf_free_compress_segs(void)
1212 {
1213 	struct ap_compress_seg *seg, *tmp;
1214 
1215 	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
1216 		free(seg->uncompressed_iovs);
1217 		free(seg->compressed_iovs);
1218 		spdk_dma_free(seg->compressed_data);
1219 		spdk_dma_free(seg->uncompressed_data);
1220 		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
1221 		free(seg);
1222 	}
1223 }
1224 
/*
 * State carried across the asynchronous prep phase that reads the input
 * file and compresses it segment by segment (compress/decompress workloads
 * only).  Freed once the whole file has been processed or on error.
 */
struct accel_perf_prep_ctx {
	FILE			*file;		/* input file (g_cd_file_in_name), open for reading */
	long			remaining;	/* bytes of the file not yet turned into segments */
	struct spdk_io_channel	*ch;		/* accel channel used for the prep compress calls */
	struct ap_compress_seg	*cur_seg;	/* segment whose compression is in flight */
};
1231 
1232 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);
1233 
1234 static void
1235 accel_perf_prep_process_seg_cpl(void *ref, int status)
1236 {
1237 	struct accel_perf_prep_ctx *ctx = ref;
1238 	struct ap_compress_seg *seg;
1239 
1240 	if (status != 0) {
1241 		fprintf(stderr, "error (%d) on initial compress completion\n", status);
1242 		spdk_dma_free(ctx->cur_seg->compressed_data);
1243 		spdk_dma_free(ctx->cur_seg->uncompressed_data);
1244 		free(ctx->cur_seg);
1245 		spdk_put_io_channel(ctx->ch);
1246 		fclose(ctx->file);
1247 		free(ctx);
1248 		spdk_app_stop(-status);
1249 		return;
1250 	}
1251 
1252 	seg = ctx->cur_seg;
1253 
1254 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
1255 		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1256 		if (seg->compressed_iovs == NULL) {
1257 			fprintf(stderr, "unable to allocate iovec\n");
1258 			spdk_dma_free(seg->compressed_data);
1259 			spdk_dma_free(seg->uncompressed_data);
1260 			free(seg);
1261 			spdk_put_io_channel(ctx->ch);
1262 			fclose(ctx->file);
1263 			free(ctx);
1264 			spdk_app_stop(-ENOMEM);
1265 			return;
1266 		}
1267 		seg->compressed_iovcnt = g_chained_count;
1268 
1269 		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
1270 					  seg->compressed_iovcnt);
1271 	}
1272 
1273 	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
1274 	ctx->remaining -= seg->uncompressed_len;
1275 
1276 	accel_perf_prep_process_seg(ctx);
1277 }
1278 
1279 static void
1280 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
1281 {
1282 	struct ap_compress_seg *seg;
1283 	int sz, sz_read, sz_padded;
1284 	void *ubuf, *cbuf;
1285 	struct iovec iov[1];
1286 	int rc;
1287 
1288 	if (ctx->remaining == 0) {
1289 		spdk_put_io_channel(ctx->ch);
1290 		fclose(ctx->file);
1291 		free(ctx);
1292 		accel_perf_start(NULL);
1293 		return;
1294 	}
1295 
1296 	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
1297 	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
1298 	 * would likely either deal with the failure of not having a large enough buffer
1299 	 * by submitting another operation with a larger one.  Or, like the vbdev module
1300 	 * does, just accept the error and use the data uncompressed marking it as such in
1301 	 * its own metadata so that in the future it doesn't try to decompress uncompressed
1302 	 * data, etc.
1303 	 */
1304 	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;
1305 
1306 	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
1307 	if (!ubuf) {
1308 		fprintf(stderr, "unable to allocate uncompress buffer\n");
1309 		rc = -ENOMEM;
1310 		goto error;
1311 	}
1312 
1313 	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
1314 	if (!cbuf) {
1315 		fprintf(stderr, "unable to allocate compress buffer\n");
1316 		rc = -ENOMEM;
1317 		spdk_dma_free(ubuf);
1318 		goto error;
1319 	}
1320 
1321 	seg = calloc(1, sizeof(*seg));
1322 	if (!seg) {
1323 		fprintf(stderr, "unable to allocate comp/decomp segment\n");
1324 		spdk_dma_free(ubuf);
1325 		spdk_dma_free(cbuf);
1326 		rc = -ENOMEM;
1327 		goto error;
1328 	}
1329 
1330 	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
1331 	if (sz_read != sz) {
1332 		fprintf(stderr, "unable to read input file\n");
1333 		free(seg);
1334 		spdk_dma_free(ubuf);
1335 		spdk_dma_free(cbuf);
1336 		rc = -errno;
1337 		goto error;
1338 	}
1339 
1340 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
1341 		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1342 		if (seg->uncompressed_iovs == NULL) {
1343 			fprintf(stderr, "unable to allocate iovec\n");
1344 			free(seg);
1345 			spdk_dma_free(ubuf);
1346 			spdk_dma_free(cbuf);
1347 			rc = -ENOMEM;
1348 			goto error;
1349 		}
1350 		seg->uncompressed_iovcnt = g_chained_count;
1351 		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
1352 	}
1353 
1354 	seg->uncompressed_data = ubuf;
1355 	seg->uncompressed_len = sz;
1356 	seg->compressed_data = cbuf;
1357 	seg->compressed_len = sz;
1358 	seg->compressed_len_padded = sz_padded;
1359 
1360 	ctx->cur_seg = seg;
1361 	iov[0].iov_base = seg->uncompressed_data;
1362 	iov[0].iov_len = seg->uncompressed_len;
1363 	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
1364 	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
1365 	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
1366 	 * but real applications may want to consider a more sophisticated method.
1367 	 */
1368 	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
1369 					&seg->compressed_len, accel_perf_prep_process_seg_cpl, ctx);
1370 	if (rc < 0) {
1371 		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
1372 		goto error;
1373 	}
1374 
1375 	return;
1376 
1377 error:
1378 	spdk_put_io_channel(ctx->ch);
1379 	fclose(ctx->file);
1380 	free(ctx);
1381 	spdk_app_stop(rc);
1382 }
1383 
1384 static void
1385 accel_perf_prep(void *arg1)
1386 {
1387 	struct accel_perf_prep_ctx *ctx;
1388 	const char *module_name = NULL;
1389 	int rc = 0;
1390 
1391 	if (g_module_name) {
1392 		rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
1393 		if (rc != 0 || strcmp(g_module_name, module_name) != 0) {
1394 			fprintf(stderr, "Module '%s' was assigned via JSON config or RPC, instead of '%s'\n",
1395 				module_name, g_module_name);
1396 			fprintf(stderr, "-M option is not compatible with accel_assign_opc RPC\n");
1397 			rc = -EINVAL;
1398 			goto error_end;
1399 		}
1400 	}
1401 
1402 	if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS &&
1403 	    g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) {
1404 		accel_perf_start(arg1);
1405 		return;
1406 	}
1407 
1408 	if (g_cd_file_in_name == NULL) {
1409 		fprintf(stdout, "A filename is required.\n");
1410 		rc = -EINVAL;
1411 		goto error_end;
1412 	}
1413 
1414 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) {
1415 		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
1416 		rc = -ENOTSUP;
1417 		goto error_end;
1418 	}
1419 
1420 	printf("Preparing input file...\n");
1421 
1422 	ctx = calloc(1, sizeof(*ctx));
1423 	if (ctx == NULL) {
1424 		rc = -ENOMEM;
1425 		goto error_end;
1426 	}
1427 
1428 	ctx->file = fopen(g_cd_file_in_name, "r");
1429 	if (ctx->file == NULL) {
1430 		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
1431 		rc = -errno;
1432 		goto error_ctx;
1433 	}
1434 
1435 	fseek(ctx->file, 0L, SEEK_END);
1436 	ctx->remaining = ftell(ctx->file);
1437 	fseek(ctx->file, 0L, SEEK_SET);
1438 
1439 	ctx->ch = spdk_accel_get_io_channel();
1440 	if (ctx->ch == NULL) {
1441 		rc = -EAGAIN;
1442 		goto error_file;
1443 	}
1444 
1445 	if (g_xfer_size_bytes == 0) {
1446 		/* size of 0 means "file at a time" */
1447 		g_xfer_size_bytes = ctx->remaining;
1448 	}
1449 
1450 	accel_perf_prep_process_seg(ctx);
1451 	return;
1452 
1453 error_file:
1454 	fclose(ctx->file);
1455 error_ctx:
1456 	free(ctx);
1457 error_end:
1458 	spdk_app_stop(rc);
1459 }
1460 
/* Thread-message trampoline: runs _worker_stop() on the worker's own SPDK
 * thread (sent from shutdown_cb()); the poller-style return value is ignored.
 */
static void
worker_shutdown(void *ctx)
{
	_worker_stop(ctx);
}
1466 
1467 static void
1468 shutdown_cb(void)
1469 {
1470 	struct worker_thread *worker;
1471 
1472 	pthread_mutex_lock(&g_workers_lock);
1473 	if (!g_workers) {
1474 		spdk_app_stop(1);
1475 		goto unlock;
1476 	}
1477 
1478 	worker = g_workers;
1479 	while (worker) {
1480 		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
1481 		worker = worker->next;
1482 	}
1483 unlock:
1484 	pthread_mutex_unlock(&g_workers_lock);
1485 }
1486 
1487 int
1488 main(int argc, char **argv)
1489 {
1490 	struct worker_thread *worker, *tmp;
1491 	int rc;
1492 
1493 	pthread_mutex_init(&g_workers_lock, NULL);
1494 	spdk_app_opts_init(&g_opts, sizeof(g_opts));
1495 	g_opts.name = "accel_perf";
1496 	g_opts.reactor_mask = "0x1";
1497 	g_opts.shutdown_cb = shutdown_cb;
1498 	g_opts.rpc_addr = NULL;
1499 
1500 	rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:M:P:f:T:l:S:x:", NULL,
1501 				 parse_args, usage);
1502 	if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) {
1503 		return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1;
1504 	}
1505 
1506 	if (g_workload_selection == SPDK_ACCEL_OPC_LAST) {
1507 		fprintf(stderr, "Must provide a workload type\n");
1508 		usage();
1509 		return -1;
1510 	}
1511 
1512 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
1513 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
1514 		usage();
1515 		return -1;
1516 	}
1517 
1518 	if (g_allocate_depth == 0) {
1519 		g_allocate_depth = g_queue_depth;
1520 	}
1521 
1522 	if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
1523 	     g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
1524 	     g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
1525 	     g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
1526 	     g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
1527 	     g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE) &&
1528 	    g_chained_count == 0) {
1529 		usage();
1530 		return -1;
1531 	}
1532 
1533 	if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) {
1534 		usage();
1535 		return -1;
1536 	}
1537 
1538 	if (g_module_name && spdk_accel_assign_opc(g_workload_selection, g_module_name)) {
1539 		fprintf(stderr, "Was not able to assign '%s' module to the workload\n", g_module_name);
1540 		usage();
1541 		return -1;
1542 	}
1543 
1544 	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
1545 	if (g_rc) {
1546 		SPDK_ERRLOG("ERROR starting application\n");
1547 	}
1548 
1549 	pthread_mutex_destroy(&g_workers_lock);
1550 
1551 	worker = g_workers;
1552 	while (worker) {
1553 		tmp = worker->next;
1554 		free(worker);
1555 		worker = tmp;
1556 	}
1557 	accel_perf_free_compress_segs();
1558 	spdk_app_fini();
1559 	return g_rc;
1560 }
1561