/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2020 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"
#include "spdk/xor.h"

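/*
 * DATA_PATTERN seeds the source buffers so results can be verified; ALIGN_4K is
 * the 4 KiB alignment DSA hardware requires for dualcast destinations (also
 * used for the compress prep buffers); COMP_BUF_PAD_PERCENTAGE adds ~10% of
 * headroom to compressed-data buffers for incompressible input, e.g. a
 * 4096-byte chunk gets a 4505-byte compress buffer.
 */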
#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000
#define COMP_BUF_PAD_PERCENTAGE 1.1L

static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static uint32_t g_xor_src_count = 2;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void		*uncompressed_data;
	uint32_t	uncompressed_len;
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	void		*compressed_data;
	uint32_t	compressed_len;
	uint32_t	compressed_len_padded;
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg)	link;
};

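/* Segments of the input file prepared before a compress/decompress run; shared
 * read-only by every worker's tasks.
 */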
static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void			*src;
	struct iovec		*src_iovs;
	uint32_t		src_iovcnt;
	void			**sources;
	struct iovec		*dst_iovs;
	uint32_t		dst_iovcnt;
	void			*dst;
	void			*dst2;
	uint32_t		crc_dst;
	uint32_t		compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};

struct worker_thread {
	struct spdk_io_channel		*ch;
	struct spdk_accel_opcode_stats	stats;
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks_pool;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;
	struct display_info		display;
	enum accel_opcode		workload;
};

static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		printf("Source buffers: %u\n", g_xor_src_count);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count:   %u\n", g_chained_count);
	printf("Module:         %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name:      %s\n", g_cd_file_in_name);
	}
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the I/O vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

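/*
 * Example invocation (the path to the binary depends on the build tree, e.g.
 * build/examples/accel_perf):
 *
 *   accel_perf -w crc32c -o 4096 -q 64 -t 10 -C 4 -y
 *
 * runs a CRC-32C workload over 4-element vectors of 4 KiB buffers at queue
 * depth 64 for 10 seconds, verifying every result against a software CRC.
 */
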
static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
	case 'x':
		argval = spdk_strtol(optarg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = optarg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'x':
		g_xor_src_count = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = optarg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else if (!strcmp(g_workload_type, "xor")) {
			g_workload_selection = ACCEL_OPC_XOR;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	spdk_accel_get_opcode_stats(worker->ch, worker->workload,
				    &worker->stats, sizeof(worker->stats));
	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

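/*
 * Split the buffer [buf, buf + sz) across iovcnt iovecs of roughly equal size;
 * e.g. sz = 4096 and iovcnt = 3 yields elements of 1366, 1366 and 1364 bytes.
 */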
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}

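/*
 * Allocate and initialize the data buffers a task needs for the selected
 * workload: iovec arrays for chained CRC and compress/decompress, multiple
 * source buffers for xor, plus dst/dst2 where the operation calls for them.
 */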
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);

		if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			dst_buff_len = task->cur_seg->compressed_len_padded;
		}

		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (!task->dst_iovs) {
			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
			return -ENOMEM;
		}
		task->dst_iovcnt = g_chained_count;
		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);

		return 0;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C ||
	    g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		assert(g_xor_src_count > 1);
		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
		if (!task->sources) {
			return -ENOMEM;
		}

		for (i = 0; i < g_xor_src_count; i++) {
			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (!task->sources[i]) {
				return -ENOMEM;
			}
			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so the result can be verified when -y is used. */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* Dualcast needs a second destination buffer; xor with verify needs one as a software-computed reference. */
	if (g_workload_selection == ACCEL_OPC_DUALCAST ||
	    (g_workload_selection == ACCEL_OPC_XOR && g_verify)) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, use the first byte of the task->src buffer as the fill pattern */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;

	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS || g_workload_selection == ACCEL_OPC_COMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		if (task->sources) {
			for (i = 0; i < g_xor_src_count; i++) {
				spdk_dma_free(task->sources[i]);
			}
			free(task->sources);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_XOR) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_src_iovs[i].iov_len;
		ttl_len += src_src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

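/*
 * Completion callback for every submitted operation.  With -y and a successful
 * status, the result is verified in software; the task then returns to the
 * pool and, unless the worker is draining, another task is immediately
 * resubmitted so the queue depth stays constant.
 */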
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_XOR:
			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
					 g_xfer_size_bytes) != 0) {
				SPDK_ERRLOG("Failed to generate xor for verification\n");
			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || worker->workload == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
		uint64_t bw_in_MiBps = worker->stats.num_bytes /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->stats.executed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

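	/*
	 * The per-worker bandwidth above comes from the accel framework's byte
	 * counts; the totals below are derived from g_xfer_size_bytes, so for
	 * workloads that move more than g_xfer_size_bytes per operation (e.g.
	 * chained copy_crc32c) the two may differ.
	 */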
	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

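/*
 * Shutdown sequence: _worker_stop() fires once after g_time_in_sec, unregisters
 * itself and sets is_draining, after which no new work is submitted; this
 * poller then waits for the outstanding operations to complete before freeing
 * the task buffers and unregistering the worker.
 */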
static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

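	/*
	 * Build the task pool: g_allocate_depth tasks, each with its own data
	 * buffers.  Only g_queue_depth of them are in flight at any one time, so
	 * a larger pool (-a) spreads the operations across a wider range of memory.
	 */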
	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker once the run time has elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:

	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}

static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

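/*
 * Compress/decompress prep: the input file is read in g_xfer_size_bytes chunks
 * and each chunk is compressed once up front, so compress workloads have
 * reference data and decompress workloads have valid compressed input.  The
 * resulting segments are kept on g_compress_segs and each task cycles through
 * them during the run.
 */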
struct accel_perf_prep_ctx {
	FILE			*file;
	long			remaining;
	struct spdk_io_channel	*ch;
	struct ap_compress_seg	*cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}

static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read, sz_padded;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
	 * would likely either handle a too-small buffer by resubmitting the operation with
	 * a larger one, or, like the vbdev module does, accept the error and store the data
	 * uncompressed, marking it as such in its own metadata so that it doesn't later try
	 * to decompress uncompressed data.
	 */
	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -errno;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;
	seg->compressed_len_padded = sz_padded;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
	 * but real applications may want to consider a more sophisticated method.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}

static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* size of 0 means "file at a time" */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

static void
worker_shutdown(void *ctx)
{
	_worker_stop(ctx);
}

static void
shutdown_cb(void)
{
	struct worker_thread *worker;

	pthread_mutex_lock(&g_workers_lock);
	worker = g_workers;
	while (worker) {
		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
		worker = worker->next;
	}
	pthread_mutex_unlock(&g_workers_lock);
}

int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	g_opts.shutdown_cb = shutdown_cb;
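	/* The option string below must stay in sync with parse_args() and usage(). */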
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:x:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST) &&
	    (g_workload_selection != ACCEL_OPC_XOR)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_selection == ACCEL_OPC_XOR && g_xor_src_count < 2) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}