xref: /spdk/examples/accel/perf/accel_perf.c (revision f6866117acb32c78d5ea7bd76ba330284655af35)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2020 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"
#include "spdk/xor.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000
#define COMP_BUF_PAD_PERCENTAGE 1.1L
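/* COMP_BUF_PAD_PERCENTAGE sizes compressed-destination buffers with 10% of
 * headroom for incompressible data; see accel_perf_prep_process_seg() below.
 */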

static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static uint32_t g_xor_src_count = 2;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void		*uncompressed_data;
	uint32_t	uncompressed_len;
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	void		*compressed_data;
	uint32_t	compressed_len;
	uint32_t	compressed_len_padded;
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg)	link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void			*src;
	struct iovec		*src_iovs;
	uint32_t		src_iovcnt;
	void			**sources;
	struct iovec		*dst_iovs;
	uint32_t		dst_iovcnt;
	void			*dst;
	void			*dst2;
	uint32_t		crc_dst;
	uint32_t		compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};

struct worker_thread {
	struct spdk_io_channel		*ch;
	uint64_t			xfer_completed;
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks_pool;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;
	struct display_info		display;
	enum accel_opcode		workload;
};

static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		printf("Source buffers: %u\n", g_xor_src_count);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count:   %u\n", g_chained_count);
	printf("Module:         %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name:      %s\n", g_cd_file_in_name);
	}
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the I/O vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}
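
/* Illustrative invocation (example values only): run a 4 KiB copy workload on
 * two cores for 10 seconds at a queue depth of 64, verifying every result:
 *
 *	./build/examples/accel_perf -m 0x3 -w copy -o 4096 -q 64 -t 10 -y
 *
 * -m is the generic SPDK core-mask option; the binary path depends on how the
 * tree was built.
 */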

static int
parse_args(int argc, char *argv)
{
	int argval = 0;

	switch (argc) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
	case 'x':
		argval = spdk_strtol(optarg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", argc);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (argc) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = optarg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'x':
		g_xor_src_count = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = optarg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else if (!strcmp(g_workload_type, "xor")) {
			g_workload_selection = ACCEL_OPC_XOR;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);
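	/* Worked example: sz = 4096 and iovcnt = 3 gives ele_size = 1366, so the
	 * loop below produces iov_len values of 1366, 1366 and 1364 (the final
	 * element is clamped to the bytes remaining).
	 */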

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}

static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);

		if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			dst_buff_len = task->cur_seg->compressed_len_padded;
		}

		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (!task->dst_iovs) {
			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
			return -ENOMEM;
		}
		task->dst_iovcnt = g_chained_count;
		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);

		return 0;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C ||
	    g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		assert(g_xor_src_count > 1);
		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
		if (!task->sources) {
			return -ENOMEM;
		}

		for (i = 0; i < g_xor_src_count; i++) {
			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (!task->sources[i]) {
				return -ENOMEM;
			}
			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so that,
		 * when verify is enabled, dst can be compared against it.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* Dualcast needs two destination buffers; XOR with verify also uses dst2
	 * to hold the software-generated reference result.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST ||
	    (g_workload_selection == ACCEL_OPC_XOR && g_verify)) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

static inline struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, the first byte of task->src supplies the fill value written to task->dst. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
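	/* On submission failure, invoke the completion callback inline with the
	 * error code so the queue-depth accounting above stays balanced.
	 */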
	if (rc) {
		accel_done(task, rc);
	}
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS || g_workload_selection == ACCEL_OPC_COMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		if (task->sources) {
			for (i = 0; i < g_xor_src_count; i++) {
				spdk_dma_free(task->sources[i]);
			}
			free(task->sources);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_XOR) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_src_iovs[i].iov_len;
		ttl_len += src_src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_XOR:
			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
					 g_xfer_size_bytes) != 0) {
				SPDK_ERRLOG("Failed to generate xor for verification\n");
			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || worker->workload == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);
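		/* Worked example: 1,000,000 completed 4 KiB transfers over a 5 second
		 * run is 200,000 transfers/s and 4096000000 / (5 * 1048576) = 781 MiB/s.
		 */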

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker once the configured run time has elapsed. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);
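	/* The poller period is expressed in microseconds, so multiplying the run
	 * time by 1000000ULL fires _worker_stop() exactly once the run is over.
	 */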

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}

static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

struct accel_perf_prep_ctx {
	FILE			*file;
	long			remaining;
	struct spdk_io_channel	*ch;
	struct ap_compress_seg	*cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}

static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read, sz_padded;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
	/* Add a 10% pad to the compress buffer to allow for incompressible data. Note
	 * that a real app would likely either handle an undersized destination buffer
	 * by resubmitting the operation with a larger one, or, like the vbdev module
	 * does, accept the error and store the data uncompressed, marking it as such
	 * in its own metadata so that it never tries to decompress uncompressed data
	 * later.
	 */
	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -errno;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;
	seg->compressed_len_padded = sz_padded;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
	 * but real applications may want to consider a more sophisticated method.
	 */
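	/* A hypothetical recovery sketch (not used here) could grow the buffer and
	 * resubmit when the operation reports -ENOMEM:
	 *
	 *	if (rc == -ENOMEM) {
	 *		new_len = seg->compressed_len_padded * 2;
	 *		new_buf = spdk_dma_malloc(new_len, ALIGN_4K, NULL);
	 *		// free the old buffer, update the segment, resubmit...
	 *	}
	 *
	 * new_len and new_buf are illustrative names only.
	 */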
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}

static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

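	/* Determine the input size with the classic fseek/ftell idiom: seek to the
	 * end, record the offset, then rewind before reading.
	 */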
	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* size of 0 means "file at a time" */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:x:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST) &&
	    (g_workload_selection != ACCEL_OPC_XOR)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_selection == ACCEL_OPC_XOR && g_xor_src_count < 2) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}