xref: /spdk/examples/accel/perf/accel_perf.c (revision 0098e636761237b77c12c30c2408263a5d2260cc)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel_engine.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_crc32c_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

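/* Per-operation context: data buffers, iovecs and bookkeeping for one
 * outstanding accel operation; recycled through the worker's task pool.
 */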
struct ap_task {
	void			*src;
	struct iovec		*iovs;
	uint32_t		iov_cnt;
	void			*dst;
	void			*dst2;
	union {
		uint32_t	crc_dst;
		uint32_t	output_size;
	};
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};

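/* Per-thread state: the accel channel, task pool, statistics and the pollers
 * that drive draining and shutdown.
 */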
struct worker_thread {
	struct spdk_io_channel		*ch;
	uint64_t			xfer_completed;
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks_pool;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;
	struct display_info		display;
	enum accel_opcode		workload;
	void				*rnd_data;
};

static void
dump_user_config(struct spdk_app_opts *opts)
{
	printf("SPDK Configuration:\n");
	printf("Core mask:      %s\n\n", opts->reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
		printf("Vector count:   %u\n", g_crc32c_chained_count);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count);
	} else {
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	}
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, dualcast]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

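/* Option callback invoked by spdk_app_parse_args() for each recognized flag.
 * Despite the names, 'argc' is the option character and 'argv' is unused here;
 * numeric values are re-parsed from optarg.
 *
 * Illustrative invocation (the binary path depends on your build layout):
 *   ./build/examples/accel_perf -w crc32c -C 4 -q 64 -t 10 -y
 */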
static int
parse_args(int argc, char *argv)
{
	int argval = 0;

	switch (argc) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
		argval = spdk_strtol(optarg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", argc);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (argc) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_crc32c_chained_count = argval;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = optarg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
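/* Tear down one worker's resources; the last worker to exit dumps the
 * aggregate results and stops the app.
 */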
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	free(worker->rnd_data);
	spdk_put_io_channel(worker->ch);
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

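/* Allocate and initialize the source/destination buffers a task needs for
 * the selected workload.
 */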
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all engines to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_crc32c_chained_count > 0);
		task->iov_cnt = g_crc32c_chained_count;
		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
		if (!task->iovs) {
			fprintf(stderr, "cannot allocate task->iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count;
		}

		for (i = 0; i < task->iov_cnt; i++) {
			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->iovs[i].iov_len = g_xfer_size_bytes;
		}

	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so the result
		 * can be checked when verify (-y) is enabled.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			memcpy(task->src, task->worker->rnd_data, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* For dualcast, two destination buffers are needed for the operation.  For compress
	 * we use the second buffer to store the original pre-compressed data so we have a
	 * copy of it when we go to decompress.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst2 buffer\n");
			return -ENOMEM;
		}
		if (g_workload_selection == ACCEL_OPC_DUALCAST) {
			memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
		} else if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			/* Copy the original data to dst2 so we can compare it to
			 * the results of decompression if -y is used.
			 */
			assert(task->src); /* for scan-build */
			memcpy(task->dst2, task->src, g_xfer_size_bytes);
		}
	}

	return 0;
}

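/* Pop a pre-allocated task from the worker's pool. */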
static inline struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, use the first byte of the task->src buffer as the pattern */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->iovs, task->iov_cnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->src,
						g_xfer_size_bytes, g_xfer_size_bytes, &task->output_size,
						flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	if (rc) {
		accel_done(task, rc);
	}
}

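/* Release the per-task buffers allocated in _get_task_data_bufs(). */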
static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->iovs) {
			for (i = 0; i < task->iov_cnt; i++) {
				if (task->iovs[i].iov_base) {
					spdk_dma_free(task->iovs[i].iov_base);
				}
			}
			free(task->iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) {
		spdk_dma_free(task->dst2);
	}
}

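/* Compare a flat buffer against the concatenation of an iovec array; returns
 * 0 only if every segment matches and the total length is as expected.
 */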
static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

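/* Completion callback for every submitted operation: verifies results when -y
 * is set, updates statistics and resubmits on the same task until draining.
 */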
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;
	int rc;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (!worker->is_draining && status == -EINVAL && worker->workload == ACCEL_OPC_COMPRESS) {
		printf("Invalid configuration, compress workload needs ISA-L or IAA. Exiting\n");
		_worker_stop(worker);
	}

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			/* We've completed the compression phase and now need to decompress the
			 * compressed data and compare it to the original buffer.  So we swap
			 * src and destination, then compare task->src to task->dst2, which is
			 * where we saved the original data.
			 */
			if (!worker->is_draining) {
				worker->workload = ACCEL_OPC_DECOMPRESS;
				worker->xfer_completed++;
				memset(task->src, 0, g_xfer_size_bytes);
				rc = spdk_accel_submit_decompress(worker->ch, task->src, task->dst,
								  g_xfer_size_bytes, g_xfer_size_bytes, 0, accel_done, task);
				if (rc) {
					SPDK_NOTICELOG("Unable to submit decompress for verification, rc = %d\n", rc);
				}
				return;
			}
			break;
		case ACCEL_OPC_DECOMPRESS:
			worker->workload = ACCEL_OPC_COMPRESS;
			if (memcmp(task->dst2, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare after decompression\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
		worker->current_queue_depth++;
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

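/* Print per-thread and aggregate throughput; returns nonzero if any transfer failed. */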
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

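/* Poller that waits for all outstanding operations on a worker to complete,
 * then frees its buffers and unregisters it.
 */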
static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check its outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

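/* Per-thread initialization: allocate the worker and its task pool, grab an
 * accel channel, then prime the queue with -q operations.
 */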
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;
	uint8_t *offset;
	uint64_t j;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_engine_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		worker->rnd_data = calloc(1, g_xfer_size_bytes);
		if (worker->rnd_data == NULL) {
			printf("Unable to allocate rnd_data buffer\n");
			goto error;
		}
		/* Only fill half the data buffer with random data so it stays
		 * compressible.
		 */
		offset = worker->rnd_data;
		for (j = 0; j < g_xfer_size_bytes / sizeof(uint8_t) / 2; j++) {
			*offset = rand() % 256;
			offset++;
		}
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker when the run time elapses. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}
		worker->current_queue_depth++;

		_submit_single(worker, task);
	}
	return;
error:

	free(worker->rnd_data);
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}

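/* App start callback: compute the end time and spawn one SPDK thread per
 * core for each of the -T threads per core.
 */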
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			if (thread == NULL) {
				fprintf(stderr, "Unable to create an SPDK thread\n");
				spdk_app_stop(-1);
				return;
			}
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

int
main(int argc, char **argv)
{
	struct spdk_app_opts opts = {};
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&opts, sizeof(opts));
	opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:s:t:yw:P:f:T:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stderr, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_crc32c_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	dump_user_config(&opts);
	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	spdk_app_fini();
	return g_rc;
}