/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel_engine.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_crc32c_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

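/* Context for a single outstanding operation. One ap_task is allocated for
 * each slot of the -a allocate depth; buffers are sized by g_xfer_size_bytes
 * and dst2 is only used by the dualcast and compress workloads.
 */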
struct ap_task {
	void			*src;
	struct iovec		*iovs;
	uint32_t		iov_cnt;
	void			*dst;
	void			*dst2;
	union {
		uint32_t	crc_dst;
		uint32_t	output_size;
	};
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};

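/* Per SPDK-thread state. Each worker owns its own accel I/O channel and task
 * pool, and is linked into the global g_workers list for result reporting.
 */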
struct worker_thread {
	struct spdk_io_channel		*ch;
	uint64_t			xfer_completed;
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks_pool;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;
	struct display_info		display;
	enum accel_opcode		workload;
	void				*rnd_data;
};

static void
dump_user_config(struct spdk_app_opts *opts)
{
	printf("SPDK Configuration:\n");
	printf("Core mask:      %s\n\n", opts->reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
		printf("Vector count:   %u\n", g_crc32c_chained_count);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count);
	} else {
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	}
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for crc32c workload, use this value to configure the I/O vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, dualcast]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

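/* Illustrative invocation (example values only; the binary path assumes a
 * default in-tree build):
 *
 *   ./build/examples/accel_perf -w crc32c -C 4 -q 64 -t 10 -y
 *
 * This would run a 4-vector chained crc32c workload at queue depth 64 for
 * 10 seconds, verifying every result in software.
 */

/* Option callback invoked by spdk_app_parse_args() once per recognized
 * option; ch is the option character and arg is its optarg string.
 */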
static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
		argval = spdk_strtol(arg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_crc32c_chained_count = argval;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = arg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	free(worker->rnd_data);
	spdk_put_io_channel(worker->ch);
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

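/* Allocate and initialize the data buffers a task needs for the selected
 * workload: an iovec chain for crc32c/copy_crc32c, a flat src/dst pair for
 * everything else, and a second destination (dst2) for dualcast and compress.
 */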
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all engines to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_crc32c_chained_count > 0);
		task->iov_cnt = g_crc32c_chained_count;
		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
		if (!task->iovs) {
			fprintf(stderr, "cannot allocate task->iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count;
		}

		for (i = 0; i < task->iov_cnt; i++) {
			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->iovs[i].iov_len = g_xfer_size_bytes;
		}

	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so that
		 * verification can compare dst against src when -y is used.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			memcpy(task->src, task->worker->rnd_data, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* For dualcast 2 buffers are needed for the operation.  For compress we use the second buffer to
	 * store the original pre-compressed data so we have a copy of it when we go to decompress.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst2 buffer\n");
			return -ENOMEM;
		}
		if (g_workload_selection == ACCEL_OPC_DUALCAST) {
			memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
		} else if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			/* Copy the original data to dst2 so we can compare it to
			 * the results of decompression if -y is used.
			 */
			assert(task->src); /* for scan-build */
			memcpy(task->dst2, task->src, g_xfer_size_bytes);
		}
	}

	return 0;
}

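/* Tasks come from a per-worker pool sized by g_allocate_depth, so the hot
 * path never allocates; since g_allocate_depth >= g_queue_depth the pool
 * should not run dry in normal operation.
 */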
inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, use the first byte of the task->src buffer as the fill value. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->iovs, task->iov_cnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->src,
						g_xfer_size_bytes, g_xfer_size_bytes, &task->output_size,
						flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	if (rc) {
		accel_done(task, rc);
	}
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->iovs) {
			for (i = 0; i < task->iov_cnt; i++) {
				if (task->iovs[i].iov_base) {
					spdk_dma_free(task->iovs[i].iov_base);
				}
			}
			free(task->iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

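/* Completion callback for every submitted operation. With -y it re-checks
 * results in software (spdk_crc32c_iov_update() for the CRC workloads,
 * memcmp() elsewhere); for compress it chains a decompress of the output and
 * compares that against the original data saved in dst2. The task is then
 * recycled and resubmitted until the worker starts draining.
 */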
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;
	int rc;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (!worker->is_draining && status == -EINVAL && worker->workload == ACCEL_OPC_COMPRESS) {
		printf("Invalid configuration, compress workload needs ISA-L or IAA. Exiting\n");
		_worker_stop(worker);
	}

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			/* We've completed the compression phase; now decompress the compressed
			 * data and compare it to the original to see if it matches. We swap src
			 * and dst for the decompress step; afterwards task->src holds the
			 * decompressed data and is compared to task->dst2, where the original
			 * data was saved.
			 */
			if (!worker->is_draining) {
				worker->workload = ACCEL_OPC_DECOMPRESS;
				worker->xfer_completed++;
				memset(task->src, 0, g_xfer_size_bytes);
				rc = spdk_accel_submit_decompress(worker->ch, task->src, task->dst,
								  g_xfer_size_bytes, g_xfer_size_bytes, 0, accel_done, task);
				if (rc) {
					SPDK_NOTICELOG("Unable to submit decompress for verification, rc = %d\n", rc);
				}
				return;
			}
			break;
		case ACCEL_OPC_DECOMPRESS:
			worker->workload = ACCEL_OPC_COMPRESS;
			if (memcmp(task->dst2, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare after decompression\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
		worker->current_queue_depth++;
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

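/* Prints the per-worker and aggregate results. Illustrative output shape
 * (numbers and spacing are made up):
 *
 * Core,Thread   Transfers     Bandwidth     Failed     Miscompares
 * ------------------------------------------------------------------------
 * 0,0               823040/s     3215 MiB/s      0           0
 * =========================================================================
 * Total:            823040/s     3215 MiB/s      0           0
 */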
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

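/* Runs once on each SPDK thread created in accel_perf_start(): registers the
 * worker, gets an accel channel, builds the task pool (plus the random data
 * buffer for compress), arms the stop poller, and primes g_queue_depth
 * initial submissions.
 */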
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;
	uint8_t *offset;
	uint64_t j;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_engine_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		worker->rnd_data = calloc(1, g_xfer_size_bytes);
		if (worker->rnd_data == NULL) {
			printf("unable to allocate rnd_data buffer\n");
			goto error;
		}
		/* Only fill half the data buffer with random data to make it more
		 * compressible.
		 */
		offset = worker->rnd_data;
		for (j = 0; j < g_xfer_size_bytes / sizeof(uint8_t) / 2; j++) {
			*offset = rand() % 256;
			offset++;
		}
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}
		worker->current_queue_depth++;

		_submit_single(worker, task);
	}
	return;
error:
	free(worker->rnd_data);
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}

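/* App start callback: spawns g_threads_per_core SPDK threads on every core
 * in the reactor mask, each pinned to its core via a cpuset, and kicks off
 * _init_thread() on each of them.
 */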
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

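/* Overall flow: parse and validate options, then hand control to the SPDK
 * app framework; spdk_app_start() does not return until spdk_app_stop() is
 * called from unregister_worker() after the last worker finishes draining.
 */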
int
main(int argc, char **argv)
{
	struct spdk_app_opts opts = {};
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&opts, sizeof(opts));
	opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:s:t:yw:P:f:T:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_type == NULL ||
	    ((g_workload_selection != ACCEL_OPC_COPY) &&
	     (g_workload_selection != ACCEL_OPC_FILL) &&
	     (g_workload_selection != ACCEL_OPC_CRC32C) &&
	     (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	     (g_workload_selection != ACCEL_OPC_COMPARE) &&
	     (g_workload_selection != ACCEL_OPC_DUALCAST) &&
	     (g_workload_selection != ACCEL_OPC_COMPRESS))) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_crc32c_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	dump_user_config(&opts);
	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	spdk_app_fini();
	return g_rc;
}