xref: /spdk/examples/accel/perf/accel_perf.c (revision 18c8b52afa69f39481ebb75711b2f30b11693f9d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 #include "spdk/thread.h"
8 #include "spdk/env.h"
9 #include "spdk/event.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/accel.h"
13 #include "spdk/crc32.h"
14 #include "spdk/util.h"
15 
/* Byte value written into source buffers; verification expects it in dst. */
#define DATA_PATTERN 0x5a
/* 4 KiB alignment, required by DSA HW for dualcast destination buffers. */
#define ALIGN_4K 0x1000

static uint64_t	g_tsc_rate;		/* timestamp counter frequency (ticks/sec) */
static uint64_t g_tsc_end;		/* tick value at which the run should end */
static int g_rc;			/* process exit code, set by dump_result()/app start */
static int g_xfer_size_bytes = 4096;	/* -o: per-buffer transfer size */
static int g_queue_depth = 32;		/* -q: operations kept in flight per worker */
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;	/* -a: 0 means "same as queue depth" (see main) */
static int g_threads_per_core = 1;	/* -T */
static int g_time_in_sec = 5;		/* -t: run duration */
static uint32_t g_crc32c_seed = 0;	/* -s: seed for crc32c workloads */
static uint32_t g_crc32c_chained_count = 1;	/* -C: iovec entries per crc32c op */
static int g_fail_percent_goal = 0;	/* -P: injected miscompare rate for compare */
static uint8_t g_fill_pattern = 255;	/* -f: byte used by the fill workload */
static bool g_verify = false;		/* -y: software-verify results on completion */
static const char *g_workload_type = NULL;	/* -w: workload name as given on cmdline */
static enum accel_opcode g_workload_selection;	/* opcode resolved from -w */
static struct worker_thread *g_workers = NULL;	/* singly linked list of workers */
static int g_num_workers = 0;		/* live worker count, guarded by g_workers_lock */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct worker_thread;
static void accel_done(void *ref, int status);
44 
/* Identifies where a worker runs so per-worker results can be labeled. */
struct display_info {
	int core;	/* CPU core the worker thread is pinned to */
	int thread;	/* thread index on that core (0..g_threads_per_core-1) */
};
49 
/* One accel operation in flight, together with the buffers it owns.
 * Which buffer fields are populated depends on g_workload_selection
 * (see _get_task_data_bufs()).
 */
struct ap_task {
	void			*src;	/* source buffer (non-chained workloads) */
	struct iovec		*iovs;	/* source vectors for crc32c/copy_crc32c */
	uint32_t		iov_cnt;	/* number of entries in iovs */
	void			*dst;	/* destination buffer (all but crc32c) */
	void			*dst2;	/* second destination, dualcast only */
	uint32_t		crc_dst;	/* CRC result written by the engine */
	struct worker_thread	*worker;	/* owning worker, set at init */
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;	/* linkage in worker->tasks_pool */
};
61 
/* Per-SPDK-thread state; one instance per (core, thread) pair, linked into
 * the global g_workers list.
 */
struct worker_thread {
	struct spdk_io_channel		*ch;	/* accel framework channel */
	uint64_t			xfer_completed;	/* total completed operations */
	uint64_t			xfer_failed;	/* failures + verify miscompares */
	uint64_t			injected_miscompares;	/* expected -EILSEQ results */
	uint64_t			current_queue_depth;	/* operations in flight */
	TAILQ_HEAD(, ap_task)		tasks_pool;	/* idle tasks ready to submit */
	struct worker_thread		*next;	/* next worker in g_workers */
	unsigned			core;	/* spdk_env_get_current_core() */
	struct spdk_thread		*thread;	/* owning SPDK thread */
	bool				is_draining;	/* stop resubmitting completed tasks */
	struct spdk_poller		*is_draining_poller;	/* polls for queue depth 0 */
	struct spdk_poller		*stop_poller;	/* one-shot run-time timer */
	void				*task_base;	/* backing allocation for all tasks */
	struct display_info		display;	/* labels for result output */
	enum accel_opcode		workload;	/* snapshot of g_workload_selection */
};
79 
80 static void
81 dump_user_config(void)
82 {
83 	const char *engine_name = NULL;
84 	int rc;
85 
86 	rc = spdk_accel_get_opc_engine_name(g_workload_selection, &engine_name);
87 	if (rc) {
88 		printf("error getting engine name (%d)\n", rc);
89 	}
90 
91 	printf("\nSPDK Configuration:\n");
92 	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
93 	printf("Accel Perf Configuration:\n");
94 	printf("Workload Type:  %s\n", g_workload_type);
95 	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
96 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
97 		printf("vector count    %u\n", g_crc32c_chained_count);
98 	} else if (g_workload_selection == ACCEL_OPC_FILL) {
99 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
100 	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
101 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
102 	}
103 	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
104 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
105 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count);
106 	} else {
107 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
108 	}
109 	printf("Engine:         %s\n", engine_name);
110 	printf("Queue depth:    %u\n", g_queue_depth);
111 	printf("Allocate depth: %u\n", g_allocate_depth);
112 	printf("# threads/core: %u\n", g_threads_per_core);
113 	printf("Run time:       %u seconds\n", g_time_in_sec);
114 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
115 }
116 
/* Print the command-line help text, one line per option. */
static void
usage(void)
{
	static const char *help_lines[] = {
		"accel_perf options:",
		"\t[-h help message]",
		"\t[-q queue depth per core]",
		"\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)",
		"\t[-T number of threads per core",
		"\t[-n number of channels]",
		"\t[-o transfer size in bytes]",
		"\t[-t time in seconds]",
		"\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast",
		"\t[-s for crc32c workload, use this seed value (default 0)",
		"\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)",
		"\t[-f for fill workload, use this BYTE value (default 255)",
		"\t[-y verify result if this switch is on]",
		"\t[-a tasks to allocate per core (default: same value as -q)]",
		"\t\tCan be used to spread operations across a wider range of memory.",
	};
	size_t i;

	for (i = 0; i < SPDK_COUNTOF(help_lines); i++) {
		printf("%s\n", help_lines[i]);
	}
}
136 
137 static int
138 parse_args(int argc, char *argv)
139 {
140 	int argval = 0;
141 
142 	switch (argc) {
143 	case 'a':
144 	case 'C':
145 	case 'f':
146 	case 'T':
147 	case 'o':
148 	case 'P':
149 	case 'q':
150 	case 's':
151 	case 't':
152 		argval = spdk_strtol(optarg, 10);
153 		if (argval < 0) {
154 			fprintf(stderr, "-%c option must be non-negative.\n", argc);
155 			usage();
156 			return 1;
157 		}
158 		break;
159 	default:
160 		break;
161 	};
162 
163 	switch (argc) {
164 	case 'a':
165 		g_allocate_depth = argval;
166 		break;
167 	case 'C':
168 		g_crc32c_chained_count = argval;
169 		break;
170 	case 'f':
171 		g_fill_pattern = (uint8_t)argval;
172 		break;
173 	case 'T':
174 		g_threads_per_core = argval;
175 		break;
176 	case 'o':
177 		g_xfer_size_bytes = argval;
178 		break;
179 	case 'P':
180 		g_fail_percent_goal = argval;
181 		break;
182 	case 'q':
183 		g_queue_depth = argval;
184 		break;
185 	case 's':
186 		g_crc32c_seed = argval;
187 		break;
188 	case 't':
189 		g_time_in_sec = argval;
190 		break;
191 	case 'y':
192 		g_verify = true;
193 		break;
194 	case 'w':
195 		g_workload_type = optarg;
196 		if (!strcmp(g_workload_type, "copy")) {
197 			g_workload_selection = ACCEL_OPC_COPY;
198 		} else if (!strcmp(g_workload_type, "fill")) {
199 			g_workload_selection = ACCEL_OPC_FILL;
200 		} else if (!strcmp(g_workload_type, "crc32c")) {
201 			g_workload_selection = ACCEL_OPC_CRC32C;
202 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
203 			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
204 		} else if (!strcmp(g_workload_type, "compare")) {
205 			g_workload_selection = ACCEL_OPC_COMPARE;
206 		} else if (!strcmp(g_workload_type, "dualcast")) {
207 			g_workload_selection = ACCEL_OPC_DUALCAST;
208 		} else {
209 			usage();
210 			return 1;
211 		}
212 		break;
213 	default:
214 		usage();
215 		return 1;
216 	}
217 
218 	return 0;
219 }
220 
221 static int dump_result(void);
222 static void
223 unregister_worker(void *arg1)
224 {
225 	struct worker_thread *worker = arg1;
226 
227 	free(worker->task_base);
228 	spdk_put_io_channel(worker->ch);
229 	pthread_mutex_lock(&g_workers_lock);
230 	assert(g_num_workers >= 1);
231 	if (--g_num_workers == 0) {
232 		pthread_mutex_unlock(&g_workers_lock);
233 		g_rc = dump_result();
234 		spdk_app_stop(0);
235 	}
236 	pthread_mutex_unlock(&g_workers_lock);
237 }
238 
239 static int
240 _get_task_data_bufs(struct ap_task *task)
241 {
242 	uint32_t align = 0;
243 	uint32_t i = 0;
244 	int dst_buff_len = g_xfer_size_bytes;
245 
246 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
247 	 * we do this for all engines to keep it simple.
248 	 */
249 	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
250 		align = ALIGN_4K;
251 	}
252 
253 	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
254 		assert(g_crc32c_chained_count > 0);
255 		task->iov_cnt = g_crc32c_chained_count;
256 		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
257 		if (!task->iovs) {
258 			fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task);
259 			return -ENOMEM;
260 		}
261 
262 		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
263 			dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count;
264 		}
265 
266 		for (i = 0; i < task->iov_cnt; i++) {
267 			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
268 			if (task->iovs[i].iov_base == NULL) {
269 				return -ENOMEM;
270 			}
271 			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
272 			task->iovs[i].iov_len = g_xfer_size_bytes;
273 		}
274 
275 	} else {
276 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
277 		if (task->src == NULL) {
278 			fprintf(stderr, "Unable to alloc src buffer\n");
279 			return -ENOMEM;
280 		}
281 
282 		/* For fill, set the entire src buffer so we can check if verify is enabled. */
283 		if (g_workload_selection == ACCEL_OPC_FILL) {
284 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
285 		} else {
286 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
287 		}
288 	}
289 
290 	if (g_workload_selection != ACCEL_OPC_CRC32C) {
291 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
292 		if (task->dst == NULL) {
293 			fprintf(stderr, "Unable to alloc dst buffer\n");
294 			return -ENOMEM;
295 		}
296 
297 		/* For compare we want the buffers to match, otherwise not. */
298 		if (g_workload_selection == ACCEL_OPC_COMPARE) {
299 			memset(task->dst, DATA_PATTERN, dst_buff_len);
300 		} else {
301 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
302 		}
303 	}
304 
305 	/* For dualcast 2 buffers are needed for the operation.  */
306 	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
307 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
308 		if (task->dst2 == NULL) {
309 			fprintf(stderr, "Unable to alloc dst buffer\n");
310 			return -ENOMEM;
311 		}
312 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
313 	}
314 
315 	return 0;
316 }
317 
318 inline static struct ap_task *
319 _get_task(struct worker_thread *worker)
320 {
321 	struct ap_task *task;
322 
323 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
324 		task = TAILQ_FIRST(&worker->tasks_pool);
325 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
326 	} else {
327 		fprintf(stderr, "Unable to get ap_task\n");
328 		return NULL;
329 	}
330 
331 	return task;
332 }
333 
/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;	/* no ACCEL_FLAG_* options used by this tool */

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, the pattern byte is the first byte of task->src
		 * (the whole src buffer was memset to g_fill_pattern).
		 */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->iovs, task->iov_cnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		/* Deliberately corrupt the first dst byte on g_fail_percent_goal
		 * percent of submissions; accel_done() counts the expected
		 * -EILSEQ results as injected miscompares, not failures.
		 */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	default:
		/* workload is validated in main(); unreachable in practice */
		assert(false);
		break;

	}

	/* Queue depth is incremented before the error check so that the direct
	 * accel_done() call below (which decrements it) stays balanced.
	 */
	worker->current_queue_depth++;
	if (rc) {
		/* Submission failed: complete the task inline with the error. */
		accel_done(task, rc);
	}
}
390 
391 static void
392 _free_task_buffers(struct ap_task *task)
393 {
394 	uint32_t i;
395 
396 	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
397 		if (task->iovs) {
398 			for (i = 0; i < task->iov_cnt; i++) {
399 				if (task->iovs[i].iov_base) {
400 					spdk_dma_free(task->iovs[i].iov_base);
401 				}
402 			}
403 			free(task->iovs);
404 		}
405 	} else {
406 		spdk_dma_free(task->src);
407 	}
408 
409 	spdk_dma_free(task->dst);
410 	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
411 		spdk_dma_free(task->dst2);
412 	}
413 }
414 
415 static int
416 _vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
417 {
418 	uint32_t i;
419 	uint32_t ttl_len = 0;
420 	uint8_t *dst = (uint8_t *)_dst;
421 
422 	for (i = 0; i < iovcnt; i++) {
423 		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
424 			return -1;
425 		}
426 		dst += src_iovs[i].iov_len;
427 		ttl_len += src_iovs[i].iov_len;
428 	}
429 
430 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
431 		return -1;
432 	}
433 
434 	return 0;
435 }
436 
static int _worker_stop(void *arg);

/* Completion callback for every accel operation (also called inline from
 * _submit_single() on submission failure).  Optionally software-verifies the
 * result, accounts completions/failures/injected miscompares, and resubmits
 * the task unless the worker is draining or the operation failed.
 */
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	/* Software verification only makes sense for successful operations. */
	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			/* ~seed: spdk_crc32c_iov_update takes the pre-inverted CRC state. */
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			/* copy_crc32c also copies the data; check dst too. */
			if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			/* Both destinations must match the source. */
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			/* src holds the expected fill pattern (see _get_task_data_bufs). */
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			/* compare results are checked via status/expected_status below */
			break;
		default:
			assert(false);
			break;
		}
	}

	/* An injected compare failure is expected: count it separately and
	 * treat the operation as successful so it gets resubmitted.
	 */
	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		/* Return the task and immediately take one back out; the pool is
		 * non-empty here, so _get_task() cannot return NULL.
		 */
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		/* Draining, or a real failure: park the task (queue depth shrinks). */
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}
519 
520 static int
521 dump_result(void)
522 {
523 	uint64_t total_completed = 0;
524 	uint64_t total_failed = 0;
525 	uint64_t total_miscompared = 0;
526 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
527 	struct worker_thread *worker = g_workers;
528 
529 	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
530 	printf("------------------------------------------------------------------------\n");
531 	while (worker != NULL) {
532 
533 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
534 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
535 				       (g_time_in_sec * 1024 * 1024);
536 
537 		total_completed += worker->xfer_completed;
538 		total_failed += worker->xfer_failed;
539 		total_miscompared += worker->injected_miscompares;
540 
541 		if (xfer_per_sec) {
542 			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
543 			       worker->display.core, worker->display.thread, xfer_per_sec,
544 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
545 		}
546 
547 		worker = worker->next;
548 	}
549 
550 	total_xfer_per_sec = total_completed / g_time_in_sec;
551 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
552 			    (g_time_in_sec * 1024 * 1024);
553 
554 	printf("=========================================================================\n");
555 	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
556 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
557 
558 	return total_failed ? 1 : 0;
559 }
560 
561 static inline void
562 _free_task_buffers_in_pool(struct worker_thread *worker)
563 {
564 	struct ap_task *task;
565 
566 	assert(worker);
567 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
568 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
569 		_free_task_buffers(task);
570 	}
571 }
572 
573 static int
574 _check_draining(void *arg)
575 {
576 	struct worker_thread *worker = arg;
577 
578 	assert(worker);
579 
580 	if (worker->current_queue_depth == 0) {
581 		_free_task_buffers_in_pool(worker);
582 		spdk_poller_unregister(&worker->is_draining_poller);
583 		unregister_worker(worker);
584 	}
585 
586 	return SPDK_POLLER_BUSY;
587 }
588 
/* One-shot poller fired after g_time_in_sec elapses: stop resubmitting and
 * start polling for the worker's outstanding operations to drain.
 */
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	/* This timer fires only once. */
	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check it's outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}
604 
/* Message handler run on each newly created SPDK thread: allocate the worker,
 * link it into g_workers, allocate its task pool, and submit the initial
 * queue-depth worth of operations.  arg1 is a heap-allocated display_info
 * whose ownership transfers here (always freed below).
 */
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	/* Publish the worker before any early exit so unregister/dump paths
	 * account for it.
	 */
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		/* Safe to goto error before TAILQ_INIT: calloc zeroed the list
		 * head, so the pool walk in _free_task_buffers_in_pool is a no-op.
		 */
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	/* One contiguous allocation backs all ap_task structs for this worker. */
	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	/* Tasks are pooled before buffer allocation so a failing task's partial
	 * buffers are still freed via the pool on the error path.
	 */
	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	/* NOTE(review): worker->ch is not released here; spdk_app_stop(-1)
	 * tears the app down, but an explicit spdk_put_io_channel() when
	 * ch != NULL would be cleaner — confirm against framework teardown.
	 */
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}
676 
677 static void
678 accel_perf_start(void *arg1)
679 {
680 	struct spdk_cpuset tmp_cpumask = {};
681 	char thread_name[32];
682 	uint32_t i;
683 	int j;
684 	struct spdk_thread *thread;
685 	struct display_info *display;
686 
687 	g_tsc_rate = spdk_get_ticks_hz();
688 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
689 
690 	dump_user_config();
691 
692 	printf("Running for %d seconds...\n", g_time_in_sec);
693 	fflush(stdout);
694 
695 	/* Create worker threads for each core that was specified. */
696 	SPDK_ENV_FOREACH_CORE(i) {
697 		for (j = 0; j < g_threads_per_core; j++) {
698 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
699 			spdk_cpuset_zero(&tmp_cpumask);
700 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
701 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
702 			display = calloc(1, sizeof(*display));
703 			if (display == NULL) {
704 				fprintf(stderr, "Unable to allocate memory\n");
705 				spdk_app_stop(-1);
706 				return;
707 			}
708 			display->core = i;
709 			display->thread = j;
710 			spdk_thread_send_msg(thread, _init_thread, display);
711 		}
712 	}
713 }
714 
715 int
716 main(int argc, char **argv)
717 {
718 	struct worker_thread *worker, *tmp;
719 
720 	pthread_mutex_init(&g_workers_lock, NULL);
721 	spdk_app_opts_init(&g_opts, sizeof(g_opts));
722 	g_opts.name = "accel_perf";
723 	g_opts.reactor_mask = "0x1";
724 	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:", NULL, parse_args,
725 				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
726 		g_rc = -1;
727 		goto cleanup;
728 	}
729 
730 	if ((g_workload_selection != ACCEL_OPC_COPY) &&
731 	    (g_workload_selection != ACCEL_OPC_FILL) &&
732 	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
733 	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
734 	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
735 	    (g_workload_selection != ACCEL_OPC_DUALCAST)) {
736 		usage();
737 		g_rc = -1;
738 		goto cleanup;
739 	}
740 
741 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
742 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
743 		usage();
744 		g_rc = -1;
745 		goto cleanup;
746 	}
747 
748 	if (g_allocate_depth == 0) {
749 		g_allocate_depth = g_queue_depth;
750 	}
751 
752 	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
753 	    g_crc32c_chained_count == 0) {
754 		usage();
755 		g_rc = -1;
756 		goto cleanup;
757 	}
758 
759 	g_rc = spdk_app_start(&g_opts, accel_perf_start, NULL);
760 	if (g_rc) {
761 		SPDK_ERRLOG("ERROR starting application\n");
762 	}
763 
764 	pthread_mutex_destroy(&g_workers_lock);
765 
766 	worker = g_workers;
767 	while (worker) {
768 		tmp = worker->next;
769 		free(worker);
770 		worker = tmp;
771 	}
772 cleanup:
773 	spdk_app_fini();
774 	return g_rc;
775 }
776