xref: /spdk/examples/accel/perf/accel_perf.c (revision c5afa2a0d24ef2f300e7a10d88e13904bc972940)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/thread.h"
8 #include "spdk/env.h"
9 #include "spdk/event.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/accel.h"
13 #include "spdk/crc32.h"
14 #include "spdk/util.h"
15 #include "spdk/xor.h"
16 
/* Byte value written into source buffers; verification compares against it. */
#define DATA_PATTERN 0x5a
/* Destination alignment used for dualcast (DSA HW requirement). */
#define ALIGN_4K 0x1000
/* Compress destination buffers are padded by 10% for incompressible data. */
#define COMP_BUF_PAD_PERCENTAGE 1.1L

static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
/* Process-wide exit code; set non-zero on any failure. */
static int g_rc;
static int g_xfer_size_bytes = 4096;	/* -o */
static int g_queue_depth = 32;		/* -q */
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;	/* -a */
static int g_threads_per_core = 1;	/* -T */
static int g_time_in_sec = 5;		/* -t */
static uint32_t g_crc32c_seed = 0;	/* -S */
static uint32_t g_chained_count = 1;	/* -C: iovec entries per operation */
static int g_fail_percent_goal = 0;	/* -P: injected compare-miss percentage */
static uint8_t g_fill_pattern = 255;	/* -f */
static uint32_t g_xor_src_count = 2;	/* -x */
static bool g_verify = false;		/* -y */
static const char *g_workload_type = NULL;	/* -w, raw string */
static enum spdk_accel_opcode g_workload_selection = SPDK_ACCEL_OPC_LAST;
static const char *g_module_name = NULL;	/* -M */
/* Singly linked list of workers, guarded by g_workers_lock. */
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;	/* -l: compress/decompress input file */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};
46 
/*
 * One chunk of the -l input file prepared for compress/decompress
 * workloads: the raw data read from the file plus its compressed form,
 * each with an optional iovec view sized by the -C chained count.
 */
struct ap_compress_seg {
	void		*uncompressed_data;
	uint32_t	uncompressed_len;
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	void		*compressed_data;
	uint32_t	compressed_len;
	uint32_t	compressed_len_padded;	/* allocation size incl. pad for incompressible data */
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg)	link;
};

/* All prepared segments; tasks walk this list round-robin (see accel_done). */
static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);
63 
64 struct worker_thread;
65 static void accel_done(void *ref, int status);
66 
/* Core/thread identifiers shown in the per-worker results table. */
struct display_info {
	int core;
	int thread;
};
71 
/* One outstanding accel operation plus the buffers it owns. */
struct ap_task {
	void			*src;		/* flat source (copy/fill/compare/dualcast) */
	struct iovec		*src_iovs;	/* vectored source (crc32c / de/compress) */
	uint32_t		src_iovcnt;
	void			**sources;	/* xor: array of g_xor_src_count buffers */
	struct iovec		*dst_iovs;	/* vectored destination (de/compress) */
	uint32_t		dst_iovcnt;
	void			*dst;
	void			*dst2;		/* dualcast second dst / xor verify scratch */
	uint32_t		crc_dst;	/* crc32c result written by accel */
	uint32_t		compressed_sz;	/* bytes produced by compress */
	struct ap_compress_seg *cur_seg;	/* current file segment (de/compress) */
	struct worker_thread	*worker;	/* back-pointer for the completion callback */
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};
88 
/* Per-SPDK-thread state for one benchmark worker. */
struct worker_thread {
	struct spdk_io_channel		*ch;		/* accel framework channel */
	struct spdk_accel_opcode_stats	stats;		/* snapshot taken at teardown */
	uint64_t			xfer_failed;	/* verification/unexpected failures */
	uint64_t			injected_miscompares;	/* expected -EILSEQ completions (-P) */
	uint64_t			current_queue_depth;	/* operations in flight */
	TAILQ_HEAD(, ap_task)		tasks_pool;	/* idle tasks ready for submission */
	struct worker_thread		*next;		/* link in g_workers list */
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;	/* stop resubmitting on completion */
	struct spdk_poller		*is_draining_poller;	/* watches queue depth reach zero */
	struct spdk_poller		*stop_poller;	/* fires after -t seconds */
	void				*task_base;	/* backing array for tasks_pool */
	struct display_info		display;	/* core/thread ids for results output */
	enum spdk_accel_opcode		workload;	/* copy of g_workload_selection */
};
106 
107 static void
108 dump_user_config(void)
109 {
110 	const char *module_name = NULL;
111 	int rc;
112 
113 	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
114 	if (rc) {
115 		printf("error getting module name (%d)\n", rc);
116 	}
117 
118 	printf("\nSPDK Configuration:\n");
119 	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
120 	printf("Accel Perf Configuration:\n");
121 	printf("Workload Type:  %s\n", g_workload_type);
122 	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
123 	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
124 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
125 	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
126 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
127 	} else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
128 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
129 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
130 		printf("Source buffers: %u\n", g_xor_src_count);
131 	}
132 	if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
133 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
134 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
135 	} else {
136 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
137 	}
138 	printf("vector count    %u\n", g_chained_count);
139 	printf("Module:         %s\n", module_name);
140 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
141 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
142 		printf("File Name:      %s\n", g_cd_file_in_name);
143 	}
144 	printf("Queue depth:    %u\n", g_queue_depth);
145 	printf("Allocate depth: %u\n", g_allocate_depth);
146 	printf("# threads/core: %u\n", g_threads_per_core);
147 	printf("Run time:       %u seconds\n", g_time_in_sec);
148 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
149 }
150 
/*
 * Print the command-line help for all accel_perf options.
 * Fix: several option lines were missing their closing ']', producing
 * inconsistent/malformed help text.
 */
static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor]\n");
	printf("\t[-M assign module to the operation, not compatible with accel_assign_opc RPC]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-S for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}
172 
173 static int
174 parse_args(int ch, char *arg)
175 {
176 	int argval = 0;
177 
178 	switch (ch) {
179 	case 'a':
180 	case 'C':
181 	case 'f':
182 	case 'T':
183 	case 'o':
184 	case 'P':
185 	case 'q':
186 	case 'S':
187 	case 't':
188 	case 'x':
189 		argval = spdk_strtol(optarg, 10);
190 		if (argval < 0) {
191 			fprintf(stderr, "-%c option must be non-negative.\n", ch);
192 			usage();
193 			return 1;
194 		}
195 		break;
196 	default:
197 		break;
198 	};
199 
200 	switch (ch) {
201 	case 'a':
202 		g_allocate_depth = argval;
203 		break;
204 	case 'C':
205 		g_chained_count = argval;
206 		break;
207 	case 'l':
208 		g_cd_file_in_name = optarg;
209 		break;
210 	case 'f':
211 		g_fill_pattern = (uint8_t)argval;
212 		break;
213 	case 'T':
214 		g_threads_per_core = argval;
215 		break;
216 	case 'o':
217 		g_xfer_size_bytes = argval;
218 		break;
219 	case 'P':
220 		g_fail_percent_goal = argval;
221 		break;
222 	case 'q':
223 		g_queue_depth = argval;
224 		break;
225 	case 'S':
226 		g_crc32c_seed = argval;
227 		break;
228 	case 't':
229 		g_time_in_sec = argval;
230 		break;
231 	case 'x':
232 		g_xor_src_count = argval;
233 		break;
234 	case 'y':
235 		g_verify = true;
236 		break;
237 	case 'w':
238 		g_workload_type = optarg;
239 		if (!strcmp(g_workload_type, "copy")) {
240 			g_workload_selection = SPDK_ACCEL_OPC_COPY;
241 		} else if (!strcmp(g_workload_type, "fill")) {
242 			g_workload_selection = SPDK_ACCEL_OPC_FILL;
243 		} else if (!strcmp(g_workload_type, "crc32c")) {
244 			g_workload_selection = SPDK_ACCEL_OPC_CRC32C;
245 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
246 			g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C;
247 		} else if (!strcmp(g_workload_type, "compare")) {
248 			g_workload_selection = SPDK_ACCEL_OPC_COMPARE;
249 		} else if (!strcmp(g_workload_type, "dualcast")) {
250 			g_workload_selection = SPDK_ACCEL_OPC_DUALCAST;
251 		} else if (!strcmp(g_workload_type, "compress")) {
252 			g_workload_selection = SPDK_ACCEL_OPC_COMPRESS;
253 		} else if (!strcmp(g_workload_type, "decompress")) {
254 			g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS;
255 		} else if (!strcmp(g_workload_type, "xor")) {
256 			g_workload_selection = SPDK_ACCEL_OPC_XOR;
257 		} else {
258 			fprintf(stderr, "Unsupported workload type: %s\n", optarg);
259 			usage();
260 			return 1;
261 		}
262 		break;
263 	case 'M':
264 		g_module_name = optarg;
265 		break;
266 
267 	default:
268 		usage();
269 		return 1;
270 	}
271 
272 	return 0;
273 }
274 
static int dump_result(void);

/*
 * Per-worker teardown: snapshot final opcode stats, release the accel
 * channel and task memory, then exit this SPDK thread.  The last worker
 * to tear down prints the results and stops the application.
 */
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	if (worker->ch) {
		/* Capture stats before the channel goes away; dump_result() reads them. */
		spdk_accel_get_opcode_stats(worker->ch, worker->workload,
					    &worker->stats, sizeof(worker->stats));
		spdk_put_io_channel(worker->ch);
		worker->ch = NULL;
	}
	free(worker->task_base);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		/* Only dump results on successful runs */
		if (g_rc == 0) {
			g_rc = dump_result();
		}
		spdk_app_stop(g_rc);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}
302 
/*
 * Split the buffer [buf, buf + sz) into iovcnt contiguous iovec entries.
 * Entries are ceil(sz / iovcnt) bytes each, with the final entry shrunk
 * to whatever remains, so the vectors exactly cover the buffer.
 */
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint8_t *cursor = buf;
	uint64_t remaining = sz;
	uint64_t chunk = (sz + iovcnt - 1) / iovcnt;	/* ceil(sz / iovcnt) */
	uint32_t idx;

	for (idx = 0; idx < iovcnt; idx++) {
		/* Clamp the last chunk to what is actually left. */
		if (chunk > remaining) {
			chunk = remaining;
		}
		assert(chunk > 0);

		iovs[idx].iov_base = cursor;
		iovs[idx].iov_len = chunk;

		cursor += chunk;
		remaining -= chunk;
	}
	assert(remaining == 0);
}
325 
/*
 * Allocate and initialize the data buffers one task needs for the selected
 * workload.  Buffers come from spdk_dma_zmalloc() so hardware accel modules
 * can use them directly.  Returns 0 on success, -ENOMEM on allocation
 * failure.  Partial allocations are not unwound here; the caller
 * (_init_thread) inserts the task into the pool first and frees buffers
 * via _free_task_buffers_in_pool() on error.
 */
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		/* De/compress source data comes from the shared segment list. */
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);

		if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
			/* Padded length leaves headroom for incompressible data. */
			dst_buff_len = task->cur_seg->compressed_len_padded;
		}

		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (!task->dst_iovs) {
			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
			return -ENOMEM;
		}
		task->dst_iovcnt = g_chained_count;
		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);

		return 0;
	}

	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
		/* CRC workloads read from a source iovec of -C entries. */
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
			/* Copy+CRC needs one flat destination covering every vector. */
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
		/* XOR reads from an array of -x separate source buffers. */
		assert(g_xor_src_count > 1);
		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
		if (!task->sources) {
			return -ENOMEM;
		}

		for (i = 0; i < g_xor_src_count; i++) {
			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (!task->sources[i]) {
				return -ENOMEM;
			}
			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
		}
	} else {
		/* Remaining workloads use a single flat source buffer. */
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer so we can check if verify is enabled. */
		if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	/* Every workload except plain CRC32C writes to a destination buffer. */
	if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* For dualcast 2 buffers are needed for the operation.  */
	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST ||
	    (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}
444 
445 inline static struct ap_task *
446 _get_task(struct worker_thread *worker)
447 {
448 	struct ap_task *task;
449 
450 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
451 		task = TAILQ_FIRST(&worker->tasks_pool);
452 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
453 	} else {
454 		fprintf(stderr, "Unable to get ap_task\n");
455 		return NULL;
456 	}
457 
458 	return task;
459 }
460 
/* Submit one operation using the same ap task that just completed.
 * Dispatches on the worker's workload opcode; accel_done() is the
 * completion callback for every submission.  On a synchronous submit
 * failure the task is completed inline with the error so accounting in
 * accel_done() still runs.
 */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case SPDK_ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_FILL:
		/* For fill use the first byte of the task->dst buffer */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPARE:
		/* Deliberately miscompare g_fail_percent_goal percent of the
		 * submissions by flipping the first destination byte; record the
		 * expectation so accel_done() can classify the result.
		 */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPRESS:
		/* Source comes from the task's current file segment. */
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;

	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}
534 
535 static void
536 _free_task_buffers(struct ap_task *task)
537 {
538 	uint32_t i;
539 
540 	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS ||
541 	    g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
542 		free(task->dst_iovs);
543 	} else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
544 		   g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
545 		if (task->src_iovs) {
546 			for (i = 0; i < task->src_iovcnt; i++) {
547 				if (task->src_iovs[i].iov_base) {
548 					spdk_dma_free(task->src_iovs[i].iov_base);
549 				}
550 			}
551 			free(task->src_iovs);
552 		}
553 	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
554 		if (task->sources) {
555 			for (i = 0; i < g_xor_src_count; i++) {
556 				spdk_dma_free(task->sources[i]);
557 			}
558 			free(task->sources);
559 		}
560 	} else {
561 		spdk_dma_free(task->src);
562 	}
563 
564 	spdk_dma_free(task->dst);
565 	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) {
566 		spdk_dma_free(task->dst2);
567 	}
568 }
569 
570 static int
571 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
572 {
573 	uint32_t i;
574 	uint32_t ttl_len = 0;
575 	uint8_t *dst = (uint8_t *)_dst;
576 
577 	for (i = 0; i < iovcnt; i++) {
578 		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
579 			return -1;
580 		}
581 		dst += src_src_iovs[i].iov_len;
582 		ttl_len += src_src_iovs[i].iov_len;
583 	}
584 
585 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
586 		return -1;
587 	}
588 
589 	return 0;
590 }
591 
592 static int _worker_stop(void *arg);
593 
594 static void
595 accel_done(void *arg1, int status)
596 {
597 	struct ap_task *task = arg1;
598 	struct worker_thread *worker = task->worker;
599 	uint32_t sw_crc32c;
600 
601 	assert(worker);
602 	assert(worker->current_queue_depth > 0);
603 
604 	if (g_verify && status == 0) {
605 		switch (worker->workload) {
606 		case SPDK_ACCEL_OPC_COPY_CRC32C:
607 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
608 			if (task->crc_dst != sw_crc32c) {
609 				SPDK_NOTICELOG("CRC-32C miscompare\n");
610 				worker->xfer_failed++;
611 			}
612 			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
613 				SPDK_NOTICELOG("Data miscompare\n");
614 				worker->xfer_failed++;
615 			}
616 			break;
617 		case SPDK_ACCEL_OPC_CRC32C:
618 			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
619 			if (task->crc_dst != sw_crc32c) {
620 				SPDK_NOTICELOG("CRC-32C miscompare\n");
621 				worker->xfer_failed++;
622 			}
623 			break;
624 		case SPDK_ACCEL_OPC_COPY:
625 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
626 				SPDK_NOTICELOG("Data miscompare\n");
627 				worker->xfer_failed++;
628 			}
629 			break;
630 		case SPDK_ACCEL_OPC_DUALCAST:
631 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
632 				SPDK_NOTICELOG("Data miscompare, first destination\n");
633 				worker->xfer_failed++;
634 			}
635 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
636 				SPDK_NOTICELOG("Data miscompare, second destination\n");
637 				worker->xfer_failed++;
638 			}
639 			break;
640 		case SPDK_ACCEL_OPC_FILL:
641 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
642 				SPDK_NOTICELOG("Data miscompare\n");
643 				worker->xfer_failed++;
644 			}
645 			break;
646 		case SPDK_ACCEL_OPC_COMPARE:
647 			break;
648 		case SPDK_ACCEL_OPC_COMPRESS:
649 			break;
650 		case SPDK_ACCEL_OPC_DECOMPRESS:
651 			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
652 				SPDK_NOTICELOG("Data miscompare on decompression\n");
653 				worker->xfer_failed++;
654 			}
655 			break;
656 		case SPDK_ACCEL_OPC_XOR:
657 			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
658 					 g_xfer_size_bytes) != 0) {
659 				SPDK_ERRLOG("Failed to generate xor for verification\n");
660 			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
661 				SPDK_NOTICELOG("Data miscompare\n");
662 				worker->xfer_failed++;
663 			}
664 			break;
665 		default:
666 			assert(false);
667 			break;
668 		}
669 	}
670 
671 	if (worker->workload == SPDK_ACCEL_OPC_COMPRESS ||
672 	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
673 		/* Advance the task to the next segment */
674 		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
675 		if (task->cur_seg == NULL) {
676 			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
677 		}
678 	}
679 
680 	if (task->expected_status == -EILSEQ) {
681 		assert(status != 0);
682 		worker->injected_miscompares++;
683 		status = 0;
684 	} else if (status) {
685 		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
686 		worker->xfer_failed++;
687 	}
688 
689 	worker->current_queue_depth--;
690 
691 	if (!worker->is_draining && status == 0) {
692 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
693 		task = _get_task(worker);
694 		_submit_single(worker, task);
695 	} else {
696 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
697 	}
698 }
699 
/*
 * Print the per-worker and aggregate throughput table after all workers
 * have torn down; reads the stats snapshots captured in unregister_worker().
 * Returns 1 if any transfer failed (so the app exits non-zero), else 0.
 */
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;
	char tmp[64];

	printf("\n%-12s %20s %16s %16s %16s\n",
	       "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares");
	printf("------------------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
		uint64_t bw_in_MiBps = worker->stats.num_bytes /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->stats.executed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread);
		/* Workers that completed nothing are omitted from the per-row output. */
		if (xfer_per_sec) {
			printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
			       tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed,
			       worker->injected_miscompares);
		}

		worker = worker->next;
	}

	/* NOTE(review): the total bandwidth is derived from g_xfer_size_bytes
	 * per completion while the per-worker rows use stats.num_bytes; for
	 * vectored workloads these bases may disagree - confirm intended.
	 */
	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("====================================================================================\n");
	printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
	       "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}
743 
744 static inline void
745 _free_task_buffers_in_pool(struct worker_thread *worker)
746 {
747 	struct ap_task *task;
748 
749 	assert(worker);
750 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
751 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
752 		_free_task_buffers(task);
753 	}
754 }
755 
756 static int
757 _check_draining(void *arg)
758 {
759 	struct worker_thread *worker = arg;
760 
761 	assert(worker);
762 
763 	if (worker->current_queue_depth == 0) {
764 		_free_task_buffers_in_pool(worker);
765 		spdk_poller_unregister(&worker->is_draining_poller);
766 		unregister_worker(worker);
767 	}
768 
769 	return SPDK_POLLER_BUSY;
770 }
771 
772 static int
773 _worker_stop(void *arg)
774 {
775 	struct worker_thread *worker = arg;
776 
777 	assert(worker);
778 
779 	spdk_poller_unregister(&worker->stop_poller);
780 
781 	/* now let the worker drain and check it's outstanding IO with a poller */
782 	worker->is_draining = true;
783 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
784 
785 	return SPDK_POLLER_BUSY;
786 }
787 
788 static void shutdown_cb(void);
789 
/*
 * Runs on each newly created worker SPDK thread.  Allocates the worker,
 * links it into the global list, allocates g_allocate_depth tasks with
 * their data buffers, arms the run timer, and primes g_queue_depth
 * operations.  Takes ownership of (and frees) the display_info in arg1.
 * On failure, releases what was allocated and shuts the app down.
 */
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		spdk_thread_exit(spdk_get_thread());
		goto no_worker;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	/* Link into the global worker list before anything can fail so the
	 * worker is accounted for in teardown.
	 */
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	/* Each task is inserted into the pool before its buffers are
	 * allocated, so a mid-loop failure is still cleaned up by
	 * _free_task_buffers_in_pool() on the error path.
	 */
	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:

	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
no_worker:
	shutdown_cb();
	g_rc = -1;
}
864 
865 static void
866 accel_perf_start(void *arg1)
867 {
868 	struct spdk_cpuset tmp_cpumask = {};
869 	char thread_name[32];
870 	uint32_t i;
871 	int j;
872 	struct spdk_thread *thread;
873 	struct display_info *display;
874 
875 	g_tsc_rate = spdk_get_ticks_hz();
876 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
877 
878 	dump_user_config();
879 
880 	printf("Running for %d seconds...\n", g_time_in_sec);
881 	fflush(stdout);
882 
883 	/* Create worker threads for each core that was specified. */
884 	SPDK_ENV_FOREACH_CORE(i) {
885 		for (j = 0; j < g_threads_per_core; j++) {
886 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
887 			spdk_cpuset_zero(&tmp_cpumask);
888 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
889 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
890 			display = calloc(1, sizeof(*display));
891 			if (display == NULL) {
892 				fprintf(stderr, "Unable to allocate memory\n");
893 				spdk_app_stop(-1);
894 				return;
895 			}
896 			display->core = i;
897 			display->thread = j;
898 			spdk_thread_send_msg(thread, _init_thread, display);
899 		}
900 	}
901 }
902 
903 static void
904 accel_perf_free_compress_segs(void)
905 {
906 	struct ap_compress_seg *seg, *tmp;
907 
908 	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
909 		free(seg->uncompressed_iovs);
910 		free(seg->compressed_iovs);
911 		spdk_dma_free(seg->compressed_data);
912 		spdk_dma_free(seg->uncompressed_data);
913 		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
914 		free(seg);
915 	}
916 }
917 
/*
 * State carried across the asynchronous preparation of compress/decompress
 * segments from the -l input file.
 */
struct accel_perf_prep_ctx {
	FILE			*file;		/* input file being consumed */
	long			remaining;	/* bytes of the file left to segment */
	struct spdk_io_channel	*ch;		/* accel channel used for the priming compress */
	struct ap_compress_seg	*cur_seg;	/* segment currently being prepared */
};
924 
925 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);
926 
/*
 * Completion callback for the priming compress of one segment.  On success
 * the segment is appended to g_compress_segs (building an iovec view of
 * the compressed data when the workload is decompress) and the next chunk
 * of the input file is processed.  On any failure all segment resources
 * and the prep context are released and the app is stopped.
 */
static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		/* Decompress tasks read the compressed data through an iovec. */
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}
971 
972 static void
973 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
974 {
975 	struct ap_compress_seg *seg;
976 	int sz, sz_read, sz_padded;
977 	void *ubuf, *cbuf;
978 	struct iovec iov[1];
979 	int rc;
980 
981 	if (ctx->remaining == 0) {
982 		spdk_put_io_channel(ctx->ch);
983 		fclose(ctx->file);
984 		free(ctx);
985 		accel_perf_start(NULL);
986 		return;
987 	}
988 
989 	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
990 	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
991 	 * would likely either deal with the failure of not having a large enough buffer
992 	 * by submitting another operation with a larger one.  Or, like the vbdev module
993 	 * does, just accept the error and use the data uncompressed marking it as such in
994 	 * its own metadata so that in the future it doesn't try to decompress uncompressed
995 	 * data, etc.
996 	 */
997 	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;
998 
999 	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
1000 	if (!ubuf) {
1001 		fprintf(stderr, "unable to allocate uncompress buffer\n");
1002 		rc = -ENOMEM;
1003 		goto error;
1004 	}
1005 
1006 	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
1007 	if (!cbuf) {
1008 		fprintf(stderr, "unable to allocate compress buffer\n");
1009 		rc = -ENOMEM;
1010 		spdk_dma_free(ubuf);
1011 		goto error;
1012 	}
1013 
1014 	seg = calloc(1, sizeof(*seg));
1015 	if (!seg) {
1016 		fprintf(stderr, "unable to allocate comp/decomp segment\n");
1017 		spdk_dma_free(ubuf);
1018 		spdk_dma_free(cbuf);
1019 		rc = -ENOMEM;
1020 		goto error;
1021 	}
1022 
1023 	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
1024 	if (sz_read != sz) {
1025 		fprintf(stderr, "unable to read input file\n");
1026 		free(seg);
1027 		spdk_dma_free(ubuf);
1028 		spdk_dma_free(cbuf);
1029 		rc = -errno;
1030 		goto error;
1031 	}
1032 
1033 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
1034 		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
1035 		if (seg->uncompressed_iovs == NULL) {
1036 			fprintf(stderr, "unable to allocate iovec\n");
1037 			free(seg);
1038 			spdk_dma_free(ubuf);
1039 			spdk_dma_free(cbuf);
1040 			rc = -ENOMEM;
1041 			goto error;
1042 		}
1043 		seg->uncompressed_iovcnt = g_chained_count;
1044 		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
1045 	}
1046 
1047 	seg->uncompressed_data = ubuf;
1048 	seg->uncompressed_len = sz;
1049 	seg->compressed_data = cbuf;
1050 	seg->compressed_len = sz;
1051 	seg->compressed_len_padded = sz_padded;
1052 
1053 	ctx->cur_seg = seg;
1054 	iov[0].iov_base = seg->uncompressed_data;
1055 	iov[0].iov_len = seg->uncompressed_len;
1056 	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
1057 	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
1058 	 * to hold the compressed data.  This example app simply adds 10% buffer for compressed data
1059 	 * but real applications may want to consider a more sophisticated method.
1060 	 */
1061 	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
1062 					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
1063 	if (rc < 0) {
1064 		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
1065 		goto error;
1066 	}
1067 
1068 	return;
1069 
1070 error:
1071 	spdk_put_io_channel(ctx->ch);
1072 	fclose(ctx->file);
1073 	free(ctx);
1074 	spdk_app_stop(rc);
1075 }
1076 
1077 static void
1078 accel_perf_prep(void *arg1)
1079 {
1080 	struct accel_perf_prep_ctx *ctx;
1081 	const char *module_name = NULL;
1082 	int rc = 0;
1083 
1084 	if (g_module_name) {
1085 		rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
1086 		if (rc != 0 || strcmp(g_module_name, module_name) != 0) {
1087 			fprintf(stderr, "Module '%s' was assigned via JSON config or RPC, instead of '%s'\n",
1088 				module_name, g_module_name);
1089 			fprintf(stderr, "-M option is not compatible with accel_assign_opc RPC\n");
1090 			rc = -EINVAL;
1091 			goto error_end;
1092 		}
1093 	}
1094 
1095 	if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS &&
1096 	    g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) {
1097 		accel_perf_start(arg1);
1098 		return;
1099 	}
1100 
1101 	if (g_cd_file_in_name == NULL) {
1102 		fprintf(stdout, "A filename is required.\n");
1103 		rc = -EINVAL;
1104 		goto error_end;
1105 	}
1106 
1107 	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) {
1108 		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
1109 		rc = -ENOTSUP;
1110 		goto error_end;
1111 	}
1112 
1113 	printf("Preparing input file...\n");
1114 
1115 	ctx = calloc(1, sizeof(*ctx));
1116 	if (ctx == NULL) {
1117 		rc = -ENOMEM;
1118 		goto error_end;
1119 	}
1120 
1121 	ctx->file = fopen(g_cd_file_in_name, "r");
1122 	if (ctx->file == NULL) {
1123 		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
1124 		rc = -errno;
1125 		goto error_ctx;
1126 	}
1127 
1128 	fseek(ctx->file, 0L, SEEK_END);
1129 	ctx->remaining = ftell(ctx->file);
1130 	fseek(ctx->file, 0L, SEEK_SET);
1131 
1132 	ctx->ch = spdk_accel_get_io_channel();
1133 	if (ctx->ch == NULL) {
1134 		rc = -EAGAIN;
1135 		goto error_file;
1136 	}
1137 
1138 	if (g_xfer_size_bytes == 0) {
1139 		/* size of 0 means "file at a time" */
1140 		g_xfer_size_bytes = ctx->remaining;
1141 	}
1142 
1143 	accel_perf_prep_process_seg(ctx);
1144 	return;
1145 
1146 error_file:
1147 	fclose(ctx->file);
1148 error_ctx:
1149 	free(ctx);
1150 error_end:
1151 	spdk_app_stop(rc);
1152 }
1153 
/*
 * Message handler executed on a worker's SPDK thread; shutdown_cb() sends it
 * to each worker via spdk_thread_send_msg().  ctx is the worker_thread to
 * stop.
 */
static void
worker_shutdown(void *ctx)
{
	_worker_stop(ctx);
}
1159 
1160 static void
1161 shutdown_cb(void)
1162 {
1163 	struct worker_thread *worker;
1164 
1165 	pthread_mutex_lock(&g_workers_lock);
1166 	if (!g_workers) {
1167 		spdk_app_stop(1);
1168 		goto unlock;
1169 	}
1170 
1171 	worker = g_workers;
1172 	while (worker) {
1173 		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
1174 		worker = worker->next;
1175 	}
1176 unlock:
1177 	pthread_mutex_unlock(&g_workers_lock);
1178 }
1179 
1180 int
1181 main(int argc, char **argv)
1182 {
1183 	struct worker_thread *worker, *tmp;
1184 	int rc;
1185 
1186 	pthread_mutex_init(&g_workers_lock, NULL);
1187 	spdk_app_opts_init(&g_opts, sizeof(g_opts));
1188 	g_opts.name = "accel_perf";
1189 	g_opts.reactor_mask = "0x1";
1190 	g_opts.shutdown_cb = shutdown_cb;
1191 
1192 	rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:M:P:f:T:l:S:x:", NULL,
1193 				 parse_args, usage);
1194 	if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) {
1195 		return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1;
1196 	}
1197 
1198 	if (g_workload_selection == SPDK_ACCEL_OPC_LAST) {
1199 		fprintf(stderr, "Must provide a workload type\n");
1200 		usage();
1201 		return -1;
1202 	}
1203 
1204 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
1205 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
1206 		usage();
1207 		return -1;
1208 	}
1209 
1210 	if (g_allocate_depth == 0) {
1211 		g_allocate_depth = g_queue_depth;
1212 	}
1213 
1214 	if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
1215 	     g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) &&
1216 	    g_chained_count == 0) {
1217 		usage();
1218 		return -1;
1219 	}
1220 
1221 	if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) {
1222 		usage();
1223 		return -1;
1224 	}
1225 
1226 	if (g_module_name && spdk_accel_assign_opc(g_workload_selection, g_module_name)) {
1227 		fprintf(stderr, "Was not able to assign '%s' module to the workload\n", g_module_name);
1228 		usage();
1229 		return -1;
1230 	}
1231 
1232 	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
1233 	if (g_rc) {
1234 		SPDK_ERRLOG("ERROR starting application\n");
1235 	}
1236 
1237 	pthread_mutex_destroy(&g_workers_lock);
1238 
1239 	worker = g_workers;
1240 	while (worker) {
1241 		tmp = worker->next;
1242 		free(worker);
1243 		worker = tmp;
1244 	}
1245 	accel_perf_free_compress_segs();
1246 	spdk_app_fini();
1247 	return g_rc;
1248 }
1249