/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2020 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as large as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void			*src;
	struct iovec		*src_iovs;
	uint32_t		src_iovcnt;
	struct iovec		*dst_iovs;
	uint32_t		dst_iovcnt;
	void			*dst;
	void			*dst2;
	uint32_t		crc_dst;
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};

struct worker_thread {
	struct spdk_io_channel		*ch;
	uint64_t			xfer_completed;
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks_pool;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;
	struct display_info		display;
	enum accel_opcode		workload;
};

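/* Print the configuration the run will use, including which accel module
 * backs the selected opcode.
 */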
static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count:   %u\n", g_chained_count);
	printf("Module:         %s\n", module_name);
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

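/* Option callback invoked by spdk_app_parse_args() once per command line flag. */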
static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
		argval = spdk_strtol(arg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = arg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
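/* Called on each worker's SPDK thread once it has fully drained. Frees the
 * worker's resources and, when the last worker exits, dumps the results and
 * stops the application.
 */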
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

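/* Allocate and initialize the buffers a task needs for the selected workload:
 * an iovec array of g_chained_count source buffers for the crc32c variants,
 * flat DMA-able buffers otherwise.
 */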
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}

	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so that, when
		 * verify is enabled, dst can be compared against src.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* Dualcast needs a second destination buffer. */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst2 buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

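/* Pop a pre-allocated task from the worker's pool; the pool is sized by -a. */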
static inline struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the given ap_task; tasks are recycled as they complete. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, use the first byte of the task->src buffer as the pattern. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}

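/* Release whatever buffers _get_task_data_bufs() allocated for this task. */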
static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		spdk_dma_free(task->dst2);
	}
}

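/* Compare a contiguous destination buffer against the concatenation of the
 * source iovecs; returns 0 on match, -1 on any mismatch or length error.
 */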
static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

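/* Completion callback for every submitted operation: optionally verifies the
 * result, accounts for failures and injected miscompares, and resubmits the
 * task unless the worker is draining.
 */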
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		default:
			assert(false);
			break;
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

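/* Print per-worker and aggregate throughput; returns 1 if any transfer failed. */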
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

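/* Poller that waits for the worker's outstanding operations to complete,
 * then unregisters the worker.
 */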
static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

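/* Fired once when the run time elapses; switches the worker into draining mode. */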
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding IO with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

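/* Per-thread initialization: registers the worker, gets an accel channel,
 * allocates the task pool, and primes the channel with g_queue_depth operations.
 */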
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker once the run time elapses. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}

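/* App start callback: spawns g_threads_per_core SPDK threads on each core in
 * the reactor mask and kicks off _init_thread() on each of them.
 */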
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

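/* Example invocation (illustrative; the binary path depends on your build layout):
 *   ./build/examples/accel_perf -w crc32c -C 4 -q 64 -t 10 -y
 * runs the chained crc32c workload with 4 iovecs per task at queue depth 64
 * for 10 seconds, verifying every completion in software.
 */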
int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as large as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_start, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	spdk_app_fini();
	return g_rc;
}