xref: /spdk/examples/accel/perf/accel_perf.c (revision 7920c6425d78e868c64b4f54562e15874d1785ed)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/thread.h"
8 #include "spdk/env.h"
9 #include "spdk/event.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/accel.h"
13 #include "spdk/crc32.h"
14 #include "spdk/util.h"
15 #include "spdk/xor.h"
16 
17 #define DATA_PATTERN 0x5a
18 #define ALIGN_4K 0x1000
19 #define COMP_BUF_PAD_PERCENTAGE 1.1L
20 
/* Runtime configuration and shared state.  Defaults below may be overridden
 * by the command-line options parsed in parse_args(). */
static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;				/* exit code produced by dump_result() */
static int g_xfer_size_bytes = 4096;		/* -o: per-buffer transfer size */
static int g_queue_depth = 32;			/* -q: outstanding operations per worker */
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;		/* -a */
static int g_threads_per_core = 1;		/* -T */
static int g_time_in_sec = 5;			/* -t: run duration */
static uint32_t g_crc32c_seed = 0;		/* -S: seed for crc32c workloads */
static uint32_t g_chained_count = 1;		/* -C: iovec entries per operation */
static int g_fail_percent_goal = 0;		/* -P: injected miscompare percentage */
static uint8_t g_fill_pattern = 255;		/* -f: byte used by the fill workload */
static uint32_t g_xor_src_count = 2;		/* -x: source buffers for xor */
static bool g_verify = false;			/* -y: verify results in software */
static const char *g_workload_type = NULL;	/* -w: workload name as given */
static enum spdk_accel_opcode g_workload_selection;	/* opcode parsed from -w */
static struct worker_thread *g_workers = NULL;	/* singly linked list of all workers */
static int g_num_workers = 0;			/* guarded by g_workers_lock */
static char *g_cd_file_in_name = NULL;		/* -l: input file for (de)compress */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};
45 
/* One segment of the input file used by the compress/decompress workloads:
 * the raw data read from disk plus its pre-compressed counterpart, each with
 * an optional iovec representation for chained (-C) operation. */
struct ap_compress_seg {
	void		*uncompressed_data;
	uint32_t	uncompressed_len;
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	void		*compressed_data;
	uint32_t	compressed_len;
	uint32_t	compressed_len_padded;	/* compressed_len plus pad for incompressible data */
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg)	link;	/* entry in the global g_compress_segs list */
};
60 
/* Global list of segments prepared before the measurement phase starts. */
static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

/* Identifies a worker for result reporting: physical core and thread index. */
struct display_info {
	int core;
	int thread;
};
70 
/* Per-operation context.  Only the buffers relevant to the selected workload
 * are allocated; see _get_task_data_bufs(). */
struct ap_task {
	void			*src;		/* single source buffer */
	struct iovec		*src_iovs;	/* chained sources (crc32c/copy_crc32c, (de)compress) */
	uint32_t		src_iovcnt;
	void			**sources;	/* xor source buffers */
	struct iovec		*dst_iovs;	/* chained destination (compress/decompress) */
	uint32_t		dst_iovcnt;
	void			*dst;		/* primary destination buffer */
	void			*dst2;		/* second destination (dualcast, xor verify) */
	uint32_t		crc_dst;	/* CRC-32C result written by the accel module */
	uint32_t		compressed_sz;	/* bytes produced by a compress operation */
	struct ap_compress_seg *cur_seg;	/* current file segment for (de)compress */
	struct worker_thread	*worker;	/* owning worker */
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;		/* entry in the worker's free-task pool */
};
87 
/* Per-SPDK-thread state.  Each thread created by accel_perf_start() runs one
 * worker that owns its own accel channel and task pool. */
struct worker_thread {
	struct spdk_io_channel		*ch;		/* accel framework channel */
	struct spdk_accel_opcode_stats	stats;		/* snapshot taken in unregister_worker() */
	uint64_t			xfer_failed;	/* failed or miscompared operations */
	uint64_t			injected_miscompares;	/* expected compare failures (-P) */
	uint64_t			current_queue_depth;	/* operations in flight */
	TAILQ_HEAD(, ap_task)		tasks_pool;	/* free tasks ready for submission */
	struct worker_thread		*next;		/* link in the global g_workers list */
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;	/* stop resubmitting; wait for in-flight ops */
	struct spdk_poller		*is_draining_poller;	/* polls for queue depth to reach 0 */
	struct spdk_poller		*stop_poller;	/* one-shot, fires after g_time_in_sec */
	void				*task_base;	/* backing allocation for all tasks */
	struct display_info		display;	/* core/thread ids for reporting */
	enum spdk_accel_opcode		workload;	/* copy of g_workload_selection */
};
105 
106 static void
107 dump_user_config(void)
108 {
109 	const char *module_name = NULL;
110 	int rc;
111 
112 	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
113 	if (rc) {
114 		printf("error getting module name (%d)\n", rc);
115 	}
116 
117 	printf("\nSPDK Configuration:\n");
118 	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
119 	printf("Accel Perf Configuration:\n");
120 	printf("Workload Type:  %s\n", g_workload_type);
121 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
122 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
123 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
124 	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
125 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
126 	} else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
127 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
128 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
129 		printf("Source buffers: %u\n", g_xor_src_count);
130 	}
131 	if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
132 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
133 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
134 	} else {
135 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
136 	}
137 	printf("vector count    %u\n", g_chained_count);
138 	printf("Module:         %s\n", module_name);
139 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
140 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
141 		printf("File Name:      %s\n", g_cd_file_in_name);
142 	}
143 	printf("Queue depth:    %u\n", g_queue_depth);
144 	printf("Allocate depth: %u\n", g_allocate_depth);
145 	printf("# threads/core: %u\n", g_threads_per_core);
146 	printf("Run time:       %u seconds\n", g_time_in_sec);
147 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
148 }
149 
/* Print command-line help.  Emitted as one printf of concatenated string
 * literals; the bytes written to stdout are identical to the original. */
static void
usage(void)
{
	printf("accel_perf options:\n"
	       "\t[-h help message]\n"
	       "\t[-q queue depth per core]\n"
	       "\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)\n"
	       "\t[-T number of threads per core\n"
	       "\t[-n number of channels]\n"
	       "\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n"
	       "\t[-t time in seconds]\n"
	       "\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor\n"
	       "\t[-l for compress/decompress workloads, name of uncompressed input file\n"
	       "\t[-S for crc32c workload, use this seed value (default 0)\n"
	       "\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"
	       "\t[-f for fill workload, use this BYTE value (default 255)\n"
	       "\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n"
	       "\t[-y verify result if this switch is on]\n"
	       "\t[-a tasks to allocate per core (default: same value as -q)]\n"
	       "\t\tCan be used to spread operations across a wider range of memory.\n");
}
171 
172 static int
173 parse_args(int ch, char *arg)
174 {
175 	int argval = 0;
176 
177 	switch (ch) {
178 	case 'a':
179 	case 'C':
180 	case 'f':
181 	case 'T':
182 	case 'o':
183 	case 'P':
184 	case 'q':
185 	case 'S':
186 	case 't':
187 	case 'x':
188 		argval = spdk_strtol(optarg, 10);
189 		if (argval < 0) {
190 			fprintf(stderr, "-%c option must be non-negative.\n", ch);
191 			usage();
192 			return 1;
193 		}
194 		break;
195 	default:
196 		break;
197 	};
198 
199 	switch (ch) {
200 	case 'a':
201 		g_allocate_depth = argval;
202 		break;
203 	case 'C':
204 		g_chained_count = argval;
205 		break;
206 	case 'l':
207 		g_cd_file_in_name = optarg;
208 		break;
209 	case 'f':
210 		g_fill_pattern = (uint8_t)argval;
211 		break;
212 	case 'T':
213 		g_threads_per_core = argval;
214 		break;
215 	case 'o':
216 		g_xfer_size_bytes = argval;
217 		break;
218 	case 'P':
219 		g_fail_percent_goal = argval;
220 		break;
221 	case 'q':
222 		g_queue_depth = argval;
223 		break;
224 	case 'S':
225 		g_crc32c_seed = argval;
226 		break;
227 	case 't':
228 		g_time_in_sec = argval;
229 		break;
230 	case 'x':
231 		g_xor_src_count = argval;
232 		break;
233 	case 'y':
234 		g_verify = true;
235 		break;
236 	case 'w':
237 		g_workload_type = optarg;
238 		if (!strcmp(g_workload_type, "copy")) {
239 			g_workload_selection = SPDK_ACCEL_OPC_COPY;
240 		} else if (!strcmp(g_workload_type, "fill")) {
241 			g_workload_selection = SPDK_ACCEL_OPC_FILL;
242 		} else if (!strcmp(g_workload_type, "crc32c")) {
243 			g_workload_selection = SPDK_ACCEL_OPC_CRC32C;
244 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
245 			g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C;
246 		} else if (!strcmp(g_workload_type, "compare")) {
247 			g_workload_selection = SPDK_ACCEL_OPC_COMPARE;
248 		} else if (!strcmp(g_workload_type, "dualcast")) {
249 			g_workload_selection = SPDK_ACCEL_OPC_DUALCAST;
250 		} else if (!strcmp(g_workload_type, "compress")) {
251 			g_workload_selection = SPDK_ACCEL_OPC_COMPRESS;
252 		} else if (!strcmp(g_workload_type, "decompress")) {
253 			g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS;
254 		} else if (!strcmp(g_workload_type, "xor")) {
255 			g_workload_selection = SPDK_ACCEL_OPC_XOR;
256 		} else {
257 			usage();
258 			return 1;
259 		}
260 		break;
261 	default:
262 		usage();
263 		return 1;
264 	}
265 
266 	return 0;
267 }
268 
269 static int dump_result(void);
270 static void
271 unregister_worker(void *arg1)
272 {
273 	struct worker_thread *worker = arg1;
274 
275 	spdk_accel_get_opcode_stats(worker->ch, worker->workload,
276 				    &worker->stats, sizeof(worker->stats));
277 	free(worker->task_base);
278 	spdk_put_io_channel(worker->ch);
279 	spdk_thread_exit(spdk_get_thread());
280 	pthread_mutex_lock(&g_workers_lock);
281 	assert(g_num_workers >= 1);
282 	if (--g_num_workers == 0) {
283 		pthread_mutex_unlock(&g_workers_lock);
284 		g_rc = dump_result();
285 		spdk_app_stop(0);
286 	} else {
287 		pthread_mutex_unlock(&g_workers_lock);
288 	}
289 }
290 
/* Split the buffer buf of sz bytes into iovcnt roughly equal iovec elements.
 * Elements are ceil(sz / iovcnt) bytes, with the final element(s) shrunk to
 * whatever remains; the total always equals sz exactly. */
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t chunk;
	uint64_t remaining = sz;
	uint8_t *cursor = buf;
	uint32_t idx;

	chunk = spdk_divide_round_up(sz, iovcnt);

	for (idx = 0; idx < iovcnt; idx++) {
		/* Never let an element run past the end of the buffer. */
		chunk = spdk_min(chunk, remaining);
		assert(chunk > 0);

		iovs[idx].iov_base = cursor;
		iovs[idx].iov_len = chunk;

		cursor += chunk;
		remaining -= chunk;
	}
	assert(remaining == 0);
}
313 
314 static int
315 _get_task_data_bufs(struct ap_task *task)
316 {
317 	uint32_t align = 0;
318 	uint32_t i = 0;
319 	int dst_buff_len = g_xfer_size_bytes;
320 
321 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
322 	 * we do this for all modules to keep it simple.
323 	 */
324 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) {
325 		align = ALIGN_4K;
326 	}
327 
328 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
329 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
330 		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
331 
332 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
333 			dst_buff_len = task->cur_seg->compressed_len_padded;
334 		}
335 
336 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
337 		if (task->dst == NULL) {
338 			fprintf(stderr, "Unable to alloc dst buffer\n");
339 			return -ENOMEM;
340 		}
341 
342 		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
343 		if (!task->dst_iovs) {
344 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
345 			return -ENOMEM;
346 		}
347 		task->dst_iovcnt = g_chained_count;
348 		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
349 
350 		return 0;
351 	}
352 
353 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
354 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
355 		assert(g_chained_count > 0);
356 		task->src_iovcnt = g_chained_count;
357 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
358 		if (!task->src_iovs) {
359 			fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task);
360 			return -ENOMEM;
361 		}
362 
363 		if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
364 			dst_buff_len = g_xfer_size_bytes * g_chained_count;
365 		}
366 
367 		for (i = 0; i < task->src_iovcnt; i++) {
368 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
369 			if (task->src_iovs[i].iov_base == NULL) {
370 				return -ENOMEM;
371 			}
372 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
373 			task->src_iovs[i].iov_len = g_xfer_size_bytes;
374 		}
375 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
376 		assert(g_xor_src_count > 1);
377 		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
378 		if (!task->sources) {
379 			return -ENOMEM;
380 		}
381 
382 		for (i = 0; i < g_xor_src_count; i++) {
383 			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
384 			if (!task->sources[i]) {
385 				return -ENOMEM;
386 			}
387 			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
388 		}
389 	} else {
390 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
391 		if (task->src == NULL) {
392 			fprintf(stderr, "Unable to alloc src buffer\n");
393 			return -ENOMEM;
394 		}
395 
396 		/* For fill, set the entire src buffer so we can check if verify is enabled. */
397 		if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
398 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
399 		} else {
400 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
401 		}
402 	}
403 
404 	if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) {
405 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
406 		if (task->dst == NULL) {
407 			fprintf(stderr, "Unable to alloc dst buffer\n");
408 			return -ENOMEM;
409 		}
410 
411 		/* For compare we want the buffers to match, otherwise not. */
412 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) {
413 			memset(task->dst, DATA_PATTERN, dst_buff_len);
414 		} else {
415 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
416 		}
417 	}
418 
419 	/* For dualcast 2 buffers are needed for the operation.  */
420 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST ||
421 	    (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) {
422 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
423 		if (task->dst2 == NULL) {
424 			fprintf(stderr, "Unable to alloc dst buffer\n");
425 			return -ENOMEM;
426 		}
427 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
428 	}
429 
430 	return 0;
431 }
432 
433 inline static struct ap_task *
434 _get_task(struct worker_thread *worker)
435 {
436 	struct ap_task *task;
437 
438 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
439 		task = TAILQ_FIRST(&worker->tasks_pool);
440 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
441 	} else {
442 		fprintf(stderr, "Unable to get ap_task\n");
443 		return NULL;
444 	}
445 
446 	return task;
447 }
448 
/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case SPDK_ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_FILL:
		/* For fill use the first byte of the task->src buffer as the pattern */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPARE:
		/* Deliberately corrupt the first destination byte on a
		 * g_fail_percent_goal fraction of submissions so the compare
		 * reports a miscompare; accel_done() treats it as expected. */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPRESS:
		/* (De)compress tasks walk the pre-built segment list; point the
		 * source iovecs at the task's current segment. */
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;

	}

	worker->current_queue_depth++;
	/* On submission failure, invoke the completion callback directly so the
	 * failure is accounted for and the queue depth stays balanced. */
	if (rc) {
		accel_done(task, rc);
	}
}
522 
523 static void
524 _free_task_buffers(struct ap_task *task)
525 {
526 	uint32_t i;
527 
528 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS ||
529 	    g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
530 		free(task->dst_iovs);
531 	} else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
532 		   g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
533 		if (task->src_iovs) {
534 			for (i = 0; i < task->src_iovcnt; i++) {
535 				if (task->src_iovs[i].iov_base) {
536 					spdk_dma_free(task->src_iovs[i].iov_base);
537 				}
538 			}
539 			free(task->src_iovs);
540 		}
541 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
542 		if (task->sources) {
543 			for (i = 0; i < g_xor_src_count; i++) {
544 				spdk_dma_free(task->sources[i]);
545 			}
546 			free(task->sources);
547 		}
548 	} else {
549 		spdk_dma_free(task->src);
550 	}
551 
552 	spdk_dma_free(task->dst);
553 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) {
554 		spdk_dma_free(task->dst2);
555 	}
556 }
557 
558 static int
559 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
560 {
561 	uint32_t i;
562 	uint32_t ttl_len = 0;
563 	uint8_t *dst = (uint8_t *)_dst;
564 
565 	for (i = 0; i < iovcnt; i++) {
566 		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
567 			return -1;
568 		}
569 		dst += src_src_iovs[i].iov_len;
570 		ttl_len += src_src_iovs[i].iov_len;
571 	}
572 
573 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
574 		return -1;
575 	}
576 
577 	return 0;
578 }
579 
580 static int _worker_stop(void *arg);
581 
582 static void
583 accel_done(void *arg1, int status)
584 {
585 	struct ap_task *task = arg1;
586 	struct worker_thread *worker = task->worker;
587 	uint32_t sw_crc32c;
588 
589 	assert(worker);
590 	assert(worker->current_queue_depth > 0);
591 
592 	if (g_verify && status == 0) {
593 		switch (worker->workload) {
594 		case SPDK_ACCEL_OPC_COPY_CRC32C:
595 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
596 			if (task->crc_dst != sw_crc32c) {
597 				SPDK_NOTICELOG("CRC-32C miscompare\n");
598 				worker->xfer_failed++;
599 			}
600 			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
601 				SPDK_NOTICELOG("Data miscompare\n");
602 				worker->xfer_failed++;
603 			}
604 			break;
605 		case SPDK_ACCEL_OPC_CRC32C:
606 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
607 			if (task->crc_dst != sw_crc32c) {
608 				SPDK_NOTICELOG("CRC-32C miscompare\n");
609 				worker->xfer_failed++;
610 			}
611 			break;
612 		case SPDK_ACCEL_OPC_COPY:
613 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
614 				SPDK_NOTICELOG("Data miscompare\n");
615 				worker->xfer_failed++;
616 			}
617 			break;
618 		case SPDK_ACCEL_OPC_DUALCAST:
619 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
620 				SPDK_NOTICELOG("Data miscompare, first destination\n");
621 				worker->xfer_failed++;
622 			}
623 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
624 				SPDK_NOTICELOG("Data miscompare, second destination\n");
625 				worker->xfer_failed++;
626 			}
627 			break;
628 		case SPDK_ACCEL_OPC_FILL:
629 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
630 				SPDK_NOTICELOG("Data miscompare\n");
631 				worker->xfer_failed++;
632 			}
633 			break;
634 		case SPDK_ACCEL_OPC_COMPARE:
635 			break;
636 		case SPDK_ACCEL_OPC_COMPRESS:
637 			break;
638 		case SPDK_ACCEL_OPC_DECOMPRESS:
639 			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
640 				SPDK_NOTICELOG("Data miscompare on decompression\n");
641 				worker->xfer_failed++;
642 			}
643 			break;
644 		case SPDK_ACCEL_OPC_XOR:
645 			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
646 					 g_xfer_size_bytes) != 0) {
647 				SPDK_ERRLOG("Failed to generate xor for verification\n");
648 			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
649 				SPDK_NOTICELOG("Data miscompare\n");
650 				worker->xfer_failed++;
651 			}
652 			break;
653 		default:
654 			assert(false);
655 			break;
656 		}
657 	}
658 
659 	if (worker->workload == SPDK_ACCEL_OPC_COMPRESS ||
660 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
661 		/* Advance the task to the next segment */
662 		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
663 		if (task->cur_seg == NULL) {
664 			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
665 		}
666 	}
667 
668 	if (task->expected_status == -EILSEQ) {
669 		assert(status != 0);
670 		worker->injected_miscompares++;
671 		status = 0;
672 	} else if (status) {
673 		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
674 		worker->xfer_failed++;
675 	}
676 
677 	worker->current_queue_depth--;
678 
679 	if (!worker->is_draining && status == 0) {
680 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
681 		task = _get_task(worker);
682 		_submit_single(worker, task);
683 	} else {
684 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
685 	}
686 }
687 
688 static int
689 dump_result(void)
690 {
691 	uint64_t total_completed = 0;
692 	uint64_t total_failed = 0;
693 	uint64_t total_miscompared = 0;
694 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
695 	struct worker_thread *worker = g_workers;
696 	char tmp[64];
697 
698 	printf("\n%-12s %20s %16s %16s %16s\n",
699 	       "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares");
700 	printf("------------------------------------------------------------------------------------\n");
701 	while (worker != NULL) {
702 
703 		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
704 		uint64_t bw_in_MiBps = worker->stats.num_bytes /
705 				       (g_time_in_sec * 1024 * 1024);
706 
707 		total_completed += worker->stats.executed;
708 		total_failed += worker->xfer_failed;
709 		total_miscompared += worker->injected_miscompares;
710 
711 		snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread);
712 		if (xfer_per_sec) {
713 			printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
714 			       tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed,
715 			       worker->injected_miscompares);
716 		}
717 
718 		worker = worker->next;
719 	}
720 
721 	total_xfer_per_sec = total_completed / g_time_in_sec;
722 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
723 			    (g_time_in_sec * 1024 * 1024);
724 
725 	printf("====================================================================================\n");
726 	printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
727 	       "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
728 
729 	return total_failed ? 1 : 0;
730 }
731 
732 static inline void
733 _free_task_buffers_in_pool(struct worker_thread *worker)
734 {
735 	struct ap_task *task;
736 
737 	assert(worker);
738 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
739 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
740 		_free_task_buffers(task);
741 	}
742 }
743 
744 static int
745 _check_draining(void *arg)
746 {
747 	struct worker_thread *worker = arg;
748 
749 	assert(worker);
750 
751 	if (worker->current_queue_depth == 0) {
752 		_free_task_buffers_in_pool(worker);
753 		spdk_poller_unregister(&worker->is_draining_poller);
754 		unregister_worker(worker);
755 	}
756 
757 	return SPDK_POLLER_BUSY;
758 }
759 
/* One-shot poller fired after g_time_in_sec: switch the worker into draining
 * mode and start polling for its outstanding operations to complete.
 */
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check its outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}
775 
/* Message handler run on each newly created SPDK thread: registers the worker
 * in the global list, allocates its task pool and data buffers, arms the stop
 * timer and submits the initial queue-depth worth of operations.
 */
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		/* NOTE(review): the app is not stopped here, so other workers keep
		 * running; confirm this partial-failure behavior is intended. */
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	/* Publish the worker on the global list under the lock. */
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	/* One contiguous allocation backs every task; freed in unregister_worker(). */
	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:

	/* NOTE(review): worker and worker->ch are not released on this path; the
	 * process is shutting down via spdk_app_stop(), but verify no cleanup is
	 * missed. */
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}
847 
848 static void
849 accel_perf_start(void *arg1)
850 {
851 	struct spdk_cpuset tmp_cpumask = {};
852 	char thread_name[32];
853 	uint32_t i;
854 	int j;
855 	struct spdk_thread *thread;
856 	struct display_info *display;
857 
858 	g_tsc_rate = spdk_get_ticks_hz();
859 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
860 
861 	dump_user_config();
862 
863 	printf("Running for %d seconds...\n", g_time_in_sec);
864 	fflush(stdout);
865 
866 	/* Create worker threads for each core that was specified. */
867 	SPDK_ENV_FOREACH_CORE(i) {
868 		for (j = 0; j < g_threads_per_core; j++) {
869 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
870 			spdk_cpuset_zero(&tmp_cpumask);
871 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
872 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
873 			display = calloc(1, sizeof(*display));
874 			if (display == NULL) {
875 				fprintf(stderr, "Unable to allocate memory\n");
876 				spdk_app_stop(-1);
877 				return;
878 			}
879 			display->core = i;
880 			display->thread = j;
881 			spdk_thread_send_msg(thread, _init_thread, display);
882 		}
883 	}
884 }
885 
886 static void
887 accel_perf_free_compress_segs(void)
888 {
889 	struct ap_compress_seg *seg, *tmp;
890 
891 	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
892 		free(seg->uncompressed_iovs);
893 		free(seg->compressed_iovs);
894 		spdk_dma_free(seg->compressed_data);
895 		spdk_dma_free(seg->uncompressed_data);
896 		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
897 		free(seg);
898 	}
899 }
900 
/* Context for the preparation phase: walks the input file segment by segment,
 * compressing each one so the (de)compress workloads have data to operate on. */
struct accel_perf_prep_ctx {
	FILE			*file;		/* input file given via -l */
	long			remaining;	/* bytes of the file left to process */
	struct spdk_io_channel	*ch;		/* accel channel used for prep compression */
	struct ap_compress_seg	*cur_seg;	/* segment currently being compressed */
};
907 
908 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);
909 
/* Completion callback for the preparation-phase compression of one file
 * segment.  On success, queues the segment on g_compress_segs (building the
 * chained compressed iovecs when the workload is decompress) and continues
 * with the next segment via accel_perf_prep_process_seg().
 */
static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		/* Tear everything down and abort the app; see the buffer-size note
		 * in accel_perf_prep_process_seg() for why compress can fail here. */
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		/* Decompress reads the compressed data through chained iovecs. */
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}
954 
955 static void
956 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
957 {
958 	struct ap_compress_seg *seg;
959 	int sz, sz_read, sz_padded;
960 	void *ubuf, *cbuf;
961 	struct iovec iov[1];
962 	int rc;
963 
964 	if (ctx->remaining == 0) {
965 		spdk_put_io_channel(ctx->ch);
966 		fclose(ctx->file);
967 		free(ctx);
968 		accel_perf_start(NULL);
969 		return;
970 	}
971 
972 	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
973 	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
974 	 * would likely either deal with the failure of not having a large enough buffer
975 	 * by submitting another operation with a larger one.  Or, like the vbdev module
976 	 * does, just accept the error and use the data uncompressed marking it as such in
977 	 * its own metadata so that in the future it doesn't try to decompress uncompressed
978 	 * data, etc.
979 	 */
980 	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;
981 
982 	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
983 	if (!ubuf) {
984 		fprintf(stderr, "unable to allocate uncompress buffer\n");
985 		rc = -ENOMEM;
986 		goto error;
987 	}
988 
989 	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
990 	if (!cbuf) {
991 		fprintf(stderr, "unable to allocate compress buffer\n");
992 		rc = -ENOMEM;
993 		spdk_dma_free(ubuf);
994 		goto error;
995 	}
996 
997 	seg = calloc(1, sizeof(*seg));
998 	if (!seg) {
999 		fprintf(stderr, "unable to allocate comp/decomp segment\n");
1000 		spdk_dma_free(ubuf);
1001 		spdk_dma_free(cbuf);
1002 		rc = -ENOMEM;
1003 		goto error;
1004 	}
1005 
1006 	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
1007 	if (sz_read != sz) {
1008 		fprintf(stderr, "unable to read input file\n");
1009 		free(seg);
1010 		spdk_dma_free(ubuf);
1011 		spdk_dma_free(cbuf);
1012 		rc = -errno;
1013 		goto error;
1014 	}
1015 
1016 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
1017 		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1018 		if (seg->uncompressed_iovs == NULL) {
1019 			fprintf(stderr, "unable to allocate iovec\n");
1020 			free(seg);
1021 			spdk_dma_free(ubuf);
1022 			spdk_dma_free(cbuf);
1023 			rc = -ENOMEM;
1024 			goto error;
1025 		}
1026 		seg->uncompressed_iovcnt = g_chained_count;
1027 		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
1028 	}
1029 
1030 	seg->uncompressed_data = ubuf;
1031 	seg->uncompressed_len = sz;
1032 	seg->compressed_data = cbuf;
1033 	seg->compressed_len = sz;
1034 	seg->compressed_len_padded = sz_padded;
1035 
1036 	ctx->cur_seg = seg;
1037 	iov[0].iov_base = seg->uncompressed_data;
1038 	iov[0].iov_len = seg->uncompressed_len;
1039 	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
1040 	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
1041 	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
1042 	 * but real applications may want to consider a more sophisticated method.
1043 	 */
1044 	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
1045 					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
1046 	if (rc < 0) {
1047 		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
1048 		goto error;
1049 	}
1050 
1051 	return;
1052 
1053 error:
1054 	spdk_put_io_channel(ctx->ch);
1055 	fclose(ctx->file);
1056 	free(ctx);
1057 	spdk_app_stop(rc);
1058 }
1059 
1060 static void
1061 accel_perf_prep(void *arg1)
1062 {
1063 	struct accel_perf_prep_ctx *ctx;
1064 	int rc = 0;
1065 
1066 	if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS &&
1067 	    g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) {
1068 		accel_perf_start(arg1);
1069 		return;
1070 	}
1071 
1072 	if (g_cd_file_in_name == NULL) {
1073 		fprintf(stdout, "A filename is required.\n");
1074 		rc = -EINVAL;
1075 		goto error_end;
1076 	}
1077 
1078 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) {
1079 		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
1080 		rc = -ENOTSUP;
1081 		goto error_end;
1082 	}
1083 
1084 	printf("Preparing input file...\n");
1085 
1086 	ctx = calloc(1, sizeof(*ctx));
1087 	if (ctx == NULL) {
1088 		rc = -ENOMEM;
1089 		goto error_end;
1090 	}
1091 
1092 	ctx->file = fopen(g_cd_file_in_name, "r");
1093 	if (ctx->file == NULL) {
1094 		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
1095 		rc = -errno;
1096 		goto error_ctx;
1097 	}
1098 
1099 	fseek(ctx->file, 0L, SEEK_END);
1100 	ctx->remaining = ftell(ctx->file);
1101 	fseek(ctx->file, 0L, SEEK_SET);
1102 
1103 	ctx->ch = spdk_accel_get_io_channel();
1104 	if (ctx->ch == NULL) {
1105 		rc = -EAGAIN;
1106 		goto error_file;
1107 	}
1108 
1109 	if (g_xfer_size_bytes == 0) {
1110 		/* size of 0 means "file at a time" */
1111 		g_xfer_size_bytes = ctx->remaining;
1112 	}
1113 
1114 	accel_perf_prep_process_seg(ctx);
1115 	return;
1116 
1117 error_file:
1118 	fclose(ctx->file);
1119 error_ctx:
1120 	free(ctx);
1121 error_end:
1122 	spdk_app_stop(rc);
1123 }
1124 
/* Thread-message callback: stops the worker on its own SPDK thread. */
static void
worker_shutdown(void *arg)
{
	_worker_stop(arg);
}
1130 
1131 static void
1132 shutdown_cb(void)
1133 {
1134 	struct worker_thread *worker;
1135 
1136 	pthread_mutex_lock(&g_workers_lock);
1137 	if (!g_workers) {
1138 		spdk_app_stop(1);
1139 		goto unlock;
1140 	}
1141 
1142 	worker = g_workers;
1143 	while (worker) {
1144 		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
1145 		worker = worker->next;
1146 	}
1147 unlock:
1148 	pthread_mutex_unlock(&g_workers_lock);
1149 }
1150 
1151 int
1152 main(int argc, char **argv)
1153 {
1154 	struct worker_thread *worker, *tmp;
1155 	int rc;
1156 
1157 	pthread_mutex_init(&g_workers_lock, NULL);
1158 	spdk_app_opts_init(&g_opts, sizeof(g_opts));
1159 	g_opts.name = "accel_perf";
1160 	g_opts.reactor_mask = "0x1";
1161 	g_opts.shutdown_cb = shutdown_cb;
1162 
1163 	rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:S:x:", NULL,
1164 				 parse_args, usage);
1165 	if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) {
1166 		return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1;
1167 	}
1168 
1169 	if ((g_workload_selection != SPDK_ACCEL_OPC_COPY) &&
1170 	    (g_workload_selection != SPDK_ACCEL_OPC_FILL) &&
1171 	    (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) &&
1172 	    (g_workload_selection != SPDK_ACCEL_OPC_COPY_CRC32C) &&
1173 	    (g_workload_selection != SPDK_ACCEL_OPC_COMPARE) &&
1174 	    (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS) &&
1175 	    (g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) &&
1176 	    (g_workload_selection != SPDK_ACCEL_OPC_DUALCAST) &&
1177 	    (g_workload_selection != SPDK_ACCEL_OPC_XOR)) {
1178 		usage();
1179 		g_rc = -1;
1180 		goto cleanup;
1181 	}
1182 
1183 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
1184 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
1185 		usage();
1186 		g_rc = -1;
1187 		goto cleanup;
1188 	}
1189 
1190 	if (g_allocate_depth == 0) {
1191 		g_allocate_depth = g_queue_depth;
1192 	}
1193 
1194 	if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
1195 	     g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) &&
1196 	    g_chained_count == 0) {
1197 		usage();
1198 		g_rc = -1;
1199 		goto cleanup;
1200 	}
1201 
1202 	if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) {
1203 		usage();
1204 		g_rc = -1;
1205 		goto cleanup;
1206 	}
1207 
1208 	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
1209 	if (g_rc) {
1210 		SPDK_ERRLOG("ERROR starting application\n");
1211 	}
1212 
1213 	pthread_mutex_destroy(&g_workers_lock);
1214 
1215 	worker = g_workers;
1216 	while (worker) {
1217 		tmp = worker->next;
1218 		free(worker);
1219 		worker = tmp;
1220 	}
1221 cleanup:
1222 	accel_perf_free_compress_segs();
1223 	spdk_app_fini();
1224 	return g_rc;
1225 }
1226