xref: /spdk/examples/accel/perf/accel_perf.c (revision 698b2423d5f98e56c36dcf8484205bb034d0f6f5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/thread.h"
8 #include "spdk/env.h"
9 #include "spdk/event.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/accel.h"
13 #include "spdk/crc32.h"
14 #include "spdk/util.h"
15 #include "spdk/xor.h"
16 #include "spdk/dif.h"
17 
18 #define DATA_PATTERN 0x5a
19 #define ALIGN_4K 0x1000
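/* Pad factor (not a percentage, despite the name) applied when sizing buffers
 * for compressed output, leaving 10% of headroom for incompressible data.
 */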
20 #define COMP_BUF_PAD_PERCENTAGE 1.1L
21 
22 static uint64_t	g_tsc_rate;
23 static uint64_t g_tsc_end;
24 static int g_rc;
25 static int g_xfer_size_bytes = 4096;
26 static int g_block_size_bytes = 512;
27 static int g_md_size_bytes = 8;
28 static int g_queue_depth = 32;
29 /* g_allocate_depth indicates how many tasks we allocate per worker. It will
30  * be at least as large as the queue depth.
31  */
32 static int g_allocate_depth = 0;
33 static int g_threads_per_core = 1;
34 static int g_time_in_sec = 5;
35 static uint32_t g_crc32c_seed = 0;
36 static uint32_t g_chained_count = 1;
37 static int g_fail_percent_goal = 0;
38 static uint8_t g_fill_pattern = 255;
39 static uint32_t g_xor_src_count = 2;
40 static bool g_verify = false;
41 static const char *g_workload_type = NULL;
42 static enum spdk_accel_opcode g_workload_selection = SPDK_ACCEL_OPC_LAST;
43 static const char *g_module_name = NULL;
44 static struct worker_thread *g_workers = NULL;
45 static int g_num_workers = 0;
46 static char *g_cd_file_in_name = NULL;
47 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
48 static struct spdk_app_opts g_opts = {};
49 
50 struct ap_compress_seg {
51 	void		*uncompressed_data;
52 	uint32_t	uncompressed_len;
53 	struct iovec	*uncompressed_iovs;
54 	uint32_t	uncompressed_iovcnt;
55 
56 	void		*compressed_data;
57 	uint32_t	compressed_len;
58 	uint32_t	compressed_len_padded;
59 	struct iovec	*compressed_iovs;
60 	uint32_t	compressed_iovcnt;
61 
62 	STAILQ_ENTRY(ap_compress_seg)	link;
63 };
64 
65 static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);
66 
67 struct worker_thread;
68 static void accel_done(void *ref, int status);
69 
70 struct display_info {
71 	int core;
72 	int thread;
73 };
74 
75 struct ap_task {
76 	void			*src;
77 	struct iovec		*src_iovs;
78 	uint32_t		src_iovcnt;
79 	void			**sources;
80 	struct iovec		*dst_iovs;
81 	uint32_t		dst_iovcnt;
82 	struct iovec		md_iov;
83 	void			*dst;
84 	void			*dst2;
85 	uint32_t		*crc_dst;
86 	uint32_t		compressed_sz;
87 	struct ap_compress_seg *cur_seg;
88 	struct worker_thread	*worker;
89 	int			expected_status; /* used for the compare operation */
90 	uint32_t		num_blocks; /* used for the DIF related operations */
91 	struct spdk_dif_ctx	dif_ctx;
92 	struct spdk_dif_error	dif_err;
93 	TAILQ_ENTRY(ap_task)	link;
94 };
95 
96 struct worker_thread {
97 	struct spdk_io_channel		*ch;
98 	struct spdk_accel_opcode_stats	stats;
99 	uint64_t			xfer_failed;
100 	uint64_t			injected_miscompares;
101 	uint64_t			current_queue_depth;
102 	TAILQ_HEAD(, ap_task)		tasks_pool;
103 	struct worker_thread		*next;
104 	unsigned			core;
105 	struct spdk_thread		*thread;
106 	bool				is_draining;
107 	struct spdk_poller		*is_draining_poller;
108 	struct spdk_poller		*stop_poller;
109 	void				*task_base;
110 	struct display_info		display;
111 	enum spdk_accel_opcode		workload;
112 };
113 
114 static void
115 dump_user_config(void)
116 {
117 	const char *module_name = NULL;
118 	int rc;
119 
120 	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
121 	if (rc) {
122 		printf("error getting module name (%d)\n", rc);
123 	}
124 
125 	printf("\nSPDK Configuration:\n");
126 	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
127 	printf("Accel Perf Configuration:\n");
128 	printf("Workload Type:  %s\n", g_workload_type);
129 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
130 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
131 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
132 	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
133 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
134 	} else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
135 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
136 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
137 		printf("Source buffers: %u\n", g_xor_src_count);
138 	}
139 	if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
140 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
141 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
142 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
143 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
144 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) {
145 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
146 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
147 	} else {
148 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
149 	}
150 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
151 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
152 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
153 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
154 		printf("Block size:     %u bytes\n", g_block_size_bytes);
155 		printf("Metadata size:  %u bytes\n", g_md_size_bytes);
156 	}
157 	printf("Vector count    %u\n", g_chained_count);
158 	printf("Module:         %s\n", module_name);
159 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
160 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
161 		printf("File Name:      %s\n", g_cd_file_in_name);
162 	}
163 	printf("Queue depth:    %u\n", g_queue_depth);
164 	printf("Allocate depth: %u\n", g_allocate_depth);
165 	printf("# threads/core: %u\n", g_threads_per_core);
166 	printf("Run time:       %u seconds\n", g_time_in_sec);
167 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
168 }
169 
170 static void
171 usage(void)
172 {
173 	printf("accel_perf options:\n");
174 	printf("\t[-h help message]\n");
175 	printf("\t[-q queue depth per core]\n");
176 	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)\n");
177 	printf("\t[-T number of threads per core\n");
178 	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
179 	printf("\t[-t time in seconds]\n");
180 	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor,\n");
181 	printf("\t[                                       dif_verify, dif_verify_copy, dif_generate, dif_generate_copy, dix_generate, dix_verify\n");
182 	printf("\t[-M assign module to the operation, not compatible with accel_assign_opc RPC\n");
183 	printf("\t[-l for compress/decompress workloads, name of uncompressed input file\n");
184 	printf("\t[-S for crc32c workload, use this seed value (default 0)\n");
185 	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n");
186 	printf("\t[-f for fill workload, use this BYTE value (default 255)\n");
187 	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
188 	printf("\t[-y verify result if this switch is on]\n");
189 	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
190 	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
191 }
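
/*
 * Example invocation (illustrative; the binary path depends on how SPDK was
 * built): run a CRC-32C workload for 10 seconds at queue depth 64 with
 * 4-element source vectors:
 *
 *   ./build/examples/accel_perf -w crc32c -q 64 -t 10 -C 4
 */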
192 
193 static int
194 parse_args(int ch, char *arg)
195 {
196 	int argval = 0;
197 
198 	switch (ch) {
199 	case 'a':
200 	case 'C':
201 	case 'f':
202 	case 'T':
203 	case 'o':
204 	case 'P':
205 	case 'q':
206 	case 'S':
207 	case 't':
208 	case 'x':
209 		argval = spdk_strtol(optarg, 10);
210 		if (argval < 0) {
211 			fprintf(stderr, "-%c option must be non-negative.\n", ch);
212 			usage();
213 			return 1;
214 		}
215 		break;
216 	default:
217 		break;
218 	}
219 
220 	switch (ch) {
221 	case 'a':
222 		g_allocate_depth = argval;
223 		break;
224 	case 'C':
225 		g_chained_count = argval;
226 		break;
227 	case 'l':
228 		g_cd_file_in_name = optarg;
229 		break;
230 	case 'f':
231 		g_fill_pattern = (uint8_t)argval;
232 		break;
233 	case 'T':
234 		g_threads_per_core = argval;
235 		break;
236 	case 'o':
237 		g_xfer_size_bytes = argval;
238 		break;
239 	case 'P':
240 		g_fail_percent_goal = argval;
241 		break;
242 	case 'q':
243 		g_queue_depth = argval;
244 		break;
245 	case 'S':
246 		g_crc32c_seed = argval;
247 		break;
248 	case 't':
249 		g_time_in_sec = argval;
250 		break;
251 	case 'x':
252 		g_xor_src_count = argval;
253 		break;
254 	case 'y':
255 		g_verify = true;
256 		break;
257 	case 'w':
258 		g_workload_type = optarg;
259 		if (!strcmp(g_workload_type, "copy")) {
260 			g_workload_selection = SPDK_ACCEL_OPC_COPY;
261 		} else if (!strcmp(g_workload_type, "fill")) {
262 			g_workload_selection = SPDK_ACCEL_OPC_FILL;
263 		} else if (!strcmp(g_workload_type, "crc32c")) {
264 			g_workload_selection = SPDK_ACCEL_OPC_CRC32C;
265 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
266 			g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C;
267 		} else if (!strcmp(g_workload_type, "compare")) {
268 			g_workload_selection = SPDK_ACCEL_OPC_COMPARE;
269 		} else if (!strcmp(g_workload_type, "dualcast")) {
270 			g_workload_selection = SPDK_ACCEL_OPC_DUALCAST;
271 		} else if (!strcmp(g_workload_type, "compress")) {
272 			g_workload_selection = SPDK_ACCEL_OPC_COMPRESS;
273 		} else if (!strcmp(g_workload_type, "decompress")) {
274 			g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS;
275 		} else if (!strcmp(g_workload_type, "xor")) {
276 			g_workload_selection = SPDK_ACCEL_OPC_XOR;
277 		} else if (!strcmp(g_workload_type, "dif_verify")) {
278 			g_workload_selection = SPDK_ACCEL_OPC_DIF_VERIFY;
279 		} else if (!strcmp(g_workload_type, "dif_verify_copy")) {
280 			g_workload_selection = SPDK_ACCEL_OPC_DIF_VERIFY_COPY;
281 		} else if (!strcmp(g_workload_type, "dif_generate")) {
282 			g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE;
283 		} else if (!strcmp(g_workload_type, "dif_generate_copy")) {
284 			g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE_COPY;
285 		} else if (!strcmp(g_workload_type, "dix_verify")) {
286 			g_workload_selection = SPDK_ACCEL_OPC_DIX_VERIFY;
287 		} else if (!strcmp(g_workload_type, "dix_generate")) {
288 			g_workload_selection = SPDK_ACCEL_OPC_DIX_GENERATE;
289 		} else {
290 			fprintf(stderr, "Unsupported workload type: %s\n", optarg);
291 			usage();
292 			return 1;
293 		}
294 		break;
295 	case 'M':
296 		g_module_name = optarg;
297 		break;
298 
299 	default:
300 		usage();
301 		return 1;
302 	}
303 
304 	return 0;
305 }
306 
307 static int dump_result(void);
308 static void
309 unregister_worker(void *arg1)
310 {
311 	struct worker_thread *worker = arg1;
312 
313 	if (worker->ch) {
314 		spdk_accel_get_opcode_stats(worker->ch, worker->workload,
315 					    &worker->stats, sizeof(worker->stats));
316 		spdk_put_io_channel(worker->ch);
317 		worker->ch = NULL;
318 	}
319 	free(worker->task_base);
320 	spdk_thread_exit(spdk_get_thread());
321 	pthread_mutex_lock(&g_workers_lock);
322 	assert(g_num_workers >= 1);
323 	if (--g_num_workers == 0) {
324 		pthread_mutex_unlock(&g_workers_lock);
325 		/* Only dump results on successful runs */
326 		if (g_rc == 0) {
327 			g_rc = dump_result();
328 		}
329 		spdk_app_stop(g_rc);
330 	} else {
331 		pthread_mutex_unlock(&g_workers_lock);
332 	}
333 }
334 
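/* Split a buffer of sz bytes into iovcnt roughly equal iovec elements; the last
 * element absorbs any shortfall, e.g. sz=10, iovcnt=3 yields lengths 4, 4, 2.
 */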
335 static void
336 accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
337 {
338 	uint64_t ele_size;
339 	uint8_t *data;
340 	uint32_t i;
341 
342 	ele_size = spdk_divide_round_up(sz, iovcnt);
343 
344 	data = buf;
345 	for (i = 0; i < iovcnt; i++) {
346 		ele_size = spdk_min(ele_size, sz);
347 		assert(ele_size > 0);
348 
349 		iovs[i].iov_base = data;
350 		iovs[i].iov_len = ele_size;
351 
352 		data += ele_size;
353 		sz -= ele_size;
354 	}
355 	assert(sz == 0);
356 }
357 
358 static int
359 _get_task_data_bufs(struct ap_task *task)
360 {
361 	uint32_t align = 0;
362 	uint32_t i = 0;
363 	int src_buff_len = g_xfer_size_bytes;
364 	int dst_buff_len = g_xfer_size_bytes;
365 	int md_buff_len;
366 	struct spdk_dif_ctx_init_ext_opts dif_opts;
367 	uint32_t num_blocks, transfer_size_with_md;
368 	int rc;
369 
370 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
371 	 * we do this for all modules to keep it simple.
372 	 */
373 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) {
374 		align = ALIGN_4K;
375 	}
376 
377 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
378 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
379 		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
380 
381 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
382 			dst_buff_len = task->cur_seg->compressed_len_padded;
383 		}
384 
385 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
386 		if (task->dst == NULL) {
387 			fprintf(stderr, "Unable to alloc dst buffer\n");
388 			return -ENOMEM;
389 		}
390 
391 		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
392 		if (!task->dst_iovs) {
393 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
394 			return -ENOMEM;
395 		}
396 		task->dst_iovcnt = g_chained_count;
397 		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
398 
399 		return 0;
400 	}
401 
402 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) {
403 		task->dst_iovcnt = g_chained_count;
404 		task->dst_iovs = calloc(task->dst_iovcnt, sizeof(struct iovec));
405 		if (!task->dst_iovs) {
406 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
407 			return -ENOMEM;
408 		}
409 
410 		num_blocks = g_xfer_size_bytes / g_block_size_bytes;
411 		/* Add bytes for each block for metadata */
412 		transfer_size_with_md = g_xfer_size_bytes + (num_blocks * g_md_size_bytes);
413 		task->num_blocks = num_blocks;
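		/* With the defaults (4 KiB transfer, 512-byte blocks, 8 bytes of metadata)
		 * this yields 8 blocks and a 4160-byte buffer per iovec element.
		 */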
414 
415 		for (i = 0; i < task->dst_iovcnt; i++) {
416 			task->dst_iovs[i].iov_base = spdk_dma_zmalloc(transfer_size_with_md, 0, NULL);
417 			if (task->dst_iovs[i].iov_base == NULL) {
418 				return -ENOMEM;
419 			}
420 			task->dst_iovs[i].iov_len = transfer_size_with_md;
421 		}
422 
423 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
424 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
425 
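		/* md_interleave=true: the block size passed is the extended size (data
		 * plus metadata) and metadata is interleaved with the data. Type 1
		 * protection with guard, app tag and ref tag checks; the tag values
		 * (0x123, 0xFFFF, 0x234) are arbitrary test constants.
		 */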
426 		rc = spdk_dif_ctx_init(&task->dif_ctx,
427 				       g_block_size_bytes + g_md_size_bytes,
428 				       g_md_size_bytes, true, true,
429 				       SPDK_DIF_TYPE1,
430 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
431 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
432 		if (rc != 0) {
433 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
434 			return rc;
435 		}
436 	}
437 
438 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
439 		/* Allocate source buffers */
440 		task->src_iovcnt = g_chained_count;
441 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
442 		if (!task->src_iovs) {
443 			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
444 			return -ENOMEM;
445 		}
446 
447 		num_blocks = g_xfer_size_bytes / g_block_size_bytes;
448 		/* Add bytes for each block for metadata */
449 		transfer_size_with_md = g_xfer_size_bytes + (num_blocks * g_md_size_bytes);
450 		task->num_blocks = num_blocks;
451 
452 		for (i = 0; i < task->src_iovcnt; i++) {
453 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(transfer_size_with_md, 0, NULL);
454 			if (task->src_iovs[i].iov_base == NULL) {
455 				return -ENOMEM;
456 			}
457 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, transfer_size_with_md);
458 			task->src_iovs[i].iov_len = transfer_size_with_md;
459 		}
460 
461 		/* Allocate destination buffers */
462 		task->dst_iovcnt = g_chained_count;
463 		task->dst_iovs = calloc(task->dst_iovcnt, sizeof(struct iovec));
464 		if (!task->dst_iovs) {
465 			fprintf(stderr, "cannot allocated task->dst_iovs fot task=%p\n", task);
466 			return -ENOMEM;
467 		}
468 
469 		for (i = 0; i < task->dst_iovcnt; i++) {
470 			task->dst_iovs[i].iov_base = spdk_dma_zmalloc(dst_buff_len, 0, NULL);
471 			if (task->dst_iovs[i].iov_base == NULL) {
472 				return -ENOMEM;
473 			}
474 			task->dst_iovs[i].iov_len = dst_buff_len;
475 		}
476 
477 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
478 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
479 
480 		/* Init DIF ctx */
481 		rc = spdk_dif_ctx_init(&task->dif_ctx,
482 				       g_block_size_bytes + g_md_size_bytes,
483 				       g_md_size_bytes, true, true,
484 				       SPDK_DIF_TYPE1,
485 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
486 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
487 		if (rc != 0) {
488 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
489 			return rc;
490 		}
491 
492 		rc = spdk_dif_generate(task->src_iovs, task->src_iovcnt, task->num_blocks, &task->dif_ctx);
493 		if (rc != 0) {
494 			fprintf(stderr, "Generation of DIF failed, error (%d)\n", rc);
495 			return rc;
496 		}
497 	}
498 
499 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
500 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
501 		task->crc_dst = spdk_dma_zmalloc(sizeof(*task->crc_dst), 0, NULL);
		if (task->crc_dst == NULL) {
			fprintf(stderr, "Unable to alloc crc_dst buffer\n");
			return -ENOMEM;
		}
502 	}
503 
504 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
505 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
506 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
507 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
508 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY ||
509 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
510 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE) {
511 		assert(g_chained_count > 0);
512 		task->src_iovcnt = g_chained_count;
513 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
514 		if (!task->src_iovs) {
515 			fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task);
516 			return -ENOMEM;
517 		}
518 
519 		if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
520 			dst_buff_len = g_xfer_size_bytes * g_chained_count;
521 		}
522 
523 		if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
524 		    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) {
525 			src_buff_len += (g_xfer_size_bytes / g_block_size_bytes) * g_md_size_bytes;
526 		}
527 
528 		for (i = 0; i < task->src_iovcnt; i++) {
529 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(src_buff_len, 0, NULL);
530 			if (task->src_iovs[i].iov_base == NULL) {
531 				return -ENOMEM;
532 			}
533 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, src_buff_len);
534 			task->src_iovs[i].iov_len = src_buff_len;
535 		}
536 		if (g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
537 		    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
538 			md_buff_len = (g_xfer_size_bytes / g_block_size_bytes) * g_md_size_bytes *
539 				      g_chained_count;
540 			task->md_iov.iov_base = spdk_dma_zmalloc(md_buff_len, 0, NULL);
541 			if (task->md_iov.iov_base == NULL) {
542 				return -ENOMEM;
543 			}
544 			task->md_iov.iov_len = md_buff_len;
545 		}
546 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
547 		assert(g_xor_src_count > 1);
548 		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
549 		if (!task->sources) {
550 			return -ENOMEM;
551 		}
552 
553 		for (i = 0; i < g_xor_src_count; i++) {
554 			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
555 			if (!task->sources[i]) {
556 				return -ENOMEM;
557 			}
558 			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
559 		}
560 	} else {
561 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
562 		if (task->src == NULL) {
563 			fprintf(stderr, "Unable to alloc src buffer\n");
564 			return -ENOMEM;
565 		}
566 
567 		/* For fill, pre-set the entire src buffer to the fill pattern so it can serve as the reference when verify is enabled. */
568 		if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
569 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
570 		} else {
571 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
572 		}
573 	}
574 
575 	if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C &&
576 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_VERIFY &&
577 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE &&
578 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE_COPY &&
579 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_VERIFY_COPY &&
580 	    g_workload_selection != SPDK_ACCEL_OPC_DIX_VERIFY &&
581 	    g_workload_selection != SPDK_ACCEL_OPC_DIX_GENERATE) {
582 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
583 		if (task->dst == NULL) {
584 			fprintf(stderr, "Unable to alloc dst buffer\n");
585 			return -ENOMEM;
586 		}
587 
588 		/* For compare we want the buffers to match, otherwise not. */
589 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) {
590 			memset(task->dst, DATA_PATTERN, dst_buff_len);
591 		} else {
592 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
593 		}
594 	}
595 
596 	/* Dualcast needs two destination buffers; XOR with verify also uses dst2 as scratch. */
597 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST ||
598 	    (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) {
599 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
600 		if (task->dst2 == NULL) {
601 			fprintf(stderr, "Unable to alloc dst buffer\n");
602 			return -ENOMEM;
603 		}
604 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
605 	}
606 
607 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
608 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
609 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
610 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
611 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
612 
613 		task->num_blocks = (g_xfer_size_bytes * g_chained_count) / g_block_size_bytes;
614 
615 		rc = spdk_dif_ctx_init(&task->dif_ctx,
616 				       g_block_size_bytes + g_md_size_bytes,
617 				       g_md_size_bytes, true, true,
618 				       SPDK_DIF_TYPE1,
619 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
620 				       16, 0xFFFF, 10, 0, 0, &dif_opts);
621 		if (rc != 0) {
622 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
623 			return rc;
624 		}
625 
626 		if ((g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) ||
627 		    (g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY)) {
628 			rc = spdk_dif_generate(task->src_iovs, task->src_iovcnt, task->num_blocks, &task->dif_ctx);
629 			if (rc != 0) {
630 				fprintf(stderr, "Generation of DIF failed, error (%d)\n", rc);
631 				return rc;
632 			}
633 		}
634 	}
635 	if (g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE ||
636 	    g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
637 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
638 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
639 
640 		task->num_blocks = (g_xfer_size_bytes * g_chained_count) / g_block_size_bytes;
641 
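	/* For DIX the metadata lives in a separate buffer (task->md_iov), so
	 * md_interleave is false and the block size excludes metadata.
	 */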
642 		rc = spdk_dif_ctx_init(&task->dif_ctx,
643 				       g_block_size_bytes,
644 				       g_md_size_bytes, false, true,
645 				       SPDK_DIF_TYPE1,
646 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK |
647 				       SPDK_DIF_FLAGS_REFTAG_CHECK,
648 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
649 		if (rc != 0) {
650 			fprintf(stderr, "Initialization of DIX context failed, error (%d)\n", rc);
651 			return rc;
652 		}
653 		if (g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY) {
654 			rc = spdk_dix_generate(task->src_iovs, task->src_iovcnt, &task->md_iov,
655 					       task->num_blocks, &task->dif_ctx);
656 			if (rc != 0) {
657 				fprintf(stderr, "Generation of DIX failed, error (%d)\n", rc);
658 				return rc;
659 			}
660 		}
661 
662 	}
663 
664 	return 0;
665 }
666 
667 inline static struct ap_task *
668 _get_task(struct worker_thread *worker)
669 {
670 	struct ap_task *task;
671 
672 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
673 		task = TAILQ_FIRST(&worker->tasks_pool);
674 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
675 	} else {
676 		fprintf(stderr, "Unable to get ap_task\n");
677 		return NULL;
678 	}
679 
680 	return task;
681 }
682 
683 /* Submit one operation using the same ap task that just completed. */
684 static void
685 _submit_single(struct worker_thread *worker, struct ap_task *task)
686 {
687 	int random_num;
688 	int rc = 0;
689 
690 	assert(worker);
691 
692 	switch (worker->workload) {
693 	case SPDK_ACCEL_OPC_COPY:
694 		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
695 					    g_xfer_size_bytes, accel_done, task);
696 		break;
697 	case SPDK_ACCEL_OPC_FILL:
698 		/* For fill, use the first byte of the task->src buffer as the fill value */
699 		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
700 					    g_xfer_size_bytes, accel_done, task);
701 		break;
702 	case SPDK_ACCEL_OPC_CRC32C:
703 		rc = spdk_accel_submit_crc32cv(worker->ch, task->crc_dst,
704 					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
705 					       accel_done, task);
706 		break;
707 	case SPDK_ACCEL_OPC_COPY_CRC32C:
708 		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
709 						    task->crc_dst, g_crc32c_seed, accel_done, task);
710 		break;
711 	case SPDK_ACCEL_OPC_COMPARE:
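		/* Inject an expected miscompare in roughly g_fail_percent_goal percent of
		 * operations by flipping the first destination byte. */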
712 		random_num = rand() % 100;
713 		if (random_num < g_fail_percent_goal) {
714 			task->expected_status = -EILSEQ;
715 			*(uint8_t *)task->dst = ~DATA_PATTERN;
716 		} else {
717 			task->expected_status = 0;
718 			*(uint8_t *)task->dst = DATA_PATTERN;
719 		}
720 		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
721 					       g_xfer_size_bytes, accel_done, task);
722 		break;
723 	case SPDK_ACCEL_OPC_DUALCAST:
724 		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
725 						task->src, g_xfer_size_bytes, accel_done, task);
726 		break;
727 	case SPDK_ACCEL_OPC_COMPRESS:
728 		task->src_iovs = task->cur_seg->uncompressed_iovs;
729 		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
730 		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
731 						task->src_iovs,
732 						task->src_iovcnt, &task->compressed_sz, accel_done, task);
733 		break;
734 	case SPDK_ACCEL_OPC_DECOMPRESS:
735 		task->src_iovs = task->cur_seg->compressed_iovs;
736 		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
737 		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
738 						  task->src_iovcnt, NULL, accel_done, task);
739 		break;
740 	case SPDK_ACCEL_OPC_XOR:
741 		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
742 					   g_xfer_size_bytes, accel_done, task);
743 		break;
744 	case SPDK_ACCEL_OPC_DIF_VERIFY:
745 		rc = spdk_accel_submit_dif_verify(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks,
746 						  &task->dif_ctx, &task->dif_err, accel_done, task);
747 		break;
748 	case SPDK_ACCEL_OPC_DIF_GENERATE:
749 		rc = spdk_accel_submit_dif_generate(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks,
750 						    &task->dif_ctx, accel_done, task);
751 		break;
752 	case SPDK_ACCEL_OPC_DIF_GENERATE_COPY:
753 		rc = spdk_accel_submit_dif_generate_copy(worker->ch, task->dst_iovs, task->dst_iovcnt,
754 				task->src_iovs, task->src_iovcnt,
755 				task->num_blocks, &task->dif_ctx, accel_done, task);
756 		break;
757 	case SPDK_ACCEL_OPC_DIF_VERIFY_COPY:
758 		rc = spdk_accel_submit_dif_verify_copy(worker->ch, task->dst_iovs, task->dst_iovcnt,
759 						       task->src_iovs, task->src_iovcnt, task->num_blocks,
760 						       &task->dif_ctx, &task->dif_err, accel_done, task);
761 		break;
762 	case SPDK_ACCEL_OPC_DIX_GENERATE:
763 		rc = spdk_accel_submit_dix_generate(worker->ch, task->src_iovs, task->src_iovcnt,
764 						    &task->md_iov, task->num_blocks,
765 						    &task->dif_ctx, accel_done, task);
766 		break;
767 	case SPDK_ACCEL_OPC_DIX_VERIFY:
768 		rc = spdk_accel_submit_dix_verify(worker->ch, task->src_iovs, task->src_iovcnt,
769 						  &task->md_iov, task->num_blocks,
770 						  &task->dif_ctx, &task->dif_err, accel_done, task);
771 		break;
772 	default:
773 		assert(false);
774 		break;
775 
776 	}
777 
778 	worker->current_queue_depth++;
779 	if (rc) {
780 		accel_done(task, rc);
781 	}
782 }
783 
784 static void
785 _free_task_buffers(struct ap_task *task)
786 {
787 	uint32_t i;
788 
789 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS ||
790 	    g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
791 		free(task->dst_iovs);
792 	} else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
793 		   g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
794 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
795 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
796 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY ||
797 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY ||
798 		   g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
799 		   g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE) {
800 		if (task->crc_dst) {
801 			spdk_dma_free(task->crc_dst);
802 		}
803 		if (task->src_iovs) {
804 			for (i = 0; i < task->src_iovcnt; i++) {
805 				if (task->src_iovs[i].iov_base) {
806 					spdk_dma_free(task->src_iovs[i].iov_base);
807 				}
808 			}
809 			free(task->src_iovs);
810 		}
811 		if (task->dst_iovs) {
812 			for (i = 0; i < task->dst_iovcnt; i++) {
813 				if (task->dst_iovs[i].iov_base) {
814 					spdk_dma_free(task->dst_iovs[i].iov_base);
815 				}
816 			}
817 			free(task->dst_iovs);
818 		}
819 		if (task->md_iov.iov_base) {
820 			spdk_dma_free(task->md_iov.iov_base);
821 		}
822 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
823 		if (task->sources) {
824 			for (i = 0; i < g_xor_src_count; i++) {
825 				spdk_dma_free(task->sources[i]);
826 			}
827 			free(task->sources);
828 		}
829 	} else {
830 		spdk_dma_free(task->src);
831 	}
832 
833 	spdk_dma_free(task->dst);
834 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) {
835 		spdk_dma_free(task->dst2);
836 	}
837 }
838 
839 static int
840 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
841 {
842 	uint32_t i;
843 	uint32_t ttl_len = 0;
844 	uint8_t *dst = (uint8_t *)_dst;
845 
846 	for (i = 0; i < iovcnt; i++) {
847 		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
848 			return -1;
849 		}
850 		dst += src_src_iovs[i].iov_len;
851 		ttl_len += src_src_iovs[i].iov_len;
852 	}
853 
854 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
855 		return -1;
856 	}
857 
858 	return 0;
859 }
860 
861 static int _worker_stop(void *arg);
862 
863 static void
864 accel_done(void *arg1, int status)
865 {
866 	struct ap_task *task = arg1;
867 	struct worker_thread *worker = task->worker;
868 	uint32_t sw_crc32c;
869 	struct spdk_dif_error err_blk;
870 
871 	assert(worker);
872 	assert(worker->current_queue_depth > 0);
873 
874 	if (g_verify && status == 0) {
875 		switch (worker->workload) {
876 		case SPDK_ACCEL_OPC_COPY_CRC32C:
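			/* Recompute the CRC in software for comparison (note the bit-inverted
			 * seed passed to spdk_crc32c_iov_update(), matching that helper's seed
			 * convention), then verify the copied data as well. */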
877 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
878 			if (*task->crc_dst != sw_crc32c) {
879 				SPDK_NOTICELOG("CRC-32C miscompare\n");
880 				worker->xfer_failed++;
881 			}
882 			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
883 				SPDK_NOTICELOG("Data miscompare\n");
884 				worker->xfer_failed++;
885 			}
886 			break;
887 		case SPDK_ACCEL_OPC_CRC32C:
888 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
889 			if (*task->crc_dst != sw_crc32c) {
890 				SPDK_NOTICELOG("CRC-32C miscompare\n");
891 				worker->xfer_failed++;
892 			}
893 			break;
894 		case SPDK_ACCEL_OPC_COPY:
895 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
896 				SPDK_NOTICELOG("Data miscompare\n");
897 				worker->xfer_failed++;
898 			}
899 			break;
900 		case SPDK_ACCEL_OPC_DUALCAST:
901 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
902 				SPDK_NOTICELOG("Data miscompare, first destination\n");
903 				worker->xfer_failed++;
904 			}
905 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
906 				SPDK_NOTICELOG("Data miscompare, second destination\n");
907 				worker->xfer_failed++;
908 			}
909 			break;
910 		case SPDK_ACCEL_OPC_FILL:
911 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
912 				SPDK_NOTICELOG("Data miscompare\n");
913 				worker->xfer_failed++;
914 			}
915 			break;
916 		case SPDK_ACCEL_OPC_COMPARE:
917 			break;
918 		case SPDK_ACCEL_OPC_COMPRESS:
919 			break;
920 		case SPDK_ACCEL_OPC_DECOMPRESS:
921 			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
922 				SPDK_NOTICELOG("Data miscompare on decompression\n");
923 				worker->xfer_failed++;
924 			}
925 			break;
926 		case SPDK_ACCEL_OPC_XOR:
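			/* Recompute the XOR in software into the spare dst2 buffer and compare
			 * it with the accel-generated result. */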
927 			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
928 					 g_xfer_size_bytes) != 0) {
929 				SPDK_ERRLOG("Failed to generate xor for verification\n");
930 			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
931 				SPDK_NOTICELOG("Data miscompare\n");
932 				worker->xfer_failed++;
933 			}
934 			break;
935 		case SPDK_ACCEL_OPC_DIF_VERIFY:
936 			break;
937 		case SPDK_ACCEL_OPC_DIF_GENERATE:
938 			if (spdk_dif_verify(task->src_iovs, task->src_iovcnt, task->num_blocks,
939 					    &task->dif_ctx, &err_blk) != 0) {
940 				SPDK_NOTICELOG("Data miscompare, "
941 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
942 					       err_blk.err_type, err_blk.expected,
943 					       err_blk.actual, err_blk.err_offset);
944 				worker->xfer_failed++;
945 			}
946 			break;
947 		case SPDK_ACCEL_OPC_DIF_GENERATE_COPY:
948 			if (spdk_dif_verify(task->dst_iovs, task->dst_iovcnt, task->num_blocks,
949 					    &task->dif_ctx, &err_blk) != 0) {
950 				SPDK_NOTICELOG("Data miscompare, "
951 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
952 					       err_blk.err_type, err_blk.expected,
953 					       err_blk.actual, err_blk.err_offset);
954 				worker->xfer_failed++;
955 			}
956 			break;
957 		case SPDK_ACCEL_OPC_DIF_VERIFY_COPY:
958 			break;
959 		case SPDK_ACCEL_OPC_DIX_GENERATE:
960 			if (spdk_dix_verify(task->src_iovs, task->src_iovcnt, &task->md_iov,
961 					    task->num_blocks, &task->dif_ctx, &err_blk) != 0) {
962 				SPDK_NOTICELOG("Data miscompare, "
963 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
964 					       err_blk.err_type, err_blk.expected,
965 					       err_blk.actual, err_blk.err_offset);
966 				worker->xfer_failed++;
967 			}
968 			break;
969 		case SPDK_ACCEL_OPC_DIX_VERIFY:
970 			break;
971 		default:
972 			assert(false);
973 			break;
974 		}
975 	}
976 
977 	if (worker->workload == SPDK_ACCEL_OPC_COMPRESS ||
978 	    worker->workload == SPDK_ACCEL_OPC_DECOMPRESS) {
979 		/* Advance the task to the next segment */
980 		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
981 		if (task->cur_seg == NULL) {
982 			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
983 		}
984 	}
985 
986 	if (task->expected_status == -EILSEQ) {
987 		assert(status != 0);
988 		worker->injected_miscompares++;
989 		status = 0;
990 	} else if (status) {
991 		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
992 		worker->xfer_failed++;
993 	}
994 
995 	worker->current_queue_depth--;
996 
997 	if (!worker->is_draining && status == 0) {
998 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
999 		task = _get_task(worker);
1000 		_submit_single(worker, task);
1001 	} else {
1002 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
1003 	}
1004 }
1005 
1006 static int
1007 dump_result(void)
1008 {
1009 	uint64_t total_completed = 0;
1010 	uint64_t total_failed = 0;
1011 	uint64_t total_miscompared = 0;
1012 	uint64_t total_xfer_per_sec, total_bw_in_MiBps = 0;
1013 	struct worker_thread *worker = g_workers;
1014 	char tmp[64];
1015 
1016 	printf("\n%-12s %20s %16s %16s %16s\n",
1017 	       "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares");
1018 	printf("------------------------------------------------------------------------------------\n");
1019 	while (worker != NULL) {
1020 
1021 		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
1022 		uint64_t bw_in_MiBps = worker->stats.num_bytes /
1023 				       (g_time_in_sec * 1024 * 1024);
1024 
1025 		total_completed += worker->stats.executed;
1026 		total_failed += worker->xfer_failed;
1027 		total_miscompared += worker->injected_miscompares;
1028 		total_bw_in_MiBps += bw_in_MiBps;
1029 
1030 		snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread);
1031 		if (xfer_per_sec) {
1032 			printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
1033 			       tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed,
1034 			       worker->injected_miscompares);
1035 		}
1036 
1037 		worker = worker->next;
1038 	}
1039 
1040 	total_xfer_per_sec = total_completed / g_time_in_sec;
1041 
1042 	printf("====================================================================================\n");
1043 	printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
1044 	       "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
1045 
1046 	return total_failed ? 1 : 0;
1047 }
1048 
1049 static inline void
1050 _free_task_buffers_in_pool(struct worker_thread *worker)
1051 {
1052 	struct ap_task *task;
1053 
1054 	assert(worker);
1055 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
1056 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
1057 		_free_task_buffers(task);
1058 	}
1059 }
1060 
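/* Poller that waits for the worker's outstanding operations to complete before
 * freeing its task buffers and unregistering it.
 */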
1061 static int
1062 _check_draining(void *arg)
1063 {
1064 	struct worker_thread *worker = arg;
1065 
1066 	assert(worker);
1067 
1068 	if (worker->current_queue_depth == 0) {
1069 		_free_task_buffers_in_pool(worker);
1070 		spdk_poller_unregister(&worker->is_draining_poller);
1071 		unregister_worker(worker);
1072 	}
1073 
1074 	return SPDK_POLLER_BUSY;
1075 }
1076 
1077 static int
1078 _worker_stop(void *arg)
1079 {
1080 	struct worker_thread *worker = arg;
1081 
1082 	assert(worker);
1083 
1084 	spdk_poller_unregister(&worker->stop_poller);
1085 
1086 	/* now let the worker drain and check its outstanding I/O with a poller */
1087 	worker->is_draining = true;
1088 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
1089 
1090 	return SPDK_POLLER_BUSY;
1091 }
1092 
1093 static void shutdown_cb(void);
1094 
1095 static void
1096 _init_thread(void *arg1)
1097 {
1098 	struct worker_thread *worker;
1099 	struct ap_task *task;
1100 	int i, num_tasks = g_allocate_depth;
1101 	struct display_info *display = arg1;
1102 
1103 	worker = calloc(1, sizeof(*worker));
1104 	if (worker == NULL) {
1105 		fprintf(stderr, "Unable to allocate worker\n");
1106 		free(display);
1107 		spdk_thread_exit(spdk_get_thread());
1108 		goto no_worker;
1109 	}
1110 
1111 	worker->workload = g_workload_selection;
1112 	worker->display.core = display->core;
1113 	worker->display.thread = display->thread;
1114 	free(display);
1115 	worker->core = spdk_env_get_current_core();
1116 	worker->thread = spdk_get_thread();
1117 	pthread_mutex_lock(&g_workers_lock);
1118 	g_num_workers++;
1119 	worker->next = g_workers;
1120 	g_workers = worker;
1121 	pthread_mutex_unlock(&g_workers_lock);
1122 	worker->ch = spdk_accel_get_io_channel();
1123 	if (worker->ch == NULL) {
1124 		fprintf(stderr, "Unable to get an accel channel\n");
1125 		goto error;
1126 	}
1127 
1128 	TAILQ_INIT(&worker->tasks_pool);
1129 
1130 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
1131 	if (worker->task_base == NULL) {
1132 		fprintf(stderr, "Could not allocate task base.\n");
1133 		goto error;
1134 	}
1135 
1136 	task = worker->task_base;
1137 	for (i = 0; i < num_tasks; i++) {
1138 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
1139 		task->worker = worker;
1140 		if (_get_task_data_bufs(task)) {
1141 			fprintf(stderr, "Unable to get data bufs\n");
1142 			goto error;
1143 		}
1144 		task++;
1145 	}
1146 
1147 	/* Register a poller that will stop the worker once the run time has elapsed */
1148 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
1149 			      g_time_in_sec * 1000000ULL);
1150 
1151 	/* Load up queue depth worth of operations. */
1152 	for (i = 0; i < g_queue_depth; i++) {
1153 		task = _get_task(worker);
1154 		if (task == NULL) {
1155 			goto error;
1156 		}
1157 
1158 		_submit_single(worker, task);
1159 	}
1160 	return;
1161 error:
1162 
1163 	_free_task_buffers_in_pool(worker);
1164 	free(worker->task_base);
1165 no_worker:
1166 	shutdown_cb();
1167 	g_rc = -1;
1168 }
1169 
1170 static void
1171 accel_perf_start(void *arg1)
1172 {
1173 	struct spdk_cpuset tmp_cpumask = {};
1174 	char thread_name[32];
1175 	uint32_t i;
1176 	int j;
1177 	struct spdk_thread *thread;
1178 	struct display_info *display;
1179 
1180 	g_tsc_rate = spdk_get_ticks_hz();
1181 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
1182 
1183 	dump_user_config();
1184 
1185 	printf("Running for %d seconds...\n", g_time_in_sec);
1186 	fflush(stdout);
1187 
1188 	/* Create worker threads for each core that was specified. */
1189 	SPDK_ENV_FOREACH_CORE(i) {
1190 		for (j = 0; j < g_threads_per_core; j++) {
1191 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
1192 			spdk_cpuset_zero(&tmp_cpumask);
1193 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
1194 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			if (thread == NULL) {
				fprintf(stderr, "Unable to create SPDK thread\n");
				spdk_app_stop(-1);
				return;
			}
1195 			display = calloc(1, sizeof(*display));
1196 			if (display == NULL) {
1197 				fprintf(stderr, "Unable to allocate memory\n");
1198 				spdk_app_stop(-1);
1199 				return;
1200 			}
1201 			display->core = i;
1202 			display->thread = j;
1203 			spdk_thread_send_msg(thread, _init_thread, display);
1204 		}
1205 	}
1206 }
1207 
1208 static void
1209 accel_perf_free_compress_segs(void)
1210 {
1211 	struct ap_compress_seg *seg, *tmp;
1212 
1213 	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
1214 		free(seg->uncompressed_iovs);
1215 		free(seg->compressed_iovs);
1216 		spdk_dma_free(seg->compressed_data);
1217 		spdk_dma_free(seg->uncompressed_data);
1218 		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
1219 		free(seg);
1220 	}
1221 }
1222 
1223 struct accel_perf_prep_ctx {
1224 	FILE			*file;
1225 	long			remaining;
1226 	struct spdk_io_channel	*ch;
1227 	struct ap_compress_seg	*cur_seg;
1228 };
1229 
1230 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);
1231 
1232 static void
1233 accel_perf_prep_process_seg_cpl(void *ref, int status)
1234 {
1235 	struct accel_perf_prep_ctx *ctx = ref;
1236 	struct ap_compress_seg *seg;
1237 
1238 	if (status != 0) {
1239 		fprintf(stderr, "error (%d) on initial compress completion\n", status);
1240 		spdk_dma_free(ctx->cur_seg->compressed_data);
1241 		spdk_dma_free(ctx->cur_seg->uncompressed_data);
1242 		free(ctx->cur_seg);
1243 		spdk_put_io_channel(ctx->ch);
1244 		fclose(ctx->file);
1245 		free(ctx);
1246 		spdk_app_stop(-status);
1247 		return;
1248 	}
1249 
1250 	seg = ctx->cur_seg;
1251 
1252 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
1253 		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1254 		if (seg->compressed_iovs == NULL) {
1255 			fprintf(stderr, "unable to allocate iovec\n");
1256 			spdk_dma_free(seg->compressed_data);
1257 			spdk_dma_free(seg->uncompressed_data);
1258 			free(seg);
1259 			spdk_put_io_channel(ctx->ch);
1260 			fclose(ctx->file);
1261 			free(ctx);
1262 			spdk_app_stop(-ENOMEM);
1263 			return;
1264 		}
1265 		seg->compressed_iovcnt = g_chained_count;
1266 
1267 		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
1268 					  seg->compressed_iovcnt);
1269 	}
1270 
1271 	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
1272 	ctx->remaining -= seg->uncompressed_len;
1273 
1274 	accel_perf_prep_process_seg(ctx);
1275 }
1276 
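/* Read the next chunk of the input file, submit a single compress operation for
 * it, and continue from the completion callback until the whole file has been
 * segmented; then kick off the benchmark via accel_perf_start().
 */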
1277 static void
1278 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
1279 {
1280 	struct ap_compress_seg *seg;
1281 	int sz, sz_read, sz_padded;
1282 	void *ubuf, *cbuf;
1283 	struct iovec iov[1];
1284 	int rc;
1285 
1286 	if (ctx->remaining == 0) {
1287 		spdk_put_io_channel(ctx->ch);
1288 		fclose(ctx->file);
1289 		free(ctx);
1290 		accel_perf_start(NULL);
1291 		return;
1292 	}
1293 
1294 	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
1295 	/* Add a 10% pad to the compress buffer to accommodate incompressible data.
1296 	 * Note that a real app would likely either handle a too-small destination
1297 	 * buffer by resubmitting the operation with a larger one, or, like the vbdev
1298 	 * module does, accept the error and store the data uncompressed, marking it
1299 	 * as such in its own metadata so that it doesn't later try to decompress
1300 	 * uncompressed data.
1301 	 */
1302 	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;
1303 
1304 	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
1305 	if (!ubuf) {
1306 		fprintf(stderr, "unable to allocate uncompress buffer\n");
1307 		rc = -ENOMEM;
1308 		goto error;
1309 	}
1310 
1311 	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
1312 	if (!cbuf) {
1313 		fprintf(stderr, "unable to allocate compress buffer\n");
1314 		rc = -ENOMEM;
1315 		spdk_dma_free(ubuf);
1316 		goto error;
1317 	}
1318 
1319 	seg = calloc(1, sizeof(*seg));
1320 	if (!seg) {
1321 		fprintf(stderr, "unable to allocate comp/decomp segment\n");
1322 		spdk_dma_free(ubuf);
1323 		spdk_dma_free(cbuf);
1324 		rc = -ENOMEM;
1325 		goto error;
1326 	}
1327 
1328 	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
1329 	if (sz_read != sz) {
1330 		fprintf(stderr, "unable to read input file\n");
1331 		free(seg);
1332 		spdk_dma_free(ubuf);
1333 		spdk_dma_free(cbuf);
		/* fread() does not reliably set errno on a short read (e.g. at EOF),
		 * so report a generic I/O error. */
1334 		rc = -EIO;
1335 		goto error;
1336 	}
1337 
1338 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
1339 		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1340 		if (seg->uncompressed_iovs == NULL) {
1341 			fprintf(stderr, "unable to allocate iovec\n");
1342 			free(seg);
1343 			spdk_dma_free(ubuf);
1344 			spdk_dma_free(cbuf);
1345 			rc = -ENOMEM;
1346 			goto error;
1347 		}
1348 		seg->uncompressed_iovcnt = g_chained_count;
1349 		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
1350 	}
1351 
1352 	seg->uncompressed_data = ubuf;
1353 	seg->uncompressed_len = sz;
1354 	seg->compressed_data = cbuf;
1355 	seg->compressed_len = sz;
1356 	seg->compressed_len_padded = sz_padded;
1357 
1358 	ctx->cur_seg = seg;
1359 	iov[0].iov_base = seg->uncompressed_data;
1360 	iov[0].iov_len = seg->uncompressed_len;
1361 	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
1362 	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
1363 	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
1364 	 * but real applications may want to consider a more sophisticated method.
1365 	 */
1366 	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
1367 					&seg->compressed_len, accel_perf_prep_process_seg_cpl, ctx);
1368 	if (rc < 0) {
1369 		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
1370 		goto error;
1371 	}
1372 
1373 	return;
1374 
1375 error:
1376 	spdk_put_io_channel(ctx->ch);
1377 	fclose(ctx->file);
1378 	free(ctx);
1379 	spdk_app_stop(rc);
1380 }
1381 
1382 static void
1383 accel_perf_prep(void *arg1)
1384 {
1385 	struct accel_perf_prep_ctx *ctx;
1386 	const char *module_name = NULL;
1387 	int rc = 0;
1388 
1389 	if (g_module_name) {
1390 		rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
1391 		if (rc != 0 || strcmp(g_module_name, module_name) != 0) {
1392 			fprintf(stderr, "Module '%s' was assigned via JSON config or RPC, instead of '%s'\n",
1393 				module_name ? module_name : "unknown", g_module_name);
1394 			fprintf(stderr, "-M option is not compatible with accel_assign_opc RPC\n");
1395 			rc = -EINVAL;
1396 			goto error_end;
1397 		}
1398 	}
1399 
1400 	if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS &&
1401 	    g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) {
1402 		accel_perf_start(arg1);
1403 		return;
1404 	}
1405 
1406 	if (g_cd_file_in_name == NULL) {
1407 		fprintf(stdout, "A filename is required.\n");
1408 		rc = -EINVAL;
1409 		goto error_end;
1410 	}
1411 
1412 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) {
1413 		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
1414 		rc = -ENOTSUP;
1415 		goto error_end;
1416 	}
1417 
1418 	printf("Preparing input file...\n");
1419 
1420 	ctx = calloc(1, sizeof(*ctx));
1421 	if (ctx == NULL) {
1422 		rc = -ENOMEM;
1423 		goto error_end;
1424 	}
1425 
1426 	ctx->file = fopen(g_cd_file_in_name, "r");
1427 	if (ctx->file == NULL) {
1428 		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
1429 		rc = -errno;
1430 		goto error_ctx;
1431 	}
1432 
1433 	fseek(ctx->file, 0L, SEEK_END);
1434 	ctx->remaining = ftell(ctx->file);
1435 	fseek(ctx->file, 0L, SEEK_SET);
1436 
1437 	ctx->ch = spdk_accel_get_io_channel();
1438 	if (ctx->ch == NULL) {
1439 		rc = -EAGAIN;
1440 		goto error_file;
1441 	}
1442 
1443 	if (g_xfer_size_bytes == 0) {
1444 		/* size of 0 means "file at a time" */
1445 		g_xfer_size_bytes = ctx->remaining;
1446 	}
1447 
1448 	accel_perf_prep_process_seg(ctx);
1449 	return;
1450 
1451 error_file:
1452 	fclose(ctx->file);
1453 error_ctx:
1454 	free(ctx);
1455 error_end:
1456 	spdk_app_stop(rc);
1457 }
1458 
1459 static void
1460 worker_shutdown(void *ctx)
1461 {
1462 	_worker_stop(ctx);
1463 }
1464 
1465 static void
1466 shutdown_cb(void)
1467 {
1468 	struct worker_thread *worker;
1469 
1470 	pthread_mutex_lock(&g_workers_lock);
1471 	if (!g_workers) {
1472 		spdk_app_stop(1);
1473 		goto unlock;
1474 	}
1475 
1476 	worker = g_workers;
1477 	while (worker) {
1478 		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
1479 		worker = worker->next;
1480 	}
1481 unlock:
1482 	pthread_mutex_unlock(&g_workers_lock);
1483 }
1484 
1485 int
1486 main(int argc, char **argv)
1487 {
1488 	struct worker_thread *worker, *tmp;
1489 	int rc;
1490 
1491 	pthread_mutex_init(&g_workers_lock, NULL);
1492 	spdk_app_opts_init(&g_opts, sizeof(g_opts));
1493 	g_opts.name = "accel_perf";
1494 	g_opts.reactor_mask = "0x1";
1495 	g_opts.shutdown_cb = shutdown_cb;
1496 	g_opts.rpc_addr = NULL;
1497 
1498 	rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:M:P:f:T:l:S:x:", NULL,
1499 				 parse_args, usage);
1500 	if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) {
1501 		return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1;
1502 	}
1503 
1504 	if (g_workload_selection == SPDK_ACCEL_OPC_LAST) {
1505 		fprintf(stderr, "Must provide a workload type\n");
1506 		usage();
1507 		return -1;
1508 	}
1509 
1510 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
1511 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
1512 		usage();
1513 		return -1;
1514 	}
1515 
1516 	if (g_allocate_depth == 0) {
1517 		g_allocate_depth = g_queue_depth;
1518 	}
1519 
1520 	if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
1521 	     g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
1522 	     g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
1523 	     g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
1524 	     g_workload_selection == SPDK_ACCEL_OPC_DIX_VERIFY ||
1525 	     g_workload_selection == SPDK_ACCEL_OPC_DIX_GENERATE) &&
1526 	    g_chained_count == 0) {
1527 		usage();
1528 		return -1;
1529 	}
1530 
1531 	if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) {
1532 		usage();
1533 		return -1;
1534 	}
1535 
1536 	if (g_module_name && spdk_accel_assign_opc(g_workload_selection, g_module_name)) {
1537 		fprintf(stderr, "Was not able to assign '%s' module to the workload\n", g_module_name);
1538 		usage();
1539 		return -1;
1540 	}
1541 
1542 	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
1543 	if (g_rc) {
1544 		SPDK_ERRLOG("ERROR starting application\n");
1545 	}
1546 
1547 	pthread_mutex_destroy(&g_workers_lock);
1548 
1549 	worker = g_workers;
1550 	while (worker) {
1551 		tmp = worker->next;
1552 		free(worker);
1553 		worker = tmp;
1554 	}
1555 	accel_perf_free_compress_segs();
1556 	spdk_app_fini();
1557 	return g_rc;
1558 }
1559