xref: /spdk/examples/accel/perf/accel_perf.c (revision b30d57cdad6d2bc75cc1e4e2ebbcebcb0d98dcfa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/thread.h"
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/accel_engine.h"
41 #include "spdk/crc32.h"
42 #include "spdk/util.h"
43 
#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

/* Timing state, initialized in accel_perf_start(). */
static uint64_t	g_tsc_rate;
static uint64_t g_tsc_us_rate;
static uint64_t g_tsc_end;
/* Command-line options; defaults here, overridden in parse_args(). */
static int g_xfer_size_bytes = 4096;	/* -o: per-operation transfer size */
static int g_queue_depth = 32;		/* -q: in-flight operations per worker */
static int g_time_in_sec = 5;		/* -t: run time */
static uint32_t g_crc32c_seed = 0;	/* -s: seed for crc32c workload */
static int g_fail_percent_goal = 0;	/* -P: percent of compare ops forced to miscompare */
static uint8_t g_fill_pattern = 255;	/* -f: byte written by fill workload */
static bool g_verify = false;		/* -y: software-verify each completed op */
static const char *g_workload_type = NULL;	/* -w: raw workload string */
static enum accel_capability g_workload_selection;	/* parsed -w value */
static struct worker_thread *g_workers = NULL;	/* singly linked list of per-thread workers */
static int g_num_workers = 0;		/* protected by g_workers_lock */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
/* NOTE(review): spelling ("capabilites") and external linkage look unintentional — confirm before renaming/making static. */
uint64_t g_capabilites;

struct worker_thread;
static void accel_done(void *ref, int status);
66 
/*
 * One outstanding accel operation. Tasks are pre-allocated per worker in
 * _init_thread() and recycled through worker->tasks_pool.
 */
struct ap_task {
	void			*src;	/* source buffer, filled with DATA_PATTERN (or fill pattern) */
	void			*dst;	/* destination buffer */
	void			*dst2;	/* second destination, allocated for dualcast only */
	struct worker_thread	*worker; /* owning worker, set in _get_task() */
	int			status;	/* completion status reported by the engine */
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;	/* linkage on worker->tasks_pool */
};
76 
/*
 * Per-SPDK-thread state. Workers form a singly linked list rooted at
 * g_workers; each runs the configured workload independently.
 */
struct worker_thread {
	struct spdk_io_channel		*ch;		/* accel engine I/O channel */
	uint64_t			xfer_completed;	/* total completed operations */
	uint64_t			xfer_failed;	/* failed or miscompared operations */
	uint64_t			injected_miscompares;	/* deliberate -P miscompares observed */
	uint64_t			current_queue_depth;	/* operations currently in flight */
	TAILQ_HEAD(, ap_task)		tasks_pool;	/* free-task list */
	struct worker_thread		*next;		/* next worker on g_workers */
	unsigned			core;		/* CPU core this worker runs on */
	struct spdk_thread		*thread;	/* SPDK thread completions are posted to */
	bool				is_draining;	/* set when run time expires; stops resubmission */
	struct spdk_poller		*is_draining_poller;	/* polls until in-flight count reaches 0 */
	struct spdk_poller		*stop_poller;	/* one-shot run-time timer */
	void				*task_base;	/* backing allocation for the task pool */
};
92 
93 static void
94 dump_user_config(struct spdk_app_opts *opts)
95 {
96 	printf("SPDK Configuration:\n");
97 	printf("Core mask:      %s\n\n", opts->reactor_mask);
98 	printf("Accel Perf Configuration:\n");
99 	printf("Workload Type:  %s\n", g_workload_type);
100 	if (g_workload_selection == ACCEL_CRC32C) {
101 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
102 	} else if (g_workload_selection == ACCEL_FILL) {
103 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
104 	} else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) {
105 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
106 	}
107 	printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
108 	printf("Queue depth:    %u\n", g_queue_depth);
109 	printf("Run time:       %u seconds\n", g_time_in_sec);
110 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
111 }
112 
/* Print command-line help; each option is bracketed like the others
 * (the -w/-s/-P/-f lines were missing their closing ']').
 */
static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, compare, dualcast]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
}
128 
129 static int
130 parse_args(int argc, char *argv)
131 {
132 	switch (argc) {
133 	case 'f':
134 		g_fill_pattern = (uint8_t)spdk_strtol(optarg, 10);
135 		break;
136 	case 'o':
137 		g_xfer_size_bytes = spdk_strtol(optarg, 10);
138 		break;
139 	case 'P':
140 		g_fail_percent_goal = spdk_strtol(optarg, 10);
141 		break;
142 	case 'q':
143 		g_queue_depth = spdk_strtol(optarg, 10);
144 		break;
145 	case 's':
146 		g_crc32c_seed = spdk_strtol(optarg, 10);
147 		break;
148 	case 't':
149 		g_time_in_sec = spdk_strtol(optarg, 10);
150 		break;
151 	case 'y':
152 		g_verify = true;
153 		break;
154 	case 'w':
155 		g_workload_type = optarg;
156 		if (!strcmp(g_workload_type, "copy")) {
157 			g_workload_selection = ACCEL_COPY;
158 		} else if (!strcmp(g_workload_type, "fill")) {
159 			g_workload_selection = ACCEL_FILL;
160 		} else if (!strcmp(g_workload_type, "crc32c")) {
161 			g_workload_selection = ACCEL_CRC32C;
162 		} else if (!strcmp(g_workload_type, "compare")) {
163 			g_workload_selection = ACCEL_COMPARE;
164 		} else if (!strcmp(g_workload_type, "dualcast")) {
165 			g_workload_selection = ACCEL_DUALCAST;
166 		}
167 		break;
168 	default:
169 		usage();
170 		return 1;
171 	}
172 	return 0;
173 }
174 
175 static void
176 unregister_worker(void *arg1)
177 {
178 	struct worker_thread *worker = arg1;
179 
180 	free(worker->task_base);
181 	spdk_put_io_channel(worker->ch);
182 	pthread_mutex_lock(&g_workers_lock);
183 	assert(g_num_workers >= 1);
184 	if (--g_num_workers == 0) {
185 		pthread_mutex_unlock(&g_workers_lock);
186 		spdk_app_stop(0);
187 	}
188 	pthread_mutex_unlock(&g_workers_lock);
189 }
190 
191 static int
192 _get_task_data_bufs(struct ap_task *task)
193 {
194 	uint32_t align = 0;
195 
196 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
197 	 * we do this for all engines to keep it simple.
198 	 */
199 	if (g_workload_selection == ACCEL_DUALCAST) {
200 		align = ALIGN_4K;
201 	}
202 
203 	task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
204 	if (task->src == NULL) {
205 		fprintf(stderr, "Unable to alloc src buffer\n");
206 		return -ENOMEM;
207 	}
208 	memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
209 
210 	task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
211 	if (task->dst == NULL) {
212 		fprintf(stderr, "Unable to alloc dst buffer\n");
213 		return -ENOMEM;
214 	}
215 
216 	/* For compare we want the buffers to match, otherwise not. */
217 	if (g_workload_selection == ACCEL_COMPARE) {
218 		memset(task->dst, DATA_PATTERN, g_xfer_size_bytes);
219 	} else {
220 		memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes);
221 	}
222 
223 	/* For fill, set the entire src buffer so we can check if verify is enabled. */
224 	if (g_workload_selection == ACCEL_FILL) {
225 		memset(task->src, g_fill_pattern, g_xfer_size_bytes);
226 	}
227 
228 	if (g_workload_selection == ACCEL_DUALCAST) {
229 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
230 		if (task->dst2 == NULL) {
231 			fprintf(stderr, "Unable to alloc dst buffer\n");
232 			return -ENOMEM;
233 		}
234 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
235 	}
236 
237 	return 0;
238 }
239 
240 inline static struct ap_task *
241 _get_task(struct worker_thread *worker)
242 {
243 	struct ap_task *task;
244 
245 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
246 		task = TAILQ_FIRST(&worker->tasks_pool);
247 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
248 	} else {
249 		fprintf(stderr, "Unable to get ap_task\n");
250 		return NULL;
251 	}
252 
253 	task->worker = worker;
254 	task->worker->current_queue_depth++;
255 	return task;
256 }
257 
258 static void accel_done(void *ref, int status);
259 
260 static void
261 _submit_single(struct worker_thread *worker, struct ap_task *task)
262 {
263 	int random_num;
264 	int rc = 0;
265 
266 	assert(worker);
267 
268 	switch (g_workload_selection) {
269 	case ACCEL_COPY:
270 		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
271 					    g_xfer_size_bytes, accel_done, task);
272 		break;
273 	case ACCEL_FILL:
274 		/* For fill use the first byte of the task->dst buffer */
275 		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
276 					    g_xfer_size_bytes, accel_done, task);
277 		break;
278 	case ACCEL_CRC32C:
279 		rc = spdk_accel_submit_crc32c(worker->ch, (uint32_t *)task->dst,
280 					      task->src, g_crc32c_seed,
281 					      g_xfer_size_bytes, accel_done, task);
282 		break;
283 	case ACCEL_COMPARE:
284 		random_num = rand() % 100;
285 		if (random_num < g_fail_percent_goal) {
286 			task->expected_status = -EILSEQ;
287 			*(uint8_t *)task->dst = ~DATA_PATTERN;
288 		} else {
289 			task->expected_status = 0;
290 			*(uint8_t *)task->dst = DATA_PATTERN;
291 		}
292 		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
293 					       g_xfer_size_bytes, accel_done, task);
294 		break;
295 	case ACCEL_DUALCAST:
296 		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
297 						task->src, g_xfer_size_bytes, accel_done, task);
298 		break;
299 	default:
300 		assert(false);
301 		break;
302 
303 	}
304 
305 	if (rc) {
306 		accel_done(task, rc);
307 	}
308 }
309 
310 static int
311 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, struct spdk_accel_batch *batch)
312 {
313 	int rc = 0;
314 
315 	switch (g_workload_selection) {
316 	case ACCEL_COPY:
317 		rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst,
318 						task->src, g_xfer_size_bytes, accel_done, task);
319 		break;
320 	case ACCEL_DUALCAST:
321 		rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2,
322 						    task->src, g_xfer_size_bytes, accel_done, task);
323 		break;
324 	case ACCEL_COMPARE:
325 		rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src,
326 						   g_xfer_size_bytes, accel_done, task);
327 		break;
328 	case ACCEL_FILL:
329 		rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst,
330 						*(uint8_t *)task->src,
331 						g_xfer_size_bytes, accel_done, task);
332 		break;
333 	case ACCEL_CRC32C:
334 		rc = spdk_accel_batch_prep_crc32c(worker->ch, batch, (uint32_t *)task->dst,
335 						  task->src, g_crc32c_seed, g_xfer_size_bytes, accel_done, task);
336 		break;
337 	default:
338 		assert(false);
339 		break;
340 	}
341 
342 	return rc;
343 }
344 
345 static void
346 _free_task(struct ap_task *task)
347 {
348 	spdk_dma_free(task->src);
349 	spdk_dma_free(task->dst);
350 	if (g_workload_selection == ACCEL_DUALCAST) {
351 		spdk_dma_free(task->dst2);
352 	}
353 }
354 
355 static void
356 batch_done(void *cb_arg, int status)
357 {
358 	struct ap_task *task = (struct ap_task *)cb_arg;
359 	struct worker_thread *worker = task->worker;
360 
361 	worker->current_queue_depth--;
362 	TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
363 }
364 
/*
 * Per-operation completion handler, executed on the worker's own SPDK
 * thread (posted from accel_done() via spdk_thread_send_msg()).
 * Optionally software-verifies the result, updates statistics, and — while
 * the run is still active — resubmits the task to hold the queue depth.
 */
static void
_accel_done(void *arg1)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && task->status == 0) {
		switch (g_workload_selection) {
		case ACCEL_CRC32C:
			/* calculate sw CRC-32C and compare to the accel engine result. */
			sw_crc32c = spdk_crc32c_update(task->src, g_xfer_size_bytes, ~g_crc32c_seed);
			if (*(uint32_t *)task->dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_DUALCAST:
			/* Both destinations must match the source. */
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_FILL:
			/* src was pre-filled with the same pattern in _get_task_data_bufs(). */
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COMPARE:
			/* Compare results are judged via expected_status below. */
			break;
		default:
			assert(false);
			break;
		}
	}

	/* An injected miscompare must fail with -EILSEQ; any other non-zero
	 * status is an unexpected failure.
	 */
	if (task->expected_status == -EILSEQ) {
		assert(task->status != 0);
		worker->injected_miscompares++;
	} else if (task->status) {
		/* Expected to pass but API reported error. */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining) {
		_submit_single(worker, task);
		worker->current_queue_depth++;
	}
}
431 
432 static int
433 dump_result(void)
434 {
435 	uint64_t total_completed = 0;
436 	uint64_t total_failed = 0;
437 	uint64_t total_miscompared = 0;
438 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
439 	struct worker_thread *worker = g_workers;
440 
441 	printf("\nCore           Transfers     Bandwidth     Failed     Miscompares\n");
442 	printf("-----------------------------------------------------------------\n");
443 	while (worker != NULL) {
444 
445 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
446 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
447 				       (g_time_in_sec * 1024 * 1024);
448 
449 		total_completed += worker->xfer_completed;
450 		total_failed += worker->xfer_failed;
451 		total_miscompared += worker->injected_miscompares;
452 
453 		if (xfer_per_sec) {
454 			printf("%10d%12" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 " %11" PRIu64 "\n",
455 			       worker->core, xfer_per_sec,
456 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
457 		}
458 
459 		worker = worker->next;
460 	}
461 
462 	total_xfer_per_sec = total_completed / g_time_in_sec;
463 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
464 			    (g_time_in_sec * 1024 * 1024);
465 
466 	printf("==================================================================\n");
467 	printf("Total:%16" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 " %11" PRIu64"\n\n",
468 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
469 
470 	return total_failed ? 1 : 0;
471 }
472 
473 static int
474 _check_draining(void *arg)
475 {
476 	struct worker_thread *worker = arg;
477 	struct ap_task *task;
478 
479 	assert(worker);
480 
481 	if (worker->current_queue_depth == 0) {
482 		while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
483 			TAILQ_REMOVE(&worker->tasks_pool, task, link);
484 			_free_task(task);
485 		}
486 		spdk_poller_unregister(&worker->is_draining_poller);
487 		unregister_worker(worker);
488 	}
489 
490 	return -1;
491 }
492 
/*
 * One-shot poller fired when the configured run time elapses: flips the
 * worker into draining mode and arms _check_draining() to finish teardown
 * once in-flight operations complete.
 */
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	/* This poller only fires once. */
	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check its outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return 0;
}
508 
/* Completion callback for spdk_for_each_thread(); intentionally a no-op —
 * each worker runs until its own stop poller fires.
 */
static void
_init_thread_done(void *ctx)
{
}
513 
514 static void
515 _init_thread(void *arg1)
516 {
517 	struct worker_thread *worker;
518 	struct ap_task *task;
519 	int i, rc, max_per_batch, batch_count, num_tasks;
520 	int remaining = g_queue_depth;
521 	struct spdk_accel_batch *batch, *new_batch;
522 
523 	worker = calloc(1, sizeof(*worker));
524 	if (worker == NULL) {
525 		fprintf(stderr, "Unable to allocate worker\n");
526 		return;
527 	}
528 
529 	worker->core = spdk_env_get_current_core();
530 	worker->thread = spdk_get_thread();
531 	worker->next = g_workers;
532 	worker->ch = spdk_accel_engine_get_io_channel();
533 
534 	max_per_batch = spdk_accel_batch_get_max(worker->ch);
535 	assert(max_per_batch > 0);
536 	num_tasks = g_queue_depth + spdk_divide_round_up(g_queue_depth, max_per_batch);
537 
538 	TAILQ_INIT(&worker->tasks_pool);
539 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
540 	if (worker->task_base == NULL) {
541 		fprintf(stderr, "Could not allocate task base.\n");
542 		goto error;
543 	}
544 
545 	task = worker->task_base;
546 	for (i = 0; i < num_tasks; i++) {
547 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
548 		if (_get_task_data_bufs(task)) {
549 			fprintf(stderr, "Unable to get data bufs\n");
550 			goto error;
551 		}
552 		task++;
553 	}
554 
555 	/* Register a poller that will stop the worker at time elapsed */
556 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
557 			      g_time_in_sec * 1000000ULL);
558 
559 	g_workers = worker;
560 	pthread_mutex_lock(&g_workers_lock);
561 	g_num_workers++;
562 	pthread_mutex_unlock(&g_workers_lock);
563 
564 	/* Batching is only possible if there is at least 2 operations. */
565 	if (g_queue_depth > 1) {
566 
567 		/* Outter loop sets up each batch command, inner loop populates the
568 		 * batch descriptors.
569 		 */
570 		do {
571 			new_batch = spdk_accel_batch_create(worker->ch);
572 			if (new_batch == NULL) {
573 				break;
574 			}
575 
576 			batch = new_batch;
577 			batch_count = 0;
578 
579 			do {
580 				task = _get_task(worker);
581 				if (task == NULL) {
582 					goto error;
583 				}
584 
585 				rc = _batch_prep_cmd(worker, task, batch);
586 				if (rc) {
587 					fprintf(stderr, "error preping command\n");
588 					goto error;
589 				}
590 				remaining--;
591 				batch_count++;
592 			} while (batch_count < max_per_batch && remaining > 0);
593 
594 			/* Now send the batch command. */
595 			task = _get_task(worker);
596 			if (task == NULL) {
597 				goto error;
598 			}
599 
600 			rc = spdk_accel_batch_submit(worker->ch, batch, batch_done, task);
601 			if (rc) {
602 				fprintf(stderr, "error ending batch %d\n", rc);
603 				goto error;
604 			}
605 			/* We can't build a batch unless it has 2 descriptors (per spec). */
606 		} while (remaining > 1);
607 
608 		/* If there are no more left, we're done. */
609 		if (remaining == 0) {
610 			return;
611 		}
612 	}
613 
614 	/* For engines that don't support batch or for the odd event that
615 	 * a batch ends with only one descriptor left.
616 	 */
617 	for (i = 0; i < remaining; i++) {
618 
619 		task = _get_task(worker);
620 		if (task == NULL) {
621 			goto error;
622 		}
623 
624 		_submit_single(worker, task);
625 	}
626 	return;
627 error:
628 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
629 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
630 		_free_task(task);
631 	}
632 	free(worker->task_base);
633 	free(worker);
634 	spdk_app_stop(-1);
635 }
636 
637 static void
638 accel_done(void *cb_arg, int status)
639 {
640 	struct ap_task *task = (struct ap_task *)cb_arg;
641 	struct worker_thread *worker = task->worker;
642 
643 	assert(worker);
644 
645 	task->status = status;
646 	spdk_thread_send_msg(worker->thread, _accel_done, task);
647 }
648 
/*
 * Application entry point (spdk_app_start callback): checks whether the
 * engine natively supports the selected workload, records timing state,
 * and launches one worker on every SPDK thread.
 */
static void
accel_perf_start(void *arg1)
{
	struct spdk_io_channel *accel_ch;

	/* Query the engine's capabilities through a temporary channel. */
	accel_ch = spdk_accel_engine_get_io_channel();
	g_capabilites = spdk_accel_get_capabilities(accel_ch);
	spdk_put_io_channel(accel_ch);

	if ((g_capabilites & g_workload_selection) != g_workload_selection) {
		SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n");
		SPDK_WARNLOG("The software engine will be used instead.\n\n");
	}

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_us_rate = g_tsc_rate / (1000 * 1000);
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	spdk_for_each_thread(_init_thread, NULL, _init_thread_done);
}
672 
673 int
674 main(int argc, char **argv)
675 {
676 	struct spdk_app_opts opts = {};
677 	struct worker_thread *worker, *tmp;
678 	int rc = 0;
679 
680 	pthread_mutex_init(&g_workers_lock, NULL);
681 	spdk_app_opts_init(&opts, sizeof(opts));
682 	opts.reactor_mask = "0x1";
683 	if (spdk_app_parse_args(argc, argv, &opts, "o:q:t:yw:P:f:", NULL, parse_args,
684 				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
685 		rc = -1;
686 		goto cleanup;
687 	}
688 
689 	if ((g_workload_selection != ACCEL_COPY) &&
690 	    (g_workload_selection != ACCEL_FILL) &&
691 	    (g_workload_selection != ACCEL_CRC32C) &&
692 	    (g_workload_selection != ACCEL_COMPARE) &&
693 	    (g_workload_selection != ACCEL_DUALCAST)) {
694 		usage();
695 		rc = -1;
696 		goto cleanup;
697 	}
698 
699 	dump_user_config(&opts);
700 	rc = spdk_app_start(&opts, accel_perf_start, NULL);
701 	if (rc) {
702 		SPDK_ERRLOG("ERROR starting application\n");
703 	} else {
704 		dump_result();
705 	}
706 
707 	pthread_mutex_destroy(&g_workers_lock);
708 
709 	worker = g_workers;
710 	while (worker) {
711 		tmp = worker->next;
712 		free(worker);
713 		worker = tmp;
714 	}
715 cleanup:
716 	spdk_app_fini();
717 	return rc;
718 }
719