xref: /spdk/examples/accel/perf/accel_perf.c (revision b6875e1ce57743f3b1416016b9c624d79a862af9)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/thread.h"
8 #include "spdk/env.h"
9 #include "spdk/event.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/accel.h"
13 #include "spdk/crc32.h"
14 #include "spdk/util.h"
15 #include "spdk/xor.h"
16 #include "spdk/dif.h"
17 
18 #define DATA_PATTERN 0x5a
19 #define ALIGN_4K 0x1000
20 #define COMP_BUF_PAD_PERCENTAGE 1.1L
21 
22 static uint64_t	g_tsc_rate;
23 static uint64_t g_tsc_end;
24 static int g_rc;
25 static int g_xfer_size_bytes = 4096;
26 static int g_block_size_bytes = 512;
27 static int g_md_size_bytes = 8;
28 static int g_queue_depth = 32;
29 /* g_allocate_depth indicates how many tasks we allocate per worker. It will
30  * be at least as much as the queue depth.
31  */
32 static int g_allocate_depth = 0;
33 static int g_threads_per_core = 1;
34 static int g_time_in_sec = 5;
35 static uint32_t g_crc32c_seed = 0;
36 static uint32_t g_chained_count = 1;
37 static int g_fail_percent_goal = 0;
38 static uint8_t g_fill_pattern = 255;
39 static uint32_t g_xor_src_count = 2;
40 static bool g_verify = false;
41 static const char *g_workload_type = NULL;
42 static enum spdk_accel_opcode g_workload_selection = SPDK_ACCEL_OPC_LAST;
43 static const char *g_module_name = NULL;
44 static struct worker_thread *g_workers = NULL;
45 static int g_num_workers = 0;
46 static char *g_cd_file_in_name = NULL;
47 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
48 static struct spdk_app_opts g_opts = {};
49 
50 struct ap_compress_seg {
51 	void		*uncompressed_data;
52 	uint32_t	uncompressed_len;
53 	struct iovec	*uncompressed_iovs;
54 	uint32_t	uncompressed_iovcnt;
55 
56 	void		*compressed_data;
57 	uint32_t	compressed_len;
58 	uint32_t	compressed_len_padded;
59 	struct iovec	*compressed_iovs;
60 	uint32_t	compressed_iovcnt;
61 
62 	STAILQ_ENTRY(ap_compress_seg)	link;
63 };
64 
65 static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);
66 
67 struct worker_thread;
68 static void accel_done(void *ref, int status);
69 
70 struct display_info {
71 	int core;
72 	int thread;
73 };
74 
75 struct ap_task {
76 	void			*src;
77 	struct iovec		*src_iovs;
78 	uint32_t		src_iovcnt;
79 	void			**sources;
80 	struct iovec		*dst_iovs;
81 	uint32_t		dst_iovcnt;
82 	void			*dst;
83 	void			*dst2;
84 	uint32_t		*crc_dst;
85 	uint32_t		compressed_sz;
86 	struct ap_compress_seg *cur_seg;
87 	struct worker_thread	*worker;
88 	int			expected_status; /* used for the compare operation */
89 	uint32_t		num_blocks; /* used for the DIF related operations */
90 	struct spdk_dif_ctx	dif_ctx;
91 	struct spdk_dif_error	dif_err;
92 	TAILQ_ENTRY(ap_task)	link;
93 };
94 
95 struct worker_thread {
96 	struct spdk_io_channel		*ch;
97 	struct spdk_accel_opcode_stats	stats;
98 	uint64_t			xfer_failed;
99 	uint64_t			injected_miscompares;
100 	uint64_t			current_queue_depth;
101 	TAILQ_HEAD(, ap_task)		tasks_pool;
102 	struct worker_thread		*next;
103 	unsigned			core;
104 	struct spdk_thread		*thread;
105 	bool				is_draining;
106 	struct spdk_poller		*is_draining_poller;
107 	struct spdk_poller		*stop_poller;
108 	void				*task_base;
109 	struct display_info		display;
110 	enum spdk_accel_opcode		workload;
111 };
112 
113 static void
114 dump_user_config(void)
115 {
116 	const char *module_name = NULL;
117 	int rc;
118 
119 	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
120 	if (rc) {
121 		printf("error getting module name (%d)\n", rc);
122 	}
123 
124 	printf("\nSPDK Configuration:\n");
125 	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
126 	printf("Accel Perf Configuration:\n");
127 	printf("Workload Type:  %s\n", g_workload_type);
128 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
129 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
130 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
131 	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
132 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
133 	} else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
134 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
135 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
136 		printf("Source buffers: %u\n", g_xor_src_count);
137 	}
138 	if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
139 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
140 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
141 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) {
142 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
143 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
144 	} else {
145 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
146 	}
147 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
148 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) {
149 		printf("Block size:     %u bytes\n", g_block_size_bytes);
150 		printf("Metadata size:  %u bytes\n", g_md_size_bytes);
151 	}
152 	printf("Vector count    %u\n", g_chained_count);
153 	printf("Module:         %s\n", module_name);
154 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
155 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
156 		printf("File Name:      %s\n", g_cd_file_in_name);
157 	}
158 	printf("Queue depth:    %u\n", g_queue_depth);
159 	printf("Allocate depth: %u\n", g_allocate_depth);
160 	printf("# threads/core: %u\n", g_threads_per_core);
161 	printf("Run time:       %u seconds\n", g_time_in_sec);
162 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
163 }
164 
165 static void
166 usage(void)
167 {
168 	printf("accel_perf options:\n");
169 	printf("\t[-h help message]\n");
170 	printf("\t[-q queue depth per core]\n");
171 	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)\n");
172 	printf("\t[-T number of threads per core\n");
173 	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
174 	printf("\t[-t time in seconds]\n");
175 	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor,\n");
176 	printf("\t[                                       dif_verify, dif_verify_copy, dif_generate, dif_generate_copy\n");
177 	printf("\t[-M assign module to the operation, not compatible with accel_assign_opc RPC\n");
178 	printf("\t[-l for compress/decompress workloads, name of uncompressed input file\n");
179 	printf("\t[-S for crc32c workload, use this seed value (default 0)\n");
180 	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n");
181 	printf("\t[-f for fill workload, use this BYTE value (default 255)\n");
182 	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
183 	printf("\t[-y verify result if this switch is on]\n");
184 	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
185 	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
186 }
187 
188 static int
189 parse_args(int ch, char *arg)
190 {
191 	int argval = 0;
192 
193 	switch (ch) {
194 	case 'a':
195 	case 'C':
196 	case 'f':
197 	case 'T':
198 	case 'o':
199 	case 'P':
200 	case 'q':
201 	case 'S':
202 	case 't':
203 	case 'x':
204 		argval = spdk_strtol(optarg, 10);
205 		if (argval < 0) {
206 			fprintf(stderr, "-%c option must be non-negative.\n", ch);
207 			usage();
208 			return 1;
209 		}
210 		break;
211 	default:
212 		break;
213 	};
214 
215 	switch (ch) {
216 	case 'a':
217 		g_allocate_depth = argval;
218 		break;
219 	case 'C':
220 		g_chained_count = argval;
221 		break;
222 	case 'l':
223 		g_cd_file_in_name = optarg;
224 		break;
225 	case 'f':
226 		g_fill_pattern = (uint8_t)argval;
227 		break;
228 	case 'T':
229 		g_threads_per_core = argval;
230 		break;
231 	case 'o':
232 		g_xfer_size_bytes = argval;
233 		break;
234 	case 'P':
235 		g_fail_percent_goal = argval;
236 		break;
237 	case 'q':
238 		g_queue_depth = argval;
239 		break;
240 	case 'S':
241 		g_crc32c_seed = argval;
242 		break;
243 	case 't':
244 		g_time_in_sec = argval;
245 		break;
246 	case 'x':
247 		g_xor_src_count = argval;
248 		break;
249 	case 'y':
250 		g_verify = true;
251 		break;
252 	case 'w':
253 		g_workload_type = optarg;
254 		if (!strcmp(g_workload_type, "copy")) {
255 			g_workload_selection = SPDK_ACCEL_OPC_COPY;
256 		} else if (!strcmp(g_workload_type, "fill")) {
257 			g_workload_selection = SPDK_ACCEL_OPC_FILL;
258 		} else if (!strcmp(g_workload_type, "crc32c")) {
259 			g_workload_selection = SPDK_ACCEL_OPC_CRC32C;
260 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
261 			g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C;
262 		} else if (!strcmp(g_workload_type, "compare")) {
263 			g_workload_selection = SPDK_ACCEL_OPC_COMPARE;
264 		} else if (!strcmp(g_workload_type, "dualcast")) {
265 			g_workload_selection = SPDK_ACCEL_OPC_DUALCAST;
266 		} else if (!strcmp(g_workload_type, "compress")) {
267 			g_workload_selection = SPDK_ACCEL_OPC_COMPRESS;
268 		} else if (!strcmp(g_workload_type, "decompress")) {
269 			g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS;
270 		} else if (!strcmp(g_workload_type, "xor")) {
271 			g_workload_selection = SPDK_ACCEL_OPC_XOR;
272 		} else if (!strcmp(g_workload_type, "dif_verify")) {
273 			g_workload_selection = SPDK_ACCEL_OPC_DIF_VERIFY;
274 		} else if (!strcmp(g_workload_type, "dif_verify_copy")) {
275 			g_workload_selection = SPDK_ACCEL_OPC_DIF_VERIFY_COPY;
276 		} else if (!strcmp(g_workload_type, "dif_generate")) {
277 			g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE;
278 		} else if (!strcmp(g_workload_type, "dif_generate_copy")) {
279 			g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE_COPY;
280 		} else {
281 			fprintf(stderr, "Unsupported workload type: %s\n", optarg);
282 			usage();
283 			return 1;
284 		}
285 		break;
286 	case 'M':
287 		g_module_name = optarg;
288 		break;
289 
290 	default:
291 		usage();
292 		return 1;
293 	}
294 
295 	return 0;
296 }
297 
298 static int dump_result(void);
299 static void
300 unregister_worker(void *arg1)
301 {
302 	struct worker_thread *worker = arg1;
303 
304 	if (worker->ch) {
305 		spdk_accel_get_opcode_stats(worker->ch, worker->workload,
306 					    &worker->stats, sizeof(worker->stats));
307 		spdk_put_io_channel(worker->ch);
308 		worker->ch = NULL;
309 	}
310 	free(worker->task_base);
311 	spdk_thread_exit(spdk_get_thread());
312 	pthread_mutex_lock(&g_workers_lock);
313 	assert(g_num_workers >= 1);
314 	if (--g_num_workers == 0) {
315 		pthread_mutex_unlock(&g_workers_lock);
316 		/* Only dump results on successful runs */
317 		if (g_rc == 0) {
318 			g_rc = dump_result();
319 		}
320 		spdk_app_stop(g_rc);
321 	} else {
322 		pthread_mutex_unlock(&g_workers_lock);
323 	}
324 }
325 
326 static void
327 accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
328 {
329 	uint64_t ele_size;
330 	uint8_t *data;
331 	uint32_t i;
332 
333 	ele_size = spdk_divide_round_up(sz, iovcnt);
334 
335 	data = buf;
336 	for (i = 0; i < iovcnt; i++) {
337 		ele_size = spdk_min(ele_size, sz);
338 		assert(ele_size > 0);
339 
340 		iovs[i].iov_base = data;
341 		iovs[i].iov_len = ele_size;
342 
343 		data += ele_size;
344 		sz -= ele_size;
345 	}
346 	assert(sz == 0);
347 }
348 
349 static int
350 _get_task_data_bufs(struct ap_task *task)
351 {
352 	uint32_t align = 0;
353 	uint32_t i = 0;
354 	int src_buff_len = g_xfer_size_bytes;
355 	int dst_buff_len = g_xfer_size_bytes;
356 	struct spdk_dif_ctx_init_ext_opts dif_opts;
357 	uint32_t num_blocks, transfer_size_with_md;
358 	int rc;
359 
360 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
361 	 * we do this for all modules to keep it simple.
362 	 */
363 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) {
364 		align = ALIGN_4K;
365 	}
366 
367 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
368 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
369 		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
370 
371 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
372 			dst_buff_len = task->cur_seg->compressed_len_padded;
373 		}
374 
375 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
376 		if (task->dst == NULL) {
377 			fprintf(stderr, "Unable to alloc dst buffer\n");
378 			return -ENOMEM;
379 		}
380 
381 		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
382 		if (!task->dst_iovs) {
383 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
384 			return -ENOMEM;
385 		}
386 		task->dst_iovcnt = g_chained_count;
387 		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
388 
389 		return 0;
390 	}
391 
392 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) {
393 		task->dst_iovcnt = g_chained_count;
394 		task->dst_iovs = calloc(task->dst_iovcnt, sizeof(struct iovec));
395 		if (!task->dst_iovs) {
396 			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
397 			return -ENOMEM;
398 		}
399 
400 		num_blocks = g_xfer_size_bytes / g_block_size_bytes;
401 		/* Add bytes for each block for metadata */
402 		transfer_size_with_md = g_xfer_size_bytes + (num_blocks * g_md_size_bytes);
403 		task->num_blocks = num_blocks;
404 
405 		for (i = 0; i < task->dst_iovcnt; i++) {
406 			task->dst_iovs[i].iov_base = spdk_dma_zmalloc(transfer_size_with_md, 0, NULL);
407 			if (task->dst_iovs[i].iov_base == NULL) {
408 				return -ENOMEM;
409 			}
410 			task->dst_iovs[i].iov_len = transfer_size_with_md;
411 		}
412 
413 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
414 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
415 
416 		rc = spdk_dif_ctx_init(&task->dif_ctx,
417 				       g_block_size_bytes + g_md_size_bytes,
418 				       g_md_size_bytes, true, true,
419 				       SPDK_DIF_TYPE1,
420 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
421 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
422 		if (rc != 0) {
423 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
424 			return rc;
425 		}
426 	}
427 
428 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
429 		/* Allocate source buffers */
430 		task->src_iovcnt = g_chained_count;
431 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
432 		if (!task->src_iovs) {
433 			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
434 			return -ENOMEM;
435 		}
436 
437 		num_blocks = g_xfer_size_bytes / g_block_size_bytes;
438 		/* Add bytes for each block for metadata */
439 		transfer_size_with_md = g_xfer_size_bytes + (num_blocks * g_md_size_bytes);
440 		task->num_blocks = num_blocks;
441 
442 		for (i = 0; i < task->src_iovcnt; i++) {
443 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(transfer_size_with_md, 0, NULL);
444 			if (task->src_iovs[i].iov_base == NULL) {
445 				return -ENOMEM;
446 			}
447 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, transfer_size_with_md);
448 			task->src_iovs[i].iov_len = transfer_size_with_md;
449 		}
450 
451 		/* Allocate destination buffers */
452 		task->dst_iovcnt = g_chained_count;
453 		task->dst_iovs = calloc(task->dst_iovcnt, sizeof(struct iovec));
454 		if (!task->dst_iovs) {
455 			fprintf(stderr, "cannot allocated task->dst_iovs fot task=%p\n", task);
456 			return -ENOMEM;
457 		}
458 
459 		for (i = 0; i < task->dst_iovcnt; i++) {
460 			task->dst_iovs[i].iov_base = spdk_dma_zmalloc(dst_buff_len, 0, NULL);
461 			if (task->dst_iovs[i].iov_base == NULL) {
462 				return -ENOMEM;
463 			}
464 			task->dst_iovs[i].iov_len = dst_buff_len;
465 		}
466 
467 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
468 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
469 
470 		/* Init DIF ctx */
471 		rc = spdk_dif_ctx_init(&task->dif_ctx,
472 				       g_block_size_bytes + g_md_size_bytes,
473 				       g_md_size_bytes, true, true,
474 				       SPDK_DIF_TYPE1,
475 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
476 				       0x123, 0xFFFF, 0x234, 0, 0, &dif_opts);
477 		if (rc != 0) {
478 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
479 			return rc;
480 		}
481 
482 		rc = spdk_dif_generate(task->src_iovs, task->src_iovcnt, task->num_blocks, &task->dif_ctx);
483 		if (rc != 0) {
484 			fprintf(stderr, "Generation of DIF failed, error (%d)\n", rc);
485 			return rc;
486 		}
487 	}
488 
489 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
490 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
491 		task->crc_dst = spdk_dma_zmalloc(sizeof(*task->crc_dst), 0, NULL);
492 	}
493 
494 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
495 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
496 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
497 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
498 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) {
499 		assert(g_chained_count > 0);
500 		task->src_iovcnt = g_chained_count;
501 		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
502 		if (!task->src_iovs) {
503 			fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task);
504 			return -ENOMEM;
505 		}
506 
507 		if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
508 			dst_buff_len = g_xfer_size_bytes * g_chained_count;
509 		}
510 
511 		if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
512 		    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) {
513 			src_buff_len += (g_xfer_size_bytes / g_block_size_bytes) * g_md_size_bytes;
514 		}
515 
516 		for (i = 0; i < task->src_iovcnt; i++) {
517 			task->src_iovs[i].iov_base = spdk_dma_zmalloc(src_buff_len, 0, NULL);
518 			if (task->src_iovs[i].iov_base == NULL) {
519 				return -ENOMEM;
520 			}
521 			memset(task->src_iovs[i].iov_base, DATA_PATTERN, src_buff_len);
522 			task->src_iovs[i].iov_len = src_buff_len;
523 		}
524 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
525 		assert(g_xor_src_count > 1);
526 		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
527 		if (!task->sources) {
528 			return -ENOMEM;
529 		}
530 
531 		for (i = 0; i < g_xor_src_count; i++) {
532 			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
533 			if (!task->sources[i]) {
534 				return -ENOMEM;
535 			}
536 			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
537 		}
538 	} else {
539 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
540 		if (task->src == NULL) {
541 			fprintf(stderr, "Unable to alloc src buffer\n");
542 			return -ENOMEM;
543 		}
544 
545 		/* For fill, set the entire src buffer so we can check if verify is enabled. */
546 		if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
547 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
548 		} else {
549 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
550 		}
551 	}
552 
553 	if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C &&
554 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_VERIFY &&
555 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE &&
556 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE_COPY &&
557 	    g_workload_selection != SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
558 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
559 		if (task->dst == NULL) {
560 			fprintf(stderr, "Unable to alloc dst buffer\n");
561 			return -ENOMEM;
562 		}
563 
564 		/* For compare we want the buffers to match, otherwise not. */
565 		if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) {
566 			memset(task->dst, DATA_PATTERN, dst_buff_len);
567 		} else {
568 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
569 		}
570 	}
571 
572 	/* For dualcast 2 buffers are needed for the operation.  */
573 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST ||
574 	    (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) {
575 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
576 		if (task->dst2 == NULL) {
577 			fprintf(stderr, "Unable to alloc dst buffer\n");
578 			return -ENOMEM;
579 		}
580 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
581 	}
582 
583 	if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
584 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
585 	    g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
586 		dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
587 		dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
588 
589 		task->num_blocks = (g_xfer_size_bytes * g_chained_count) / g_block_size_bytes;
590 
591 		rc = spdk_dif_ctx_init(&task->dif_ctx,
592 				       g_block_size_bytes + g_md_size_bytes,
593 				       g_md_size_bytes, true, true,
594 				       SPDK_DIF_TYPE1,
595 				       SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK,
596 				       16, 0xFFFF, 10, 0, 0, &dif_opts);
597 		if (rc != 0) {
598 			fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc);
599 			return rc;
600 		}
601 
602 		if ((g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) ||
603 		    (g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY)) {
604 			rc = spdk_dif_generate(task->src_iovs, task->src_iovcnt, task->num_blocks, &task->dif_ctx);
605 			if (rc != 0) {
606 				fprintf(stderr, "Generation of DIF failed, error (%d)\n", rc);
607 				return rc;
608 			}
609 		}
610 	}
611 
612 	return 0;
613 }
614 
615 inline static struct ap_task *
616 _get_task(struct worker_thread *worker)
617 {
618 	struct ap_task *task;
619 
620 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
621 		task = TAILQ_FIRST(&worker->tasks_pool);
622 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
623 	} else {
624 		fprintf(stderr, "Unable to get ap_task\n");
625 		return NULL;
626 	}
627 
628 	return task;
629 }
630 
631 /* Submit one operation using the same ap task that just completed. */
632 static void
633 _submit_single(struct worker_thread *worker, struct ap_task *task)
634 {
635 	int random_num;
636 	int rc = 0;
637 
638 	assert(worker);
639 
640 	switch (worker->workload) {
641 	case SPDK_ACCEL_OPC_COPY:
642 		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
643 					    g_xfer_size_bytes, accel_done, task);
644 		break;
645 	case SPDK_ACCEL_OPC_FILL:
646 		/* For fill use the first byte of the task->dst buffer */
647 		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
648 					    g_xfer_size_bytes, accel_done, task);
649 		break;
650 	case SPDK_ACCEL_OPC_CRC32C:
651 		rc = spdk_accel_submit_crc32cv(worker->ch, task->crc_dst,
652 					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
653 					       accel_done, task);
654 		break;
655 	case SPDK_ACCEL_OPC_COPY_CRC32C:
656 		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
657 						    task->crc_dst, g_crc32c_seed, accel_done, task);
658 		break;
659 	case SPDK_ACCEL_OPC_COMPARE:
660 		random_num = rand() % 100;
661 		if (random_num < g_fail_percent_goal) {
662 			task->expected_status = -EILSEQ;
663 			*(uint8_t *)task->dst = ~DATA_PATTERN;
664 		} else {
665 			task->expected_status = 0;
666 			*(uint8_t *)task->dst = DATA_PATTERN;
667 		}
668 		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
669 					       g_xfer_size_bytes, accel_done, task);
670 		break;
671 	case SPDK_ACCEL_OPC_DUALCAST:
672 		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
673 						task->src, g_xfer_size_bytes, accel_done, task);
674 		break;
675 	case SPDK_ACCEL_OPC_COMPRESS:
676 		task->src_iovs = task->cur_seg->uncompressed_iovs;
677 		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
678 		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
679 						task->src_iovs,
680 						task->src_iovcnt, &task->compressed_sz, accel_done, task);
681 		break;
682 	case SPDK_ACCEL_OPC_DECOMPRESS:
683 		task->src_iovs = task->cur_seg->compressed_iovs;
684 		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
685 		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
686 						  task->src_iovcnt, NULL, accel_done, task);
687 		break;
688 	case SPDK_ACCEL_OPC_XOR:
689 		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
690 					   g_xfer_size_bytes, accel_done, task);
691 		break;
692 	case SPDK_ACCEL_OPC_DIF_VERIFY:
693 		rc = spdk_accel_submit_dif_verify(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks,
694 						  &task->dif_ctx, &task->dif_err, accel_done, task);
695 		break;
696 	case SPDK_ACCEL_OPC_DIF_GENERATE:
697 		rc = spdk_accel_submit_dif_generate(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks,
698 						    &task->dif_ctx, accel_done, task);
699 		break;
700 	case SPDK_ACCEL_OPC_DIF_GENERATE_COPY:
701 		rc = spdk_accel_submit_dif_generate_copy(worker->ch, task->dst_iovs, task->dst_iovcnt,
702 				task->src_iovs, task->src_iovcnt,
703 				task->num_blocks, &task->dif_ctx, accel_done, task);
704 		break;
705 	case SPDK_ACCEL_OPC_DIF_VERIFY_COPY:
706 		rc = spdk_accel_submit_dif_verify_copy(worker->ch, task->dst_iovs, task->dst_iovcnt,
707 						       task->src_iovs, task->src_iovcnt, task->num_blocks,
708 						       &task->dif_ctx, &task->dif_err, accel_done, task);
709 		break;
710 	default:
711 		assert(false);
712 		break;
713 
714 	}
715 
716 	worker->current_queue_depth++;
717 	if (rc) {
718 		accel_done(task, rc);
719 	}
720 }
721 
722 static void
723 _free_task_buffers(struct ap_task *task)
724 {
725 	uint32_t i;
726 
727 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS ||
728 	    g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
729 		free(task->dst_iovs);
730 	} else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
731 		   g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
732 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
733 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE ||
734 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY ||
735 		   g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY_COPY) {
736 		if (task->crc_dst) {
737 			spdk_dma_free(task->crc_dst);
738 		}
739 		if (task->src_iovs) {
740 			for (i = 0; i < task->src_iovcnt; i++) {
741 				if (task->src_iovs[i].iov_base) {
742 					spdk_dma_free(task->src_iovs[i].iov_base);
743 				}
744 			}
745 			free(task->src_iovs);
746 		}
747 		if (task->dst_iovs) {
748 			for (i = 0; i < task->dst_iovcnt; i++) {
749 				if (task->dst_iovs[i].iov_base) {
750 					spdk_dma_free(task->dst_iovs[i].iov_base);
751 				}
752 			}
753 			free(task->dst_iovs);
754 		}
755 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
756 		if (task->sources) {
757 			for (i = 0; i < g_xor_src_count; i++) {
758 				spdk_dma_free(task->sources[i]);
759 			}
760 			free(task->sources);
761 		}
762 	} else {
763 		spdk_dma_free(task->src);
764 	}
765 
766 	spdk_dma_free(task->dst);
767 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) {
768 		spdk_dma_free(task->dst2);
769 	}
770 }
771 
772 static int
773 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
774 {
775 	uint32_t i;
776 	uint32_t ttl_len = 0;
777 	uint8_t *dst = (uint8_t *)_dst;
778 
779 	for (i = 0; i < iovcnt; i++) {
780 		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
781 			return -1;
782 		}
783 		dst += src_src_iovs[i].iov_len;
784 		ttl_len += src_src_iovs[i].iov_len;
785 	}
786 
787 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
788 		return -1;
789 	}
790 
791 	return 0;
792 }
793 
794 static int _worker_stop(void *arg);
795 
796 static void
797 accel_done(void *arg1, int status)
798 {
799 	struct ap_task *task = arg1;
800 	struct worker_thread *worker = task->worker;
801 	uint32_t sw_crc32c;
802 	struct spdk_dif_error err_blk;
803 
804 	assert(worker);
805 	assert(worker->current_queue_depth > 0);
806 
807 	if (g_verify && status == 0) {
808 		switch (worker->workload) {
809 		case SPDK_ACCEL_OPC_COPY_CRC32C:
810 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
811 			if (*task->crc_dst != sw_crc32c) {
812 				SPDK_NOTICELOG("CRC-32C miscompare\n");
813 				worker->xfer_failed++;
814 			}
815 			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
816 				SPDK_NOTICELOG("Data miscompare\n");
817 				worker->xfer_failed++;
818 			}
819 			break;
820 		case SPDK_ACCEL_OPC_CRC32C:
821 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
822 			if (*task->crc_dst != sw_crc32c) {
823 				SPDK_NOTICELOG("CRC-32C miscompare\n");
824 				worker->xfer_failed++;
825 			}
826 			break;
827 		case SPDK_ACCEL_OPC_COPY:
828 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
829 				SPDK_NOTICELOG("Data miscompare\n");
830 				worker->xfer_failed++;
831 			}
832 			break;
833 		case SPDK_ACCEL_OPC_DUALCAST:
834 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
835 				SPDK_NOTICELOG("Data miscompare, first destination\n");
836 				worker->xfer_failed++;
837 			}
838 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
839 				SPDK_NOTICELOG("Data miscompare, second destination\n");
840 				worker->xfer_failed++;
841 			}
842 			break;
843 		case SPDK_ACCEL_OPC_FILL:
844 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
845 				SPDK_NOTICELOG("Data miscompare\n");
846 				worker->xfer_failed++;
847 			}
848 			break;
849 		case SPDK_ACCEL_OPC_COMPARE:
850 			break;
851 		case SPDK_ACCEL_OPC_COMPRESS:
852 			break;
853 		case SPDK_ACCEL_OPC_DECOMPRESS:
854 			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
855 				SPDK_NOTICELOG("Data miscompare on decompression\n");
856 				worker->xfer_failed++;
857 			}
858 			break;
859 		case SPDK_ACCEL_OPC_XOR:
860 			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
861 					 g_xfer_size_bytes) != 0) {
862 				SPDK_ERRLOG("Failed to generate xor for verification\n");
863 			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
864 				SPDK_NOTICELOG("Data miscompare\n");
865 				worker->xfer_failed++;
866 			}
867 			break;
868 		case SPDK_ACCEL_OPC_DIF_VERIFY:
869 			break;
870 		case SPDK_ACCEL_OPC_DIF_GENERATE:
871 			if (spdk_dif_verify(task->src_iovs, task->src_iovcnt, task->num_blocks,
872 					    &task->dif_ctx, &err_blk) != 0) {
873 				SPDK_NOTICELOG("Data miscompare, "
874 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
875 					       err_blk.err_type, err_blk.expected,
876 					       err_blk.actual, err_blk.err_offset);
877 				worker->xfer_failed++;
878 			}
879 			break;
880 		case SPDK_ACCEL_OPC_DIF_GENERATE_COPY:
881 			if (spdk_dif_verify(task->dst_iovs, task->dst_iovcnt, task->num_blocks,
882 					    &task->dif_ctx, &err_blk) != 0) {
883 				SPDK_NOTICELOG("Data miscompare, "
884 					       "err_type %u, expected %lu, actual %lu, err_offset %u\n",
885 					       err_blk.err_type, err_blk.expected,
886 					       err_blk.actual, err_blk.err_offset);
887 				worker->xfer_failed++;
888 			}
889 			break;
890 		case SPDK_ACCEL_OPC_DIF_VERIFY_COPY:
891 			break;
892 		default:
893 			assert(false);
894 			break;
895 		}
896 	}
897 
898 	if (worker->workload == SPDK_ACCEL_OPC_COMPRESS ||
899 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
900 		/* Advance the task to the next segment */
901 		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
902 		if (task->cur_seg == NULL) {
903 			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
904 		}
905 	}
906 
907 	if (task->expected_status == -EILSEQ) {
908 		assert(status != 0);
909 		worker->injected_miscompares++;
910 		status = 0;
911 	} else if (status) {
912 		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
913 		worker->xfer_failed++;
914 	}
915 
916 	worker->current_queue_depth--;
917 
918 	if (!worker->is_draining && status == 0) {
919 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
920 		task = _get_task(worker);
921 		_submit_single(worker, task);
922 	} else {
923 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
924 	}
925 }
926 
927 static int
928 dump_result(void)
929 {
930 	uint64_t total_completed = 0;
931 	uint64_t total_failed = 0;
932 	uint64_t total_miscompared = 0;
933 	uint64_t total_xfer_per_sec, total_bw_in_MiBps = 0;
934 	struct worker_thread *worker = g_workers;
935 	char tmp[64];
936 
937 	printf("\n%-12s %20s %16s %16s %16s\n",
938 	       "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares");
939 	printf("------------------------------------------------------------------------------------\n");
940 	while (worker != NULL) {
941 
942 		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
943 		uint64_t bw_in_MiBps = worker->stats.num_bytes /
944 				       (g_time_in_sec * 1024 * 1024);
945 
946 		total_completed += worker->stats.executed;
947 		total_failed += worker->xfer_failed;
948 		total_miscompared += worker->injected_miscompares;
949 		total_bw_in_MiBps += bw_in_MiBps;
950 
951 		snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread);
952 		if (xfer_per_sec) {
953 			printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
954 			       tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed,
955 			       worker->injected_miscompares);
956 		}
957 
958 		worker = worker->next;
959 	}
960 
961 	total_xfer_per_sec = total_completed / g_time_in_sec;
962 
963 	printf("====================================================================================\n");
964 	printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
965 	       "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
966 
967 	return total_failed ? 1 : 0;
968 }
969 
970 static inline void
971 _free_task_buffers_in_pool(struct worker_thread *worker)
972 {
973 	struct ap_task *task;
974 
975 	assert(worker);
976 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
977 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
978 		_free_task_buffers(task);
979 	}
980 }
981 
982 static int
983 _check_draining(void *arg)
984 {
985 	struct worker_thread *worker = arg;
986 
987 	assert(worker);
988 
989 	if (worker->current_queue_depth == 0) {
990 		_free_task_buffers_in_pool(worker);
991 		spdk_poller_unregister(&worker->is_draining_poller);
992 		unregister_worker(worker);
993 	}
994 
995 	return SPDK_POLLER_BUSY;
996 }
997 
998 static int
999 _worker_stop(void *arg)
1000 {
1001 	struct worker_thread *worker = arg;
1002 
1003 	assert(worker);
1004 
1005 	spdk_poller_unregister(&worker->stop_poller);
1006 
1007 	/* now let the worker drain and check it's outstanding IO with a poller */
1008 	worker->is_draining = true;
1009 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
1010 
1011 	return SPDK_POLLER_BUSY;
1012 }
1013 
1014 static void shutdown_cb(void);
1015 
1016 static void
1017 _init_thread(void *arg1)
1018 {
1019 	struct worker_thread *worker;
1020 	struct ap_task *task;
1021 	int i, num_tasks = g_allocate_depth;
1022 	struct display_info *display = arg1;
1023 
1024 	worker = calloc(1, sizeof(*worker));
1025 	if (worker == NULL) {
1026 		fprintf(stderr, "Unable to allocate worker\n");
1027 		free(display);
1028 		spdk_thread_exit(spdk_get_thread());
1029 		goto no_worker;
1030 	}
1031 
1032 	worker->workload = g_workload_selection;
1033 	worker->display.core = display->core;
1034 	worker->display.thread = display->thread;
1035 	free(display);
1036 	worker->core = spdk_env_get_current_core();
1037 	worker->thread = spdk_get_thread();
1038 	pthread_mutex_lock(&g_workers_lock);
1039 	g_num_workers++;
1040 	worker->next = g_workers;
1041 	g_workers = worker;
1042 	pthread_mutex_unlock(&g_workers_lock);
1043 	worker->ch = spdk_accel_get_io_channel();
1044 	if (worker->ch == NULL) {
1045 		fprintf(stderr, "Unable to get an accel channel\n");
1046 		goto error;
1047 	}
1048 
1049 	TAILQ_INIT(&worker->tasks_pool);
1050 
1051 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
1052 	if (worker->task_base == NULL) {
1053 		fprintf(stderr, "Could not allocate task base.\n");
1054 		goto error;
1055 	}
1056 
1057 	task = worker->task_base;
1058 	for (i = 0; i < num_tasks; i++) {
1059 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
1060 		task->worker = worker;
1061 		if (_get_task_data_bufs(task)) {
1062 			fprintf(stderr, "Unable to get data bufs\n");
1063 			goto error;
1064 		}
1065 		task++;
1066 	}
1067 
1068 	/* Register a poller that will stop the worker at time elapsed */
1069 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
1070 			      g_time_in_sec * 1000000ULL);
1071 
1072 	/* Load up queue depth worth of operations. */
1073 	for (i = 0; i < g_queue_depth; i++) {
1074 		task = _get_task(worker);
1075 		if (task == NULL) {
1076 			goto error;
1077 		}
1078 
1079 		_submit_single(worker, task);
1080 	}
1081 	return;
1082 error:
1083 
1084 	_free_task_buffers_in_pool(worker);
1085 	free(worker->task_base);
1086 no_worker:
1087 	shutdown_cb();
1088 	g_rc = -1;
1089 }
1090 
1091 static void
1092 accel_perf_start(void *arg1)
1093 {
1094 	struct spdk_cpuset tmp_cpumask = {};
1095 	char thread_name[32];
1096 	uint32_t i;
1097 	int j;
1098 	struct spdk_thread *thread;
1099 	struct display_info *display;
1100 
1101 	g_tsc_rate = spdk_get_ticks_hz();
1102 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
1103 
1104 	dump_user_config();
1105 
1106 	printf("Running for %d seconds...\n", g_time_in_sec);
1107 	fflush(stdout);
1108 
1109 	/* Create worker threads for each core that was specified. */
1110 	SPDK_ENV_FOREACH_CORE(i) {
1111 		for (j = 0; j < g_threads_per_core; j++) {
1112 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
1113 			spdk_cpuset_zero(&tmp_cpumask);
1114 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
1115 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
1116 			display = calloc(1, sizeof(*display));
1117 			if (display == NULL) {
1118 				fprintf(stderr, "Unable to allocate memory\n");
1119 				spdk_app_stop(-1);
1120 				return;
1121 			}
1122 			display->core = i;
1123 			display->thread = j;
1124 			spdk_thread_send_msg(thread, _init_thread, display);
1125 		}
1126 	}
1127 }
1128 
1129 static void
1130 accel_perf_free_compress_segs(void)
1131 {
1132 	struct ap_compress_seg *seg, *tmp;
1133 
1134 	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
1135 		free(seg->uncompressed_iovs);
1136 		free(seg->compressed_iovs);
1137 		spdk_dma_free(seg->compressed_data);
1138 		spdk_dma_free(seg->uncompressed_data);
1139 		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
1140 		free(seg);
1141 	}
1142 }
1143 
1144 struct accel_perf_prep_ctx {
1145 	FILE			*file;
1146 	long			remaining;
1147 	struct spdk_io_channel	*ch;
1148 	struct ap_compress_seg	*cur_seg;
1149 };
1150 
1151 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);
1152 
1153 static void
1154 accel_perf_prep_process_seg_cpl(void *ref, int status)
1155 {
1156 	struct accel_perf_prep_ctx *ctx = ref;
1157 	struct ap_compress_seg *seg;
1158 
1159 	if (status != 0) {
1160 		fprintf(stderr, "error (%d) on initial compress completion\n", status);
1161 		spdk_dma_free(ctx->cur_seg->compressed_data);
1162 		spdk_dma_free(ctx->cur_seg->uncompressed_data);
1163 		free(ctx->cur_seg);
1164 		spdk_put_io_channel(ctx->ch);
1165 		fclose(ctx->file);
1166 		free(ctx);
1167 		spdk_app_stop(-status);
1168 		return;
1169 	}
1170 
1171 	seg = ctx->cur_seg;
1172 
1173 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
1174 		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1175 		if (seg->compressed_iovs == NULL) {
1176 			fprintf(stderr, "unable to allocate iovec\n");
1177 			spdk_dma_free(seg->compressed_data);
1178 			spdk_dma_free(seg->uncompressed_data);
1179 			free(seg);
1180 			spdk_put_io_channel(ctx->ch);
1181 			fclose(ctx->file);
1182 			free(ctx);
1183 			spdk_app_stop(-ENOMEM);
1184 			return;
1185 		}
1186 		seg->compressed_iovcnt = g_chained_count;
1187 
1188 		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
1189 					  seg->compressed_iovcnt);
1190 	}
1191 
1192 	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
1193 	ctx->remaining -= seg->uncompressed_len;
1194 
1195 	accel_perf_prep_process_seg(ctx);
1196 }
1197 
1198 static void
1199 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
1200 {
1201 	struct ap_compress_seg *seg;
1202 	int sz, sz_read, sz_padded;
1203 	void *ubuf, *cbuf;
1204 	struct iovec iov[1];
1205 	int rc;
1206 
1207 	if (ctx->remaining == 0) {
1208 		spdk_put_io_channel(ctx->ch);
1209 		fclose(ctx->file);
1210 		free(ctx);
1211 		accel_perf_start(NULL);
1212 		return;
1213 	}
1214 
1215 	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
1216 	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
1217 	 * would likely either deal with the failure of not having a large enough buffer
1218 	 * by submitting another operation with a larger one.  Or, like the vbdev module
1219 	 * does, just accept the error and use the data uncompressed marking it as such in
1220 	 * its own metadata so that in the future it doesn't try to decompress uncompressed
1221 	 * data, etc.
1222 	 */
1223 	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;
1224 
1225 	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
1226 	if (!ubuf) {
1227 		fprintf(stderr, "unable to allocate uncompress buffer\n");
1228 		rc = -ENOMEM;
1229 		goto error;
1230 	}
1231 
1232 	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
1233 	if (!cbuf) {
1234 		fprintf(stderr, "unable to allocate compress buffer\n");
1235 		rc = -ENOMEM;
1236 		spdk_dma_free(ubuf);
1237 		goto error;
1238 	}
1239 
1240 	seg = calloc(1, sizeof(*seg));
1241 	if (!seg) {
1242 		fprintf(stderr, "unable to allocate comp/decomp segment\n");
1243 		spdk_dma_free(ubuf);
1244 		spdk_dma_free(cbuf);
1245 		rc = -ENOMEM;
1246 		goto error;
1247 	}
1248 
1249 	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
1250 	if (sz_read != sz) {
1251 		fprintf(stderr, "unable to read input file\n");
1252 		free(seg);
1253 		spdk_dma_free(ubuf);
1254 		spdk_dma_free(cbuf);
1255 		rc = -errno;
1256 		goto error;
1257 	}
1258 
1259 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
1260 		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1261 		if (seg->uncompressed_iovs == NULL) {
1262 			fprintf(stderr, "unable to allocate iovec\n");
1263 			free(seg);
1264 			spdk_dma_free(ubuf);
1265 			spdk_dma_free(cbuf);
1266 			rc = -ENOMEM;
1267 			goto error;
1268 		}
1269 		seg->uncompressed_iovcnt = g_chained_count;
1270 		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
1271 	}
1272 
1273 	seg->uncompressed_data = ubuf;
1274 	seg->uncompressed_len = sz;
1275 	seg->compressed_data = cbuf;
1276 	seg->compressed_len = sz;
1277 	seg->compressed_len_padded = sz_padded;
1278 
1279 	ctx->cur_seg = seg;
1280 	iov[0].iov_base = seg->uncompressed_data;
1281 	iov[0].iov_len = seg->uncompressed_len;
1282 	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
1283 	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
1284 	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
1285 	 * but real applications may want to consider a more sophisticated method.
1286 	 */
1287 	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
1288 					&seg->compressed_len, accel_perf_prep_process_seg_cpl, ctx);
1289 	if (rc < 0) {
1290 		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
1291 		goto error;
1292 	}
1293 
1294 	return;
1295 
1296 error:
1297 	spdk_put_io_channel(ctx->ch);
1298 	fclose(ctx->file);
1299 	free(ctx);
1300 	spdk_app_stop(rc);
1301 }
1302 
1303 static void
1304 accel_perf_prep(void *arg1)
1305 {
1306 	struct accel_perf_prep_ctx *ctx;
1307 	const char *module_name = NULL;
1308 	int rc = 0;
1309 
1310 	if (g_module_name) {
1311 		rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
1312 		if (rc != 0 || strcmp(g_module_name, module_name) != 0) {
1313 			fprintf(stderr, "Module '%s' was assigned via JSON config or RPC, instead of '%s'\n",
1314 				module_name, g_module_name);
1315 			fprintf(stderr, "-M option is not compatible with accel_assign_opc RPC\n");
1316 			rc = -EINVAL;
1317 			goto error_end;
1318 		}
1319 	}
1320 
1321 	if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS &&
1322 	    g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) {
1323 		accel_perf_start(arg1);
1324 		return;
1325 	}
1326 
1327 	if (g_cd_file_in_name == NULL) {
1328 		fprintf(stdout, "A filename is required.\n");
1329 		rc = -EINVAL;
1330 		goto error_end;
1331 	}
1332 
1333 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) {
1334 		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
1335 		rc = -ENOTSUP;
1336 		goto error_end;
1337 	}
1338 
1339 	printf("Preparing input file...\n");
1340 
1341 	ctx = calloc(1, sizeof(*ctx));
1342 	if (ctx == NULL) {
1343 		rc = -ENOMEM;
1344 		goto error_end;
1345 	}
1346 
1347 	ctx->file = fopen(g_cd_file_in_name, "r");
1348 	if (ctx->file == NULL) {
1349 		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
1350 		rc = -errno;
1351 		goto error_ctx;
1352 	}
1353 
1354 	fseek(ctx->file, 0L, SEEK_END);
1355 	ctx->remaining = ftell(ctx->file);
1356 	fseek(ctx->file, 0L, SEEK_SET);
1357 
1358 	ctx->ch = spdk_accel_get_io_channel();
1359 	if (ctx->ch == NULL) {
1360 		rc = -EAGAIN;
1361 		goto error_file;
1362 	}
1363 
1364 	if (g_xfer_size_bytes == 0) {
1365 		/* size of 0 means "file at a time" */
1366 		g_xfer_size_bytes = ctx->remaining;
1367 	}
1368 
1369 	accel_perf_prep_process_seg(ctx);
1370 	return;
1371 
1372 error_file:
1373 	fclose(ctx->file);
1374 error_ctx:
1375 	free(ctx);
1376 error_end:
1377 	spdk_app_stop(rc);
1378 }
1379 
1380 static void
1381 worker_shutdown(void *ctx)
1382 {
1383 	_worker_stop(ctx);
1384 }
1385 
1386 static void
1387 shutdown_cb(void)
1388 {
1389 	struct worker_thread *worker;
1390 
1391 	pthread_mutex_lock(&g_workers_lock);
1392 	if (!g_workers) {
1393 		spdk_app_stop(1);
1394 		goto unlock;
1395 	}
1396 
1397 	worker = g_workers;
1398 	while (worker) {
1399 		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
1400 		worker = worker->next;
1401 	}
1402 unlock:
1403 	pthread_mutex_unlock(&g_workers_lock);
1404 }
1405 
1406 int
1407 main(int argc, char **argv)
1408 {
1409 	struct worker_thread *worker, *tmp;
1410 	int rc;
1411 
1412 	pthread_mutex_init(&g_workers_lock, NULL);
1413 	spdk_app_opts_init(&g_opts, sizeof(g_opts));
1414 	g_opts.name = "accel_perf";
1415 	g_opts.reactor_mask = "0x1";
1416 	g_opts.shutdown_cb = shutdown_cb;
1417 	g_opts.rpc_addr = NULL;
1418 
1419 	rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:M:P:f:T:l:S:x:", NULL,
1420 				 parse_args, usage);
1421 	if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) {
1422 		return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1;
1423 	}
1424 
1425 	if (g_workload_selection == SPDK_ACCEL_OPC_LAST) {
1426 		fprintf(stderr, "Must provide a workload type\n");
1427 		usage();
1428 		return -1;
1429 	}
1430 
1431 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
1432 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
1433 		usage();
1434 		return -1;
1435 	}
1436 
1437 	if (g_allocate_depth == 0) {
1438 		g_allocate_depth = g_queue_depth;
1439 	}
1440 
1441 	if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
1442 	     g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C ||
1443 	     g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY ||
1444 	     g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE) &&
1445 	    g_chained_count == 0) {
1446 		usage();
1447 		return -1;
1448 	}
1449 
1450 	if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) {
1451 		usage();
1452 		return -1;
1453 	}
1454 
1455 	if (g_module_name && spdk_accel_assign_opc(g_workload_selection, g_module_name)) {
1456 		fprintf(stderr, "Was not able to assign '%s' module to the workload\n", g_module_name);
1457 		usage();
1458 		return -1;
1459 	}
1460 
1461 	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
1462 	if (g_rc) {
1463 		SPDK_ERRLOG("ERROR starting application\n");
1464 	}
1465 
1466 	pthread_mutex_destroy(&g_workers_lock);
1467 
1468 	worker = g_workers;
1469 	while (worker) {
1470 		tmp = worker->next;
1471 		free(worker);
1472 		worker = tmp;
1473 	}
1474 	accel_perf_free_compress_segs();
1475 	spdk_app_fini();
1476 	return g_rc;
1477 }
1478