xref: /spdk/examples/accel/perf/accel_perf.c (revision 12fbe739a31b09aff0d05f354d4f3bbef99afc55)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/thread.h"
8 #include "spdk/env.h"
9 #include "spdk/event.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/accel.h"
13 #include "spdk/crc32.h"
14 #include "spdk/util.h"
15 #include "spdk/xor.h"
16 
17 #define DATA_PATTERN 0x5a
18 #define ALIGN_4K 0x1000
19 #define COMP_BUF_PAD_PERCENTAGE 1.1L
20 
21 static uint64_t	g_tsc_rate;
22 static uint64_t g_tsc_end;
23 static int g_rc;
24 static int g_xfer_size_bytes = 4096;
25 static int g_queue_depth = 32;
26 /* g_allocate_depth indicates how many tasks we allocate per worker. It will
27  * be at least as much as the queue depth.
28  */
29 static int g_allocate_depth = 0;
30 static int g_threads_per_core = 1;
31 static int g_time_in_sec = 5;
32 static uint32_t g_crc32c_seed = 0;
33 static uint32_t g_chained_count = 1;
34 static int g_fail_percent_goal = 0;
35 static uint8_t g_fill_pattern = 255;
36 static uint32_t g_xor_src_count = 2;
37 static bool g_verify = false;
38 static const char *g_workload_type = NULL;
39 static enum spdk_accel_opcode g_workload_selection;
40 static struct worker_thread *g_workers = NULL;
41 static int g_num_workers = 0;
42 static char *g_cd_file_in_name = NULL;
43 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
44 static struct spdk_app_opts g_opts = {};
45 
46 struct ap_compress_seg {
47 	void		*uncompressed_data;
48 	uint32_t	uncompressed_len;
49 	struct iovec	*uncompressed_iovs;
50 	uint32_t	uncompressed_iovcnt;
51 
52 	void		*compressed_data;
53 	uint32_t	compressed_len;
54 	uint32_t	compressed_len_padded;
55 	struct iovec	*compressed_iovs;
56 	uint32_t	compressed_iovcnt;
57 
58 	STAILQ_ENTRY(ap_compress_seg)	link;
59 };
60 
61 static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);
62 
63 struct worker_thread;
64 static void accel_done(void *ref, int status);
65 
66 struct display_info {
67 	int core;
68 	int thread;
69 };
70 
71 struct ap_task {
72 	void			*src;
73 	struct iovec		*src_iovs;
74 	uint32_t		src_iovcnt;
75 	void			**sources;
76 	struct iovec		*dst_iovs;
77 	uint32_t		dst_iovcnt;
78 	void			*dst;
79 	void			*dst2;
80 	uint32_t		crc_dst;
81 	uint32_t		compressed_sz;
82 	struct ap_compress_seg *cur_seg;
83 	struct worker_thread	*worker;
84 	int			expected_status; /* used for the compare operation */
85 	TAILQ_ENTRY(ap_task)	link;
86 };
87 
88 struct worker_thread {
89 	struct spdk_io_channel		*ch;
90 	struct spdk_accel_opcode_stats	stats;
91 	uint64_t			xfer_failed;
92 	uint64_t			injected_miscompares;
93 	uint64_t			current_queue_depth;
94 	TAILQ_HEAD(, ap_task)		tasks_pool;
95 	struct worker_thread		*next;
96 	unsigned			core;
97 	struct spdk_thread		*thread;
98 	bool				is_draining;
99 	struct spdk_poller		*is_draining_poller;
100 	struct spdk_poller		*stop_poller;
101 	void				*task_base;
102 	struct display_info		display;
103 	enum spdk_accel_opcode		workload;
104 };
105 
106 static void
107 dump_user_config(void)
108 {
109 	const char *module_name = NULL;
110 	int rc;
111 
112 	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
113 	if (rc) {
114 		printf("error getting module name (%d)\n", rc);
115 	}
116 
117 	printf("\nSPDK Configuration:\n");
118 	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
119 	printf("Accel Perf Configuration:\n");
120 	printf("Workload Type:  %s\n", g_workload_type);
121 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
122 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
123 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
124 	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
125 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
126 	} else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
127 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
128 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
129 		printf("Source buffers: %u\n", g_xor_src_count);
130 	}
131 	if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
132 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
133 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
134 	} else {
135 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
136 	}
137 	printf("vector count    %u\n", g_chained_count);
138 	printf("Module:         %s\n", module_name);
139 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
140 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
141 		printf("File Name:      %s\n", g_cd_file_in_name);
142 	}
143 	printf("Queue depth:    %u\n", g_queue_depth);
144 	printf("Allocate depth: %u\n", g_allocate_depth);
145 	printf("# threads/core: %u\n", g_threads_per_core);
146 	printf("Run time:       %u seconds\n", g_time_in_sec);
147 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
148 }
149 
/* Print command-line help.  Fixed: several option descriptions were missing
 * their closing ']' bracket.
 */
static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-S for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}
171 
172 static int
173 parse_args(int ch, char *arg)
174 {
175 	int argval = 0;
176 
177 	switch (ch) {
178 	case 'a':
179 	case 'C':
180 	case 'f':
181 	case 'T':
182 	case 'o':
183 	case 'P':
184 	case 'q':
185 	case 'S':
186 	case 't':
187 	case 'x':
188 		argval = spdk_strtol(optarg, 10);
189 		if (argval < 0) {
190 			fprintf(stderr, "-%c option must be non-negative.\n", ch);
191 			usage();
192 			return 1;
193 		}
194 		break;
195 	default:
196 		break;
197 	};
198 
199 	switch (ch) {
200 	case 'a':
201 		g_allocate_depth = argval;
202 		break;
203 	case 'C':
204 		g_chained_count = argval;
205 		break;
206 	case 'l':
207 		g_cd_file_in_name = optarg;
208 		break;
209 	case 'f':
210 		g_fill_pattern = (uint8_t)argval;
211 		break;
212 	case 'T':
213 		g_threads_per_core = argval;
214 		break;
215 	case 'o':
216 		g_xfer_size_bytes = argval;
217 		break;
218 	case 'P':
219 		g_fail_percent_goal = argval;
220 		break;
221 	case 'q':
222 		g_queue_depth = argval;
223 		break;
224 	case 'S':
225 		g_crc32c_seed = argval;
226 		break;
227 	case 't':
228 		g_time_in_sec = argval;
229 		break;
230 	case 'x':
231 		g_xor_src_count = argval;
232 		break;
233 	case 'y':
234 		g_verify = true;
235 		break;
236 	case 'w':
237 		g_workload_type = optarg;
238 		if (!strcmp(g_workload_type, "copy")) {
239 			g_workload_selection = SPDK_ACCEL_OPC_COPY;
240 		} else if (!strcmp(g_workload_type, "fill")) {
241 			g_workload_selection = SPDK_ACCEL_OPC_FILL;
242 		} else if (!strcmp(g_workload_type, "crc32c")) {
243 			g_workload_selection = SPDK_ACCEL_OPC_CRC32C;
244 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
245 			g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C;
246 		} else if (!strcmp(g_workload_type, "compare")) {
247 			g_workload_selection = SPDK_ACCEL_OPC_COMPARE;
248 		} else if (!strcmp(g_workload_type, "dualcast")) {
249 			g_workload_selection = SPDK_ACCEL_OPC_DUALCAST;
250 		} else if (!strcmp(g_workload_type, "compress")) {
251 			g_workload_selection = SPDK_ACCEL_OPC_COMPRESS;
252 		} else if (!strcmp(g_workload_type, "decompress")) {
253 			g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS;
254 		} else if (!strcmp(g_workload_type, "xor")) {
255 			g_workload_selection = SPDK_ACCEL_OPC_XOR;
256 		} else {
257 			usage();
258 			return 1;
259 		}
260 		break;
261 	default:
262 		usage();
263 		return 1;
264 	}
265 
266 	return 0;
267 }
268 
269 static int dump_result(void);
/* Tear down one worker: snapshot its stats, release its resources and exit
 * its SPDK thread.  The last worker to unregister prints the aggregate
 * results and stops the application.
 */
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	/* Capture final per-opcode stats before the channel is released;
	 * dump_result() reads worker->stats afterwards.
	 */
	spdk_accel_get_opcode_stats(worker->ch, worker->workload,
				    &worker->stats, sizeof(worker->stats));
	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	/* g_num_workers is shared across reactors; guard the decrement. */
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		/* Last worker out: report results and end the app. */
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}
290 
/* Split the flat buffer [buf, buf + sz) into iovcnt iovec entries of roughly
 * equal size (the final entry absorbs any remainder).  Asserts that the whole
 * buffer is covered and that no entry would be empty.
 */
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t chunk = (sz + iovcnt - 1) / iovcnt;	/* ceil(sz / iovcnt) */
	uint64_t remaining = sz;
	uint8_t *cursor = buf;
	uint32_t idx;

	for (idx = 0; idx < iovcnt; idx++) {
		uint64_t len = chunk < remaining ? chunk : remaining;

		assert(len > 0);
		iovs[idx].iov_base = cursor;
		iovs[idx].iov_len = len;
		cursor += len;
		remaining -= len;
	}
	assert(remaining == 0);
}
313 
314 static int
315 _get_task_data_bufs(struct ap_task *task)
316 {
317 	uint32_t align = 0;
318 	uint32_t i = 0;
319 	int dst_buff_len = g_xfer_size_bytes;
320 
321 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
322 	 * we do this for all modules to keep it simple.
323 	 */
324 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) {
325 		align = ALIGN_4K;
326 	}
327 
328 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
329 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
330 		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
331 
332 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
333 			dst_buff_len = task->cur_seg->compressed_len_padded;
334 		}
335 
336 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
337 		if (task->dst == NULL) {
338 			fprintf(stderr, "Unable to alloc dst buffer\n");
339 			return -ENOMEM;
340 		}
341 
342 		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
343 		if (!task->dst_iovs) {
344 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
345 			return -ENOMEM;
346 		}
347 		task->dst_iovcnt = g_chained_count;
348 		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
349 
350 		return 0;
351 	}
352 
353 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
354 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
355 		assert(g_chained_count > 0);
356 		task->src_iovcnt = g_chained_count;
357 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
358 		if (!task->src_iovs) {
359 			fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task);
360 			return -ENOMEM;
361 		}
362 
363 		if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
364 			dst_buff_len = g_xfer_size_bytes * g_chained_count;
365 		}
366 
367 		for (i = 0; i < task->src_iovcnt; i++) {
368 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
369 			if (task->src_iovs[i].iov_base == NULL) {
370 				return -ENOMEM;
371 			}
372 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
373 			task->src_iovs[i].iov_len = g_xfer_size_bytes;
374 		}
375 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
376 		assert(g_xor_src_count > 1);
377 		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
378 		if (!task->sources) {
379 			return -ENOMEM;
380 		}
381 
382 		for (i = 0; i < g_xor_src_count; i++) {
383 			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
384 			if (!task->sources[i]) {
385 				return -ENOMEM;
386 			}
387 			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
388 		}
389 	} else {
390 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
391 		if (task->src == NULL) {
392 			fprintf(stderr, "Unable to alloc src buffer\n");
393 			return -ENOMEM;
394 		}
395 
396 		/* For fill, set the entire src buffer so we can check if verify is enabled. */
397 		if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
398 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
399 		} else {
400 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
401 		}
402 	}
403 
404 	if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) {
405 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
406 		if (task->dst == NULL) {
407 			fprintf(stderr, "Unable to alloc dst buffer\n");
408 			return -ENOMEM;
409 		}
410 
411 		/* For compare we want the buffers to match, otherwise not. */
412 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) {
413 			memset(task->dst, DATA_PATTERN, dst_buff_len);
414 		} else {
415 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
416 		}
417 	}
418 
419 	/* For dualcast 2 buffers are needed for the operation.  */
420 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST ||
421 	    (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) {
422 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
423 		if (task->dst2 == NULL) {
424 			fprintf(stderr, "Unable to alloc dst buffer\n");
425 			return -ENOMEM;
426 		}
427 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
428 	}
429 
430 	return 0;
431 }
432 
433 inline static struct ap_task *
434 _get_task(struct worker_thread *worker)
435 {
436 	struct ap_task *task;
437 
438 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
439 		task = TAILQ_FIRST(&worker->tasks_pool);
440 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
441 	} else {
442 		fprintf(stderr, "Unable to get ap_task\n");
443 		return NULL;
444 	}
445 
446 	return task;
447 }
448 
/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	/* Dispatch to the accel API that matches this worker's workload.  Each
	 * submission completes asynchronously via accel_done(task).
	 */
	switch (worker->workload) {
	case SPDK_ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_FILL:
		/* For fill use the first byte of the task->dst buffer */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPARE:
		/* Inject a deliberate miscompare in ~g_fail_percent_goal percent of
		 * submissions by flipping the first dst byte; record the expectation
		 * so accel_done() can tell injected failures from real ones.
		 */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPRESS:
		/* Source iovs come from the current pre-built uncompressed segment. */
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DECOMPRESS:
		/* Source iovs come from the current pre-compressed segment. */
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;

	}

	/* Count the op as outstanding; on submission failure complete it inline
	 * so accounting and task recycling still happen.
	 */
	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}
522 
523 static void
524 _free_task_buffers(struct ap_task *task)
525 {
526 	uint32_t i;
527 
528 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS ||
529 	    g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
530 		free(task->dst_iovs);
531 	} else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
532 		   g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
533 		if (task->src_iovs) {
534 			for (i = 0; i < task->src_iovcnt; i++) {
535 				if (task->src_iovs[i].iov_base) {
536 					spdk_dma_free(task->src_iovs[i].iov_base);
537 				}
538 			}
539 			free(task->src_iovs);
540 		}
541 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
542 		if (task->sources) {
543 			for (i = 0; i < g_xor_src_count; i++) {
544 				spdk_dma_free(task->sources[i]);
545 			}
546 			free(task->sources);
547 		}
548 	} else {
549 		spdk_dma_free(task->src);
550 	}
551 
552 	spdk_dma_free(task->dst);
553 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) {
554 		spdk_dma_free(task->dst2);
555 	}
556 }
557 
558 static int
559 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
560 {
561 	uint32_t i;
562 	uint32_t ttl_len = 0;
563 	uint8_t *dst = (uint8_t *)_dst;
564 
565 	for (i = 0; i < iovcnt; i++) {
566 		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
567 			return -1;
568 		}
569 		dst += src_src_iovs[i].iov_len;
570 		ttl_len += src_src_iovs[i].iov_len;
571 	}
572 
573 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
574 		return -1;
575 	}
576 
577 	return 0;
578 }
579 
580 static int _worker_stop(void *arg);
581 
582 static void
583 accel_done(void *arg1, int status)
584 {
585 	struct ap_task *task = arg1;
586 	struct worker_thread *worker = task->worker;
587 	uint32_t sw_crc32c;
588 
589 	assert(worker);
590 	assert(worker->current_queue_depth > 0);
591 
592 	if (g_verify && status == 0) {
593 		switch (worker->workload) {
594 		case SPDK_ACCEL_OPC_COPY_CRC32C:
595 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
596 			if (task->crc_dst != sw_crc32c) {
597 				SPDK_NOTICELOG("CRC-32C miscompare\n");
598 				worker->xfer_failed++;
599 			}
600 			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
601 				SPDK_NOTICELOG("Data miscompare\n");
602 				worker->xfer_failed++;
603 			}
604 			break;
605 		case SPDK_ACCEL_OPC_CRC32C:
606 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
607 			if (task->crc_dst != sw_crc32c) {
608 				SPDK_NOTICELOG("CRC-32C miscompare\n");
609 				worker->xfer_failed++;
610 			}
611 			break;
612 		case SPDK_ACCEL_OPC_COPY:
613 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
614 				SPDK_NOTICELOG("Data miscompare\n");
615 				worker->xfer_failed++;
616 			}
617 			break;
618 		case SPDK_ACCEL_OPC_DUALCAST:
619 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
620 				SPDK_NOTICELOG("Data miscompare, first destination\n");
621 				worker->xfer_failed++;
622 			}
623 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
624 				SPDK_NOTICELOG("Data miscompare, second destination\n");
625 				worker->xfer_failed++;
626 			}
627 			break;
628 		case SPDK_ACCEL_OPC_FILL:
629 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
630 				SPDK_NOTICELOG("Data miscompare\n");
631 				worker->xfer_failed++;
632 			}
633 			break;
634 		case SPDK_ACCEL_OPC_COMPARE:
635 			break;
636 		case SPDK_ACCEL_OPC_COMPRESS:
637 			break;
638 		case SPDK_ACCEL_OPC_DECOMPRESS:
639 			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
640 				SPDK_NOTICELOG("Data miscompare on decompression\n");
641 				worker->xfer_failed++;
642 			}
643 			break;
644 		case SPDK_ACCEL_OPC_XOR:
645 			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
646 					 g_xfer_size_bytes) != 0) {
647 				SPDK_ERRLOG("Failed to generate xor for verification\n");
648 			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
649 				SPDK_NOTICELOG("Data miscompare\n");
650 				worker->xfer_failed++;
651 			}
652 			break;
653 		default:
654 			assert(false);
655 			break;
656 		}
657 	}
658 
659 	if (worker->workload == SPDK_ACCEL_OPC_COMPRESS ||
660 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
661 		/* Advance the task to the next segment */
662 		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
663 		if (task->cur_seg == NULL) {
664 			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
665 		}
666 	}
667 
668 	if (task->expected_status == -EILSEQ) {
669 		assert(status != 0);
670 		worker->injected_miscompares++;
671 		status = 0;
672 	} else if (status) {
673 		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
674 		worker->xfer_failed++;
675 	}
676 
677 	worker->current_queue_depth--;
678 
679 	if (!worker->is_draining && status == 0) {
680 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
681 		task = _get_task(worker);
682 		_submit_single(worker, task);
683 	} else {
684 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
685 	}
686 }
687 
688 static int
689 dump_result(void)
690 {
691 	uint64_t total_completed = 0;
692 	uint64_t total_failed = 0;
693 	uint64_t total_miscompared = 0;
694 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
695 	struct worker_thread *worker = g_workers;
696 
697 	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
698 	printf("------------------------------------------------------------------------\n");
699 	while (worker != NULL) {
700 
701 		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
702 		uint64_t bw_in_MiBps = worker->stats.num_bytes /
703 				       (g_time_in_sec * 1024 * 1024);
704 
705 		total_completed += worker->stats.executed;
706 		total_failed += worker->xfer_failed;
707 		total_miscompared += worker->injected_miscompares;
708 
709 		if (xfer_per_sec) {
710 			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
711 			       worker->display.core, worker->display.thread, xfer_per_sec,
712 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
713 		}
714 
715 		worker = worker->next;
716 	}
717 
718 	total_xfer_per_sec = total_completed / g_time_in_sec;
719 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
720 			    (g_time_in_sec * 1024 * 1024);
721 
722 	printf("=========================================================================\n");
723 	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
724 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
725 
726 	return total_failed ? 1 : 0;
727 }
728 
729 static inline void
730 _free_task_buffers_in_pool(struct worker_thread *worker)
731 {
732 	struct ap_task *task;
733 
734 	assert(worker);
735 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
736 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
737 		_free_task_buffers(task);
738 	}
739 }
740 
/* Poller armed once a worker starts draining: waits for all outstanding
 * operations to complete, then frees the task pool and unregisters the
 * worker.
 */
static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		/* All in-flight ops finished; safe to tear the worker down. */
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}
756 
/* One-shot poller fired when the run time elapses: switch the worker into
 * draining mode so no new operations are submitted, and start polling for
 * its outstanding I/O to complete.
 */
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check it's outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}
772 
773 static void
774 _init_thread(void *arg1)
775 {
776 	struct worker_thread *worker;
777 	struct ap_task *task;
778 	int i, num_tasks = g_allocate_depth;
779 	struct display_info *display = arg1;
780 
781 	worker = calloc(1, sizeof(*worker));
782 	if (worker == NULL) {
783 		fprintf(stderr, "Unable to allocate worker\n");
784 		free(display);
785 		return;
786 	}
787 
788 	worker->workload = g_workload_selection;
789 	worker->display.core = display->core;
790 	worker->display.thread = display->thread;
791 	free(display);
792 	worker->core = spdk_env_get_current_core();
793 	worker->thread = spdk_get_thread();
794 	pthread_mutex_lock(&g_workers_lock);
795 	g_num_workers++;
796 	worker->next = g_workers;
797 	g_workers = worker;
798 	pthread_mutex_unlock(&g_workers_lock);
799 	worker->ch = spdk_accel_get_io_channel();
800 	if (worker->ch == NULL) {
801 		fprintf(stderr, "Unable to get an accel channel\n");
802 		goto error;
803 	}
804 
805 	TAILQ_INIT(&worker->tasks_pool);
806 
807 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
808 	if (worker->task_base == NULL) {
809 		fprintf(stderr, "Could not allocate task base.\n");
810 		goto error;
811 	}
812 
813 	task = worker->task_base;
814 	for (i = 0; i < num_tasks; i++) {
815 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
816 		task->worker = worker;
817 		if (_get_task_data_bufs(task)) {
818 			fprintf(stderr, "Unable to get data bufs\n");
819 			goto error;
820 		}
821 		task++;
822 	}
823 
824 	/* Register a poller that will stop the worker at time elapsed */
825 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
826 			      g_time_in_sec * 1000000ULL);
827 
828 	/* Load up queue depth worth of operations. */
829 	for (i = 0; i < g_queue_depth; i++) {
830 		task = _get_task(worker);
831 		if (task == NULL) {
832 			goto error;
833 		}
834 
835 		_submit_single(worker, task);
836 	}
837 	return;
838 error:
839 
840 	_free_task_buffers_in_pool(worker);
841 	free(worker->task_base);
842 	spdk_app_stop(-1);
843 }
844 
845 static void
846 accel_perf_start(void *arg1)
847 {
848 	struct spdk_cpuset tmp_cpumask = {};
849 	char thread_name[32];
850 	uint32_t i;
851 	int j;
852 	struct spdk_thread *thread;
853 	struct display_info *display;
854 
855 	g_tsc_rate = spdk_get_ticks_hz();
856 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
857 
858 	dump_user_config();
859 
860 	printf("Running for %d seconds...\n", g_time_in_sec);
861 	fflush(stdout);
862 
863 	/* Create worker threads for each core that was specified. */
864 	SPDK_ENV_FOREACH_CORE(i) {
865 		for (j = 0; j < g_threads_per_core; j++) {
866 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
867 			spdk_cpuset_zero(&tmp_cpumask);
868 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
869 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
870 			display = calloc(1, sizeof(*display));
871 			if (display == NULL) {
872 				fprintf(stderr, "Unable to allocate memory\n");
873 				spdk_app_stop(-1);
874 				return;
875 			}
876 			display->core = i;
877 			display->thread = j;
878 			spdk_thread_send_msg(thread, _init_thread, display);
879 		}
880 	}
881 }
882 
883 static void
884 accel_perf_free_compress_segs(void)
885 {
886 	struct ap_compress_seg *seg, *tmp;
887 
888 	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
889 		free(seg->uncompressed_iovs);
890 		free(seg->compressed_iovs);
891 		spdk_dma_free(seg->compressed_data);
892 		spdk_dma_free(seg->uncompressed_data);
893 		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
894 		free(seg);
895 	}
896 }
897 
/* Context carried through the pre-processing phase that slices the input
 * file into compress/decompress segments before the run starts.
 */
struct accel_perf_prep_ctx {
	FILE			*file;		/* uncompressed input file being read */
	long			remaining;	/* bytes of the file not yet turned into segments */
	struct spdk_io_channel	*ch;		/* accel channel used for the prep compression */
	struct ap_compress_seg	*cur_seg;	/* segment whose compression is in flight */
};
904 
905 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);
906 
/* Completion callback for the prep-phase compression of one segment.  On
 * success, finishes building the segment (including its compressed iovecs
 * for the decompress workload), queues it globally and moves on to the next
 * chunk of the input file.  On any failure, releases everything and stops
 * the app.
 */
static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		/* Decompress tasks read from the compressed data, so pre-build
		 * its iovec view now that compressed_len is known.
		 */
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	/* Process the next chunk (or start the run if the file is consumed). */
	accel_perf_prep_process_seg(ctx);
}
951 
952 static void
953 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
954 {
955 	struct ap_compress_seg *seg;
956 	int sz, sz_read, sz_padded;
957 	void *ubuf, *cbuf;
958 	struct iovec iov[1];
959 	int rc;
960 
961 	if (ctx->remaining == 0) {
962 		spdk_put_io_channel(ctx->ch);
963 		fclose(ctx->file);
964 		free(ctx);
965 		accel_perf_start(NULL);
966 		return;
967 	}
968 
969 	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
970 	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
971 	 * would likely either deal with the failure of not having a large enough buffer
972 	 * by submitting another operation with a larger one.  Or, like the vbdev module
973 	 * does, just accept the error and use the data uncompressed marking it as such in
974 	 * its own metadata so that in the future it doesn't try to decompress uncompressed
975 	 * data, etc.
976 	 */
977 	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;
978 
979 	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
980 	if (!ubuf) {
981 		fprintf(stderr, "unable to allocate uncompress buffer\n");
982 		rc = -ENOMEM;
983 		goto error;
984 	}
985 
986 	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
987 	if (!cbuf) {
988 		fprintf(stderr, "unable to allocate compress buffer\n");
989 		rc = -ENOMEM;
990 		spdk_dma_free(ubuf);
991 		goto error;
992 	}
993 
994 	seg = calloc(1, sizeof(*seg));
995 	if (!seg) {
996 		fprintf(stderr, "unable to allocate comp/decomp segment\n");
997 		spdk_dma_free(ubuf);
998 		spdk_dma_free(cbuf);
999 		rc = -ENOMEM;
1000 		goto error;
1001 	}
1002 
1003 	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
1004 	if (sz_read != sz) {
1005 		fprintf(stderr, "unable to read input file\n");
1006 		free(seg);
1007 		spdk_dma_free(ubuf);
1008 		spdk_dma_free(cbuf);
1009 		rc = -errno;
1010 		goto error;
1011 	}
1012 
1013 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
1014 		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1015 		if (seg->uncompressed_iovs == NULL) {
1016 			fprintf(stderr, "unable to allocate iovec\n");
1017 			free(seg);
1018 			spdk_dma_free(ubuf);
1019 			spdk_dma_free(cbuf);
1020 			rc = -ENOMEM;
1021 			goto error;
1022 		}
1023 		seg->uncompressed_iovcnt = g_chained_count;
1024 		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
1025 	}
1026 
1027 	seg->uncompressed_data = ubuf;
1028 	seg->uncompressed_len = sz;
1029 	seg->compressed_data = cbuf;
1030 	seg->compressed_len = sz;
1031 	seg->compressed_len_padded = sz_padded;
1032 
1033 	ctx->cur_seg = seg;
1034 	iov[0].iov_base = seg->uncompressed_data;
1035 	iov[0].iov_len = seg->uncompressed_len;
1036 	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
1037 	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
1038 	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
1039 	 * but real applications may want to consider a more sophisticated method.
1040 	 */
1041 	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
1042 					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
1043 	if (rc < 0) {
1044 		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
1045 		goto error;
1046 	}
1047 
1048 	return;
1049 
1050 error:
1051 	spdk_put_io_channel(ctx->ch);
1052 	fclose(ctx->file);
1053 	free(ctx);
1054 	spdk_app_stop(rc);
1055 }
1056 
1057 static void
1058 accel_perf_prep(void *arg1)
1059 {
1060 	struct accel_perf_prep_ctx *ctx;
1061 	int rc = 0;
1062 
1063 	if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS &&
1064 	    g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) {
1065 		accel_perf_start(arg1);
1066 		return;
1067 	}
1068 
1069 	if (g_cd_file_in_name == NULL) {
1070 		fprintf(stdout, "A filename is required.\n");
1071 		rc = -EINVAL;
1072 		goto error_end;
1073 	}
1074 
1075 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) {
1076 		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
1077 		rc = -ENOTSUP;
1078 		goto error_end;
1079 	}
1080 
1081 	printf("Preparing input file...\n");
1082 
1083 	ctx = calloc(1, sizeof(*ctx));
1084 	if (ctx == NULL) {
1085 		rc = -ENOMEM;
1086 		goto error_end;
1087 	}
1088 
1089 	ctx->file = fopen(g_cd_file_in_name, "r");
1090 	if (ctx->file == NULL) {
1091 		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
1092 		rc = -errno;
1093 		goto error_ctx;
1094 	}
1095 
1096 	fseek(ctx->file, 0L, SEEK_END);
1097 	ctx->remaining = ftell(ctx->file);
1098 	fseek(ctx->file, 0L, SEEK_SET);
1099 
1100 	ctx->ch = spdk_accel_get_io_channel();
1101 	if (ctx->ch == NULL) {
1102 		rc = -EAGAIN;
1103 		goto error_file;
1104 	}
1105 
1106 	if (g_xfer_size_bytes == 0) {
1107 		/* size of 0 means "file at a time" */
1108 		g_xfer_size_bytes = ctx->remaining;
1109 	}
1110 
1111 	accel_perf_prep_process_seg(ctx);
1112 	return;
1113 
1114 error_file:
1115 	fclose(ctx->file);
1116 error_ctx:
1117 	free(ctx);
1118 error_end:
1119 	spdk_app_stop(rc);
1120 }
1121 
/* Thread-message trampoline: runs on the worker's own SPDK thread and
 * forwards to the common stop path.
 */
static void
worker_shutdown(void *ctx)
{
	struct worker_thread *worker = ctx;

	_worker_stop(worker);
}
1127 
1128 static void
1129 shutdown_cb(void)
1130 {
1131 	struct worker_thread *worker;
1132 
1133 	pthread_mutex_lock(&g_workers_lock);
1134 	if (!g_workers) {
1135 		spdk_app_stop(1);
1136 		goto unlock;
1137 	}
1138 
1139 	worker = g_workers;
1140 	while (worker) {
1141 		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
1142 		worker = worker->next;
1143 	}
1144 unlock:
1145 	pthread_mutex_unlock(&g_workers_lock);
1146 }
1147 
1148 int
1149 main(int argc, char **argv)
1150 {
1151 	struct worker_thread *worker, *tmp;
1152 	int rc;
1153 
1154 	pthread_mutex_init(&g_workers_lock, NULL);
1155 	spdk_app_opts_init(&g_opts, sizeof(g_opts));
1156 	g_opts.name = "accel_perf";
1157 	g_opts.reactor_mask = "0x1";
1158 	g_opts.shutdown_cb = shutdown_cb;
1159 
1160 	rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:S:x:", NULL,
1161 				 parse_args, usage);
1162 	if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) {
1163 		return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1;
1164 	}
1165 
1166 	if ((g_workload_selection != SPDK_ACCEL_OPC_COPY) &&
1167 	    (g_workload_selection != SPDK_ACCEL_OPC_FILL) &&
1168 	    (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) &&
1169 	    (g_workload_selection != SPDK_ACCEL_OPC_COPY_CRC32C) &&
1170 	    (g_workload_selection != SPDK_ACCEL_OPC_COMPARE) &&
1171 	    (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS) &&
1172 	    (g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) &&
1173 	    (g_workload_selection != SPDK_ACCEL_OPC_DUALCAST) &&
1174 	    (g_workload_selection != SPDK_ACCEL_OPC_XOR)) {
1175 		usage();
1176 		g_rc = -1;
1177 		goto cleanup;
1178 	}
1179 
1180 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
1181 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
1182 		usage();
1183 		g_rc = -1;
1184 		goto cleanup;
1185 	}
1186 
1187 	if (g_allocate_depth == 0) {
1188 		g_allocate_depth = g_queue_depth;
1189 	}
1190 
1191 	if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
1192 	     g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) &&
1193 	    g_chained_count == 0) {
1194 		usage();
1195 		g_rc = -1;
1196 		goto cleanup;
1197 	}
1198 
1199 	if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) {
1200 		usage();
1201 		g_rc = -1;
1202 		goto cleanup;
1203 	}
1204 
1205 	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
1206 	if (g_rc) {
1207 		SPDK_ERRLOG("ERROR starting application\n");
1208 	}
1209 
1210 	pthread_mutex_destroy(&g_workers_lock);
1211 
1212 	worker = g_workers;
1213 	while (worker) {
1214 		tmp = worker->next;
1215 		free(worker);
1216 		worker = tmp;
1217 	}
1218 cleanup:
1219 	accel_perf_free_compress_segs();
1220 	spdk_app_fini();
1221 	return g_rc;
1222 }
1223