xref: /spdk/examples/accel/perf/accel_perf.c (revision 8e9bf1815df2455d994df622f5b43078193b4a84)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/thread.h"
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/accel_engine.h"
41 #include "spdk/crc32.h"
42 #include "spdk/util.h"
43 
44 #define DATA_PATTERN 0x5a
45 #define ALIGN_4K 0x1000
46 
/* Application-wide configuration and run state: set once during argument
 * parsing in main()/parse_args(), then read by the worker threads. */
static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;                                 /* process exit code */
static int g_xfer_size_bytes = 4096;             /* -o: per-operation transfer size */
static int g_queue_depth = 32;                   /* -q: operations kept in flight per worker */
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;                 /* -a: 0 means "same as -q" */
static int g_threads_per_core = 1;               /* -T */
static int g_time_in_sec = 5;                    /* -t: run time */
static uint32_t g_crc32c_seed = 0;               /* -s */
static uint32_t g_crc32c_chained_count = 1;      /* -C: iovs per crc32c operation */
static int g_fail_percent_goal = 0;              /* -P: injected miscompare percentage */
static uint8_t g_fill_pattern = 255;             /* -f */
static bool g_verify = false;                    /* -y: software-verify each result */
static const char *g_workload_type = NULL;       /* -w: workload name as given */
static enum accel_capability g_workload_selection;  /* -w: parsed workload */
static struct worker_thread *g_workers = NULL;   /* singly linked list of workers */
static int g_num_workers = 0;                    /* length of g_workers list */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
68 
69 struct worker_thread;
70 static void accel_done(void *ref, int status);
71 
/* Identifies a worker for results reporting: the CPU core it runs on and
 * its thread index on that core. */
struct display_info {
	int core;
	int thread;
};
76 
/* One accel operation and the buffers it operates on. Tasks are recycled
 * through the owning worker's tasks_pool rather than reallocated. */
struct ap_task {
	void			*src;            /* source buffer (non-crc32c workloads) */
	struct iovec		*iovs;           /* source vectors (crc32c/copy_crc32c) */
	uint32_t		iov_cnt;         /* number of entries in iovs */
	void			*dst;            /* destination buffer */
	void			*dst2;           /* second destination (dualcast only) */
	uint32_t		crc_dst;         /* crc32c result written by the engine */
	struct worker_thread	*worker;         /* owning worker, set at init */
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;            /* linkage in worker->tasks_pool */
};
88 
/* Per-SPDK-thread test context; instances are chained through 'next' into
 * the global g_workers list. */
struct worker_thread {
	struct spdk_io_channel		*ch;          /* accel engine channel */
	uint64_t			xfer_completed;
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;  /* ops currently in flight */
	TAILQ_HEAD(, ap_task)		tasks_pool;   /* idle (not in flight) tasks */
	struct worker_thread		*next;        /* next worker in g_workers */
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;  /* stop submitting, wait for completions */
	struct spdk_poller		*is_draining_poller;  /* polls for queue depth 0 */
	struct spdk_poller		*stop_poller; /* fires once when run time elapses */
	void				*task_base;   /* backing allocation for all tasks */
	struct display_info		display;      /* core/thread ids for reporting */
};
105 
106 static void
107 dump_user_config(struct spdk_app_opts *opts)
108 {
109 	printf("SPDK Configuration:\n");
110 	printf("Core mask:      %s\n\n", opts->reactor_mask);
111 	printf("Accel Perf Configuration:\n");
112 	printf("Workload Type:  %s\n", g_workload_type);
113 	if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) {
114 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
115 		printf("vector count    %u\n", g_crc32c_chained_count);
116 	} else if (g_workload_selection == ACCEL_FILL) {
117 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
118 	} else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) {
119 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
120 	}
121 	if (g_workload_selection == ACCEL_COPY_CRC32C) {
122 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
123 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count);
124 	} else {
125 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
126 	}
127 	printf("Queue depth:    %u\n", g_queue_depth);
128 	printf("Allocate depth: %u\n", g_allocate_depth);
129 	printf("# threads/core: %u\n", g_threads_per_core);
130 	printf("Run time:       %u seconds\n", g_time_in_sec);
131 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
132 }
133 
/* Print the command line help text (one write, identical output). */
static void
usage(void)
{
	printf("accel_perf options:\n"
	       "\t[-h help message]\n"
	       "\t[-q queue depth per core]\n"
	       "\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)\n"
	       "\t[-T number of threads per core\n"
	       "\t[-n number of channels]\n"
	       "\t[-o transfer size in bytes]\n"
	       "\t[-t time in seconds]\n"
	       "\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast\n"
	       "\t[-s for crc32c workload, use this seed value (default 0)\n"
	       "\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"
	       "\t[-f for fill workload, use this BYTE value (default 255)\n"
	       "\t[-y verify result if this switch is on]\n"
	       "\t[-a tasks to allocate per core (default: same value as -q)]\n"
	       "\t\tCan be used to spread operations across a wider range of memory.\n");
}
153 
154 static int
155 parse_args(int argc, char *argv)
156 {
157 	int argval = 0;
158 
159 	switch (argc) {
160 	case 'a':
161 	case 'C':
162 	case 'f':
163 	case 'T':
164 	case 'o':
165 	case 'P':
166 	case 'q':
167 	case 's':
168 	case 't':
169 		argval = spdk_strtol(optarg, 10);
170 		if (argval < 0) {
171 			fprintf(stderr, "-%c option must be non-negative.\n", argc);
172 			usage();
173 			return 1;
174 		}
175 		break;
176 	default:
177 		break;
178 	};
179 
180 	switch (argc) {
181 	case 'a':
182 		g_allocate_depth = argval;
183 		break;
184 	case 'C':
185 		g_crc32c_chained_count = argval;
186 		break;
187 	case 'f':
188 		g_fill_pattern = (uint8_t)argval;
189 		break;
190 	case 'T':
191 		g_threads_per_core = argval;
192 		break;
193 	case 'o':
194 		g_xfer_size_bytes = argval;
195 		break;
196 	case 'P':
197 		g_fail_percent_goal = argval;
198 		break;
199 	case 'q':
200 		g_queue_depth = argval;
201 		break;
202 	case 's':
203 		g_crc32c_seed = argval;
204 		break;
205 	case 't':
206 		g_time_in_sec = argval;
207 		break;
208 	case 'y':
209 		g_verify = true;
210 		break;
211 	case 'w':
212 		g_workload_type = optarg;
213 		if (!strcmp(g_workload_type, "copy")) {
214 			g_workload_selection = ACCEL_COPY;
215 		} else if (!strcmp(g_workload_type, "fill")) {
216 			g_workload_selection = ACCEL_FILL;
217 		} else if (!strcmp(g_workload_type, "crc32c")) {
218 			g_workload_selection = ACCEL_CRC32C;
219 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
220 			g_workload_selection = ACCEL_COPY_CRC32C;
221 		} else if (!strcmp(g_workload_type, "compare")) {
222 			g_workload_selection = ACCEL_COMPARE;
223 		} else if (!strcmp(g_workload_type, "dualcast")) {
224 			g_workload_selection = ACCEL_DUALCAST;
225 		}
226 		break;
227 	default:
228 		usage();
229 		return 1;
230 	}
231 
232 	return 0;
233 }
234 
235 static int dump_result(void);
236 static void
237 unregister_worker(void *arg1)
238 {
239 	struct worker_thread *worker = arg1;
240 
241 	free(worker->task_base);
242 	spdk_put_io_channel(worker->ch);
243 	pthread_mutex_lock(&g_workers_lock);
244 	assert(g_num_workers >= 1);
245 	if (--g_num_workers == 0) {
246 		pthread_mutex_unlock(&g_workers_lock);
247 		g_rc = dump_result();
248 		spdk_app_stop(0);
249 	}
250 	pthread_mutex_unlock(&g_workers_lock);
251 }
252 
253 static int
254 _get_task_data_bufs(struct ap_task *task)
255 {
256 	uint32_t align = 0;
257 	uint32_t i = 0;
258 	int dst_buff_len = g_xfer_size_bytes;
259 
260 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
261 	 * we do this for all engines to keep it simple.
262 	 */
263 	if (g_workload_selection == ACCEL_DUALCAST) {
264 		align = ALIGN_4K;
265 	}
266 
267 	if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) {
268 		assert(g_crc32c_chained_count > 0);
269 		task->iov_cnt = g_crc32c_chained_count;
270 		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
271 		if (!task->iovs) {
272 			fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task);
273 			return -ENOMEM;
274 		}
275 
276 		if (g_workload_selection == ACCEL_COPY_CRC32C) {
277 			dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count;
278 		}
279 
280 		for (i = 0; i < task->iov_cnt; i++) {
281 			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
282 			if (task->iovs[i].iov_base == NULL) {
283 				return -ENOMEM;
284 			}
285 			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
286 			task->iovs[i].iov_len = g_xfer_size_bytes;
287 		}
288 
289 	} else {
290 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
291 		if (task->src == NULL) {
292 			fprintf(stderr, "Unable to alloc src buffer\n");
293 			return -ENOMEM;
294 		}
295 
296 		/* For fill, set the entire src buffer so we can check if verify is enabled. */
297 		if (g_workload_selection == ACCEL_FILL) {
298 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
299 		} else {
300 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
301 		}
302 	}
303 
304 	if (g_workload_selection != ACCEL_CRC32C) {
305 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
306 		if (task->dst == NULL) {
307 			fprintf(stderr, "Unable to alloc dst buffer\n");
308 			return -ENOMEM;
309 		}
310 
311 		/* For compare we want the buffers to match, otherwise not. */
312 		if (g_workload_selection == ACCEL_COMPARE) {
313 			memset(task->dst, DATA_PATTERN, dst_buff_len);
314 		} else {
315 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
316 		}
317 	}
318 
319 	if (g_workload_selection == ACCEL_DUALCAST) {
320 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
321 		if (task->dst2 == NULL) {
322 			fprintf(stderr, "Unable to alloc dst buffer\n");
323 			return -ENOMEM;
324 		}
325 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
326 	}
327 
328 	return 0;
329 }
330 
331 inline static struct ap_task *
332 _get_task(struct worker_thread *worker)
333 {
334 	struct ap_task *task;
335 
336 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
337 		task = TAILQ_FIRST(&worker->tasks_pool);
338 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
339 	} else {
340 		fprintf(stderr, "Unable to get ap_task\n");
341 		return NULL;
342 	}
343 
344 	return task;
345 }
346 
/* Submit one operation using the same ap task that just completed.
 * Dispatches to the accel API matching the selected workload; on submission
 * failure the task is completed in-line via accel_done() with the error. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;

	assert(worker);

	switch (g_workload_selection) {
	case ACCEL_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_FILL:
		/* For fill use the first byte of the task->dst buffer */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_CRC32C:
		/* Result is written into task->crc_dst by the engine. */
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->iovs, task->iov_cnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt,
						    &task->crc_dst, g_crc32c_seed, accel_done, task);
		break;
	case ACCEL_COMPARE:
		/* Inject a deliberate miscompare in g_fail_percent_goal percent of
		 * the operations by flipping the first destination byte; record
		 * the expected outcome so accel_done() can account for it. */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, accel_done, task);
		break;
	default:
		/* main() validates the workload, so this should be unreachable. */
		assert(false);
		break;

	}

	if (rc) {
		/* Treat a submission error as an immediate completion with that status. */
		accel_done(task, rc);
	}
}
401 
402 static void
403 _free_task_buffers(struct ap_task *task)
404 {
405 	uint32_t i;
406 
407 	if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) {
408 		if (task->iovs) {
409 			for (i = 0; i < task->iov_cnt; i++) {
410 				if (task->iovs[i].iov_base) {
411 					spdk_dma_free(task->iovs[i].iov_base);
412 				}
413 			}
414 			free(task->iovs);
415 		}
416 	} else {
417 		spdk_dma_free(task->src);
418 	}
419 
420 	spdk_dma_free(task->dst);
421 	if (g_workload_selection == ACCEL_DUALCAST) {
422 		spdk_dma_free(task->dst2);
423 	}
424 }
425 
426 static int
427 _vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
428 {
429 	uint32_t i;
430 	uint32_t ttl_len = 0;
431 	uint8_t *dst = (uint8_t *)_dst;
432 
433 	for (i = 0; i < iovcnt; i++) {
434 		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
435 			return -1;
436 		}
437 		dst += src_iovs[i].iov_len;
438 		ttl_len += src_iovs[i].iov_len;
439 	}
440 
441 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
442 		return -1;
443 	}
444 
445 	return 0;
446 }
447 
/*
 * Completion callback for every submitted operation; also called in-line by
 * _submit_single() when a submission fails. Optionally software-verifies the
 * result (-y), updates the worker's statistics and, unless the worker is
 * draining, recycles the task and submits the next operation.
 */
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (g_workload_selection) {
		case ACCEL_COPY_CRC32C:
			/* NOTE(review): the seed is bit-inverted here; presumably this
			 * matches the engine's crc32c convention — confirm against the
			 * spdk_accel crc32c documentation. */
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			/* copy_crc32c also copied the vectors into dst; verify that too. */
			if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_DUALCAST:
			/* Both destinations must match the source. */
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_FILL:
			/* src holds the fill byte repeated (see _get_task_data_bufs). */
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COMPARE:
			break;
		default:
			assert(false);
			break;
		}
	}

	if (task->expected_status == -EILSEQ) {
		/* This compare op was set up by _submit_single() to miscompare. */
		assert(status != 0);
		worker->injected_miscompares++;
	} else if (status) {
		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining) {
		/* Return the completed task to the tail and pull the next from the
		 * head, cycling through the whole allocated pool. */
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
		worker->current_queue_depth++;
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}
528 
529 static int
530 dump_result(void)
531 {
532 	uint64_t total_completed = 0;
533 	uint64_t total_failed = 0;
534 	uint64_t total_miscompared = 0;
535 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
536 	struct worker_thread *worker = g_workers;
537 
538 	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
539 	printf("------------------------------------------------------------------------\n");
540 	while (worker != NULL) {
541 
542 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
543 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
544 				       (g_time_in_sec * 1024 * 1024);
545 
546 		total_completed += worker->xfer_completed;
547 		total_failed += worker->xfer_failed;
548 		total_miscompared += worker->injected_miscompares;
549 
550 		if (xfer_per_sec) {
551 			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
552 			       worker->display.core, worker->display.thread, xfer_per_sec,
553 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
554 		}
555 
556 		worker = worker->next;
557 	}
558 
559 	total_xfer_per_sec = total_completed / g_time_in_sec;
560 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
561 			    (g_time_in_sec * 1024 * 1024);
562 
563 	printf("=========================================================================\n");
564 	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
565 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
566 
567 	return total_failed ? 1 : 0;
568 }
569 
570 static inline void
571 _free_task_buffers_in_pool(struct worker_thread *worker)
572 {
573 	struct ap_task *task;
574 
575 	assert(worker);
576 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
577 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
578 		_free_task_buffers(task);
579 	}
580 }
581 
582 static int
583 _check_draining(void *arg)
584 {
585 	struct worker_thread *worker = arg;
586 
587 	assert(worker);
588 
589 	if (worker->current_queue_depth == 0) {
590 		_free_task_buffers_in_pool(worker);
591 		spdk_poller_unregister(&worker->is_draining_poller);
592 		unregister_worker(worker);
593 	}
594 
595 	return SPDK_POLLER_BUSY;
596 }
597 
598 static int
599 _worker_stop(void *arg)
600 {
601 	struct worker_thread *worker = arg;
602 
603 	assert(worker);
604 
605 	spdk_poller_unregister(&worker->stop_poller);
606 
607 	/* now let the worker drain and check it's outstanding IO with a poller */
608 	worker->is_draining = true;
609 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
610 
611 	return SPDK_POLLER_BUSY;
612 }
613 
614 static inline void
615 identify_accel_engine_usage(struct spdk_io_channel *ch)
616 {
617 	uint64_t capabilities;
618 
619 	assert(ch != NULL);
620 	capabilities = spdk_accel_get_capabilities(ch);
621 	if ((capabilities & g_workload_selection) != g_workload_selection) {
622 		SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n");
623 		SPDK_WARNLOG("The software engine will be used instead.\n\n");
624 	}
625 }
626 
627 static void
628 _init_thread(void *arg1)
629 {
630 	struct worker_thread *worker;
631 	struct ap_task *task;
632 	int i, num_tasks = g_allocate_depth;
633 	struct display_info *display = arg1;
634 
635 	worker = calloc(1, sizeof(*worker));
636 	if (worker == NULL) {
637 		fprintf(stderr, "Unable to allocate worker\n");
638 		free(display);
639 		return;
640 	}
641 
642 	worker->display.core = display->core;
643 	worker->display.thread = display->thread;
644 	free(display);
645 	worker->core = spdk_env_get_current_core();
646 	worker->thread = spdk_get_thread();
647 	pthread_mutex_lock(&g_workers_lock);
648 	i = g_num_workers;
649 	g_num_workers++;
650 	worker->next = g_workers;
651 	g_workers = worker;
652 	pthread_mutex_unlock(&g_workers_lock);
653 	worker->ch = spdk_accel_engine_get_io_channel();
654 
655 	if (i == 0) {
656 		identify_accel_engine_usage(worker->ch);
657 	}
658 
659 	TAILQ_INIT(&worker->tasks_pool);
660 
661 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
662 	if (worker->task_base == NULL) {
663 		fprintf(stderr, "Could not allocate task base.\n");
664 		goto error;
665 	}
666 
667 	task = worker->task_base;
668 	for (i = 0; i < num_tasks; i++) {
669 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
670 		task->worker = worker;
671 		if (_get_task_data_bufs(task)) {
672 			fprintf(stderr, "Unable to get data bufs\n");
673 			goto error;
674 		}
675 		task++;
676 	}
677 
678 	/* Register a poller that will stop the worker at time elapsed */
679 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
680 			      g_time_in_sec * 1000000ULL);
681 
682 	/* Load up queue depth worth of operations. */
683 	for (i = 0; i < g_queue_depth; i++) {
684 		task = _get_task(worker);
685 		worker->current_queue_depth++;
686 		if (task == NULL) {
687 			goto error;
688 		}
689 
690 		_submit_single(worker, task);
691 	}
692 	return;
693 error:
694 
695 	_free_task_buffers_in_pool(worker);
696 	free(worker->task_base);
697 	free(worker);
698 	spdk_app_stop(-1);
699 }
700 
701 static void
702 accel_perf_start(void *arg1)
703 {
704 	struct spdk_cpuset tmp_cpumask = {};
705 	char thread_name[32];
706 	uint32_t i;
707 	int j;
708 	struct spdk_thread *thread;
709 	struct display_info *display;
710 
711 	g_tsc_rate = spdk_get_ticks_hz();
712 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
713 
714 	printf("Running for %d seconds...\n", g_time_in_sec);
715 	fflush(stdout);
716 
717 	/* Create worker threads for each core that was specified. */
718 	SPDK_ENV_FOREACH_CORE(i) {
719 		for (j = 0; j < g_threads_per_core; j++) {
720 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
721 			spdk_cpuset_zero(&tmp_cpumask);
722 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
723 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
724 			display = calloc(1, sizeof(*display));
725 			if (display == NULL) {
726 				fprintf(stderr, "Unable to allocate memory\n");
727 				spdk_app_stop(-1);
728 				return;
729 			}
730 			display->core = i;
731 			display->thread = j;
732 			spdk_thread_send_msg(thread, _init_thread, display);
733 		}
734 	}
735 }
736 
737 int
738 main(int argc, char **argv)
739 {
740 	struct spdk_app_opts opts = {};
741 	struct worker_thread *worker, *tmp;
742 
743 	pthread_mutex_init(&g_workers_lock, NULL);
744 	spdk_app_opts_init(&opts, sizeof(opts));
745 	opts.reactor_mask = "0x1";
746 	if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:t:yw:P:f:T:", NULL, parse_args,
747 				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
748 		g_rc = -1;
749 		goto cleanup;
750 	}
751 
752 	if ((g_workload_selection != ACCEL_COPY) &&
753 	    (g_workload_selection != ACCEL_FILL) &&
754 	    (g_workload_selection != ACCEL_CRC32C) &&
755 	    (g_workload_selection != ACCEL_COPY_CRC32C) &&
756 	    (g_workload_selection != ACCEL_COMPARE) &&
757 	    (g_workload_selection != ACCEL_DUALCAST)) {
758 		usage();
759 		g_rc = -1;
760 		goto cleanup;
761 	}
762 
763 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
764 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
765 		usage();
766 		g_rc = -1;
767 		goto cleanup;
768 	}
769 
770 	if (g_allocate_depth == 0) {
771 		g_allocate_depth = g_queue_depth;
772 	}
773 
774 	if ((g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) &&
775 	    g_crc32c_chained_count == 0) {
776 		usage();
777 		g_rc = -1;
778 		goto cleanup;
779 	}
780 
781 	dump_user_config(&opts);
782 	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
783 	if (g_rc) {
784 		SPDK_ERRLOG("ERROR starting application\n");
785 	}
786 
787 	pthread_mutex_destroy(&g_workers_lock);
788 
789 	worker = g_workers;
790 	while (worker) {
791 		tmp = worker->next;
792 		free(worker);
793 		worker = tmp;
794 	}
795 cleanup:
796 	spdk_app_fini();
797 	return g_rc;
798 }
799