xref: /spdk/examples/accel/perf/accel_perf.c (revision 2172c432cfdaecc5a279d64e37c6b51e794683c1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/thread.h"
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/accel_engine.h"
41 #include "spdk/crc32.h"
42 #include "spdk/util.h"
43 
44 #define DATA_PATTERN 0x5a
45 #define ALIGN_4K 0x1000
46 
47 static uint64_t	g_tsc_rate;
48 static uint64_t g_tsc_us_rate;
49 static uint64_t g_tsc_end;
50 static int g_xfer_size_bytes = 4096;
51 static int g_queue_depth = 32;
52 static int g_time_in_sec = 5;
53 static uint32_t g_crc32c_seed = 0;
54 static int g_fail_percent_goal = 0;
55 static uint8_t g_fill_pattern = 255;
56 static bool g_verify = false;
57 static const char *g_workload_type = NULL;
58 static enum accel_capability g_workload_selection;
59 static struct worker_thread *g_workers = NULL;
60 static int g_num_workers = 0;
61 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
62 uint64_t g_capabilites;
63 struct ap_task;
64 
65 struct worker_thread {
66 	struct spdk_io_channel		*ch;
67 	uint64_t			xfer_completed;
68 	uint64_t			xfer_failed;
69 	uint64_t			injected_miscompares;
70 	uint64_t			current_queue_depth;
71 	TAILQ_HEAD(, ap_task)		tasks;
72 	struct worker_thread		*next;
73 	unsigned			core;
74 	struct spdk_thread		*thread;
75 	bool				is_draining;
76 	struct spdk_poller		*is_draining_poller;
77 	struct spdk_poller		*stop_poller;
78 };
79 
80 struct ap_task {
81 	void			*src;
82 	void			*dst;
83 	void			*dst2;
84 	struct worker_thread	*worker;
85 	int			status;
86 	int			expected_status; /* used for compare */
87 	TAILQ_ENTRY(ap_task)	link;
88 };
89 
90 static void
91 dump_user_config(struct spdk_app_opts *opts)
92 {
93 	printf("SPDK Configuration:\n");
94 	printf("Core mask:      %s\n\n", opts->reactor_mask);
95 	printf("Accel Perf Configuration:\n");
96 	printf("Workload Type:  %s\n", g_workload_type);
97 	if (g_workload_selection == ACCEL_CRC32C) {
98 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
99 	} else if (g_workload_selection == ACCEL_FILL) {
100 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
101 	} else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) {
102 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
103 	}
104 	printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
105 	printf("Queue depth:    %u\n", g_queue_depth);
106 	printf("Run time:       %u seconds\n", g_time_in_sec);
107 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
108 }
109 
/* Print the command-line help text. */
static void
usage(void)
{
	static const char *lines[] = {
		"accel_perf options:",
		"\t[-h help message]",
		"\t[-q queue depth]",
		"\t[-n number of channels]",
		"\t[-o transfer size in bytes]",
		"\t[-t time in seconds]",
		"\t[-w workload type must be one of these: copy, fill, crc32c, compare, dualcast",
		"\t[-s for crc32c workload, use this seed value (default 0)",
		"\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)",
		"\t[-f for fill workload, use this BYTE value (default 255)",
		"\t[-y verify result if this switch is on]",
	};
	size_t i;

	for (i = 0; i < sizeof(lines) / sizeof(lines[0]); i++) {
		printf("%s\n", lines[i]);
	}
}
125 
126 static int
127 parse_args(int argc, char *argv)
128 {
129 	switch (argc) {
130 	case 'f':
131 		g_fill_pattern = (uint8_t)spdk_strtol(optarg, 10);
132 		break;
133 	case 'o':
134 		g_xfer_size_bytes = spdk_strtol(optarg, 10);
135 		break;
136 	case 'P':
137 		g_fail_percent_goal = spdk_strtol(optarg, 10);
138 		break;
139 	case 'q':
140 		g_queue_depth = spdk_strtol(optarg, 10);
141 		break;
142 	case 's':
143 		g_crc32c_seed = spdk_strtol(optarg, 10);
144 		break;
145 	case 't':
146 		g_time_in_sec = spdk_strtol(optarg, 10);
147 		break;
148 	case 'y':
149 		g_verify = true;
150 		break;
151 	case 'w':
152 		g_workload_type = optarg;
153 		if (!strcmp(g_workload_type, "copy")) {
154 			g_workload_selection = ACCEL_COPY;
155 		} else if (!strcmp(g_workload_type, "fill")) {
156 			g_workload_selection = ACCEL_FILL;
157 		} else if (!strcmp(g_workload_type, "crc32c")) {
158 			g_workload_selection = ACCEL_CRC32C;
159 		} else if (!strcmp(g_workload_type, "compare")) {
160 			g_workload_selection = ACCEL_COMPARE;
161 		} else if (!strcmp(g_workload_type, "dualcast")) {
162 			g_workload_selection = ACCEL_DUALCAST;
163 		}
164 		break;
165 	default:
166 		usage();
167 		return 1;
168 	}
169 	return 0;
170 }
171 
172 static void
173 unregister_worker(void *arg1)
174 {
175 	struct worker_thread *worker = arg1;
176 	struct ap_task *task;
177 
178 	while (!TAILQ_EMPTY(&worker->tasks)) {
179 		task = TAILQ_FIRST(&worker->tasks);
180 		TAILQ_REMOVE(&worker->tasks, task, link);
181 		free(task);
182 	}
183 	spdk_put_io_channel(worker->ch);
184 	pthread_mutex_lock(&g_workers_lock);
185 	assert(g_num_workers >= 1);
186 	if (--g_num_workers == 0) {
187 		pthread_mutex_unlock(&g_workers_lock);
188 		spdk_app_stop(0);
189 	}
190 	pthread_mutex_unlock(&g_workers_lock);
191 }
192 
193 static void accel_done(void *ref, int status);
194 
/* Submit one operation of the selected workload on the worker's channel.
 * arg1 is the worker_thread, arg2 the ap_task to use. Bumps the worker's
 * queue depth before submitting; on a submission error the completion
 * callback is invoked inline with the error code.
 */
static void
_submit_single(void *arg1, void *arg2)
{
	struct worker_thread *worker = arg1;
	struct ap_task *task = arg2;
	int random_num;
	int rc = 0;

	assert(worker);

	task->worker = worker;
	task->worker->current_queue_depth++;
	switch (g_workload_selection) {
	case ACCEL_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_FILL:
		/* For fill, the pattern comes from the first byte of task->src
		 * (the whole src buffer was memset with g_fill_pattern in
		 * _get_task_data_bufs). The old comment said task->dst, which
		 * did not match the code.
		 */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_CRC32C:
		/* The engine writes the 32-bit CRC result into task->dst. */
		rc = spdk_accel_submit_crc32c(worker->ch, (uint32_t *)task->dst,
					      task->src, g_crc32c_seed,
					      g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_COMPARE:
		/* Deliberately corrupt the first dst byte on g_fail_percent_goal
		 * percent of submissions so the compare is expected to fail with
		 * -EILSEQ; _accel_done counts those as injected miscompares.
		 */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;

	}

	if (rc) {
		/* Submission failed: complete the task inline with the error. */
		accel_done(task, rc);
	}
}
248 
/* Completion handler; always runs on the task's owning thread (accel_done()
 * forwards here via spdk_thread_send_msg). Optionally verifies the result,
 * updates the worker's statistics, then either resubmits the task or, when
 * draining, frees its data buffers and parks it back on the free list.
 */
static void
_accel_done(void *arg1)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && task->status == 0) {
		switch (g_workload_selection) {
		case ACCEL_CRC32C:
			/* Calculate CRC-32C in software and compare to the engine result.
			 * NOTE(review): the seed is bitwise-inverted here — presumably to
			 * match spdk_crc32c_update()'s pre-inverted-seed convention;
			 * confirm against the crc32 API documentation.
			 */
			sw_crc32c = spdk_crc32c_update(task->src, g_xfer_size_bytes, ~g_crc32c_seed);
			if (*(uint32_t *)task->dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_DUALCAST:
			/* Both destinations must match the source. */
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_FILL:
			/* src holds the expected fill pattern (set in _get_task_data_bufs). */
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COMPARE:
			/* Nothing to verify: compare reports its result via status. */
			break;
		default:
			assert(false);
			break;
		}
	}

	/* An injected miscompare is expected to fail; anything else that failed
	 * counts as a real transfer failure.
	 */
	if (task->expected_status == -EILSEQ) {
		assert(task->status != 0);
		worker->injected_miscompares++;
	} else if (task->status) {
		/* Expected to pass but API reported error. */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining) {
		/* Keep the pipeline full until the stop poller flips is_draining. */
		_submit_single(worker, task);
	} else {
		spdk_free(task->src);
		spdk_free(task->dst);
		if (g_workload_selection == ACCEL_DUALCAST) {
			spdk_free(task->dst2);
		}
		TAILQ_INSERT_TAIL(&worker->tasks, task, link);
	}
}
321 
322 static void
323 batch_done(void *cb_arg, int status)
324 {
325 	struct ap_task *task = (struct ap_task *)cb_arg;
326 	struct worker_thread *worker = task->worker;
327 
328 	worker->current_queue_depth--;
329 	TAILQ_INSERT_TAIL(&worker->tasks, task, link);
330 }
331 
332 static int
333 dump_result(void)
334 {
335 	uint64_t total_completed = 0;
336 	uint64_t total_failed = 0;
337 	uint64_t total_miscompared = 0;
338 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
339 	struct worker_thread *worker = g_workers;
340 
341 	printf("\nCore           Transfers     Bandwidth     Failed     Miscompares\n");
342 	printf("-----------------------------------------------------------------\n");
343 	while (worker != NULL) {
344 
345 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
346 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
347 				       (g_time_in_sec * 1024 * 1024);
348 
349 		total_completed += worker->xfer_completed;
350 		total_failed += worker->xfer_failed;
351 		total_miscompared += worker->injected_miscompares;
352 
353 		if (xfer_per_sec) {
354 			printf("%10d%12" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 " %11" PRIu64 "\n",
355 			       worker->core, xfer_per_sec,
356 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
357 		}
358 
359 		worker = worker->next;
360 	}
361 
362 	total_xfer_per_sec = total_completed / g_time_in_sec;
363 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
364 			    (g_time_in_sec * 1024 * 1024);
365 
366 	printf("==================================================================\n");
367 	printf("Total:%16" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 " %11" PRIu64"\n\n",
368 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
369 
370 	return total_failed ? 1 : 0;
371 }
372 
373 static int
374 _check_draining(void *arg)
375 {
376 	struct worker_thread *worker = arg;
377 
378 	assert(worker);
379 
380 	if (worker->current_queue_depth == 0) {
381 		spdk_poller_unregister(&worker->is_draining_poller);
382 		unregister_worker(worker);
383 	}
384 
385 	return -1;
386 }
387 
/* One-shot poller fired when the run time elapses: switch the worker into
 * drain mode and start polling for its outstanding I/O to complete.
 */
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	/* This timer fires only once; drop it immediately. */
	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check its outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return 0;
}
403 
/* Completion callback for spdk_for_each_thread() in accel_perf_start();
 * intentionally empty — nothing to aggregate once every reactor has run
 * _init_thread().
 */
static void
_init_thread_done(void *ctx)
{
}
408 
409 static int
410 _get_task_data_bufs(struct ap_task *task)
411 {
412 	uint32_t align = 0;
413 
414 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
415 	 * we do this for all engines to keep it simple.
416 	 */
417 	if (g_workload_selection == ACCEL_DUALCAST) {
418 		align = ALIGN_4K;
419 	}
420 
421 	task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
422 	if (task->src == NULL) {
423 		fprintf(stderr, "Unable to alloc src buffer\n");
424 		return -ENOMEM;
425 	}
426 	memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
427 
428 	task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
429 	if (task->dst == NULL) {
430 		fprintf(stderr, "Unable to alloc dst buffer\n");
431 		return -ENOMEM;
432 	}
433 
434 	/* For compare we want the buffers to match, otherwise not. */
435 	if (g_workload_selection == ACCEL_COMPARE) {
436 		memset(task->dst, DATA_PATTERN, g_xfer_size_bytes);
437 	} else {
438 		memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes);
439 	}
440 
441 	/* For fill, set the entire src buffer so we can check if verify is enabled. */
442 	if (g_workload_selection == ACCEL_FILL) {
443 		memset(task->src, g_fill_pattern, g_xfer_size_bytes);
444 	}
445 
446 	if (g_workload_selection == ACCEL_DUALCAST) {
447 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
448 		if (task->dst2 == NULL) {
449 			fprintf(stderr, "Unable to alloc dst buffer\n");
450 			return -ENOMEM;
451 		}
452 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
453 	}
454 
455 	return 0;
456 }
457 
458 static int
459 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, struct spdk_accel_batch *batch)
460 {
461 	int rc = 0;
462 
463 	switch (g_workload_selection) {
464 	case ACCEL_COPY:
465 		rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst,
466 						task->src, g_xfer_size_bytes, accel_done, task);
467 		break;
468 	case ACCEL_DUALCAST:
469 		rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2,
470 						    task->src, g_xfer_size_bytes, accel_done, task);
471 		break;
472 	case ACCEL_COMPARE:
473 		rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src,
474 						   g_xfer_size_bytes, accel_done, task);
475 		break;
476 	case ACCEL_FILL:
477 		rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst,
478 						*(uint8_t *)task->src,
479 						g_xfer_size_bytes, accel_done, task);
480 		break;
481 	case ACCEL_CRC32C:
482 		rc = spdk_accel_batch_prep_crc32c(worker->ch, batch, (uint32_t *)task->dst,
483 						  task->src, g_crc32c_seed, g_xfer_size_bytes, accel_done, task);
484 		break;
485 	default:
486 		assert(false);
487 		break;
488 	}
489 
490 	return rc;
491 }
492 
493 static void
494 _init_thread(void *arg1)
495 {
496 	struct worker_thread *worker;
497 	struct ap_task *task;
498 	int i, rc, max_per_batch, batch_count, num_tasks;
499 	int remaining = g_queue_depth;
500 	struct spdk_accel_batch *batch, *new_batch;
501 
502 	worker = calloc(1, sizeof(*worker));
503 	if (worker == NULL) {
504 		fprintf(stderr, "Unable to allocate worker\n");
505 		return;
506 	}
507 
508 	worker->core = spdk_env_get_current_core();
509 	worker->thread = spdk_get_thread();
510 	worker->next = g_workers;
511 	worker->ch = spdk_accel_engine_get_io_channel();
512 
513 	max_per_batch = spdk_accel_batch_get_max(worker->ch);
514 	assert(max_per_batch > 0);
515 	num_tasks = g_queue_depth + spdk_divide_round_up(g_queue_depth, max_per_batch);
516 
517 	TAILQ_INIT(&worker->tasks);
518 	for (i = 0; i < num_tasks; i++) {
519 		task = calloc(1, sizeof(struct ap_task));
520 		if (task == NULL) {
521 			fprintf(stderr, "Could not allocate task.\n");
522 			return;
523 			/* TODO cleanup */
524 		}
525 		TAILQ_INSERT_TAIL(&worker->tasks, task, link);
526 	}
527 
528 	/* Register a poller that will stop the worker at time elapsed */
529 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
530 			      g_time_in_sec * 1000000ULL);
531 
532 	g_workers = worker;
533 	pthread_mutex_lock(&g_workers_lock);
534 	g_num_workers++;
535 	pthread_mutex_unlock(&g_workers_lock);
536 
537 	/* Batching is only possible if there is at least 2 operations. */
538 	if (g_queue_depth > 1) {
539 
540 		/* Outter loop sets up each batch command, inner loop populates the
541 		 * batch descriptors.
542 		 */
543 		do {
544 			new_batch = spdk_accel_batch_create(worker->ch);
545 			if (new_batch == NULL) {
546 				break;
547 			}
548 
549 			batch = new_batch;
550 			batch_count = 0;
551 
552 			do {
553 				if (!TAILQ_EMPTY(&worker->tasks)) {
554 					task = TAILQ_FIRST(&worker->tasks);
555 					TAILQ_REMOVE(&worker->tasks, task, link);
556 				} else {
557 					fprintf(stderr, "Unable to get accel_task\n");
558 					goto error;
559 				}
560 				task->worker = worker;
561 				task->worker->current_queue_depth++;
562 
563 				if (_get_task_data_bufs(task)) {
564 					fprintf(stderr, "Unable to get data bufs\n");
565 					goto error;
566 				}
567 
568 				rc = _batch_prep_cmd(worker, task, batch);
569 				if (rc) {
570 					fprintf(stderr, "error preping command\n");
571 					goto error;
572 				}
573 				remaining--;
574 				batch_count++;
575 			} while (batch_count < max_per_batch && remaining > 0);
576 
577 			/* Now send the batch command. */
578 			if (!TAILQ_EMPTY(&worker->tasks)) {
579 				task = TAILQ_FIRST(&worker->tasks);
580 				TAILQ_REMOVE(&worker->tasks, task, link);
581 			} else {
582 				fprintf(stderr, "Unable to get accel_task\n");
583 				goto error;
584 			}
585 			task->worker = worker;
586 			task->worker->current_queue_depth++;
587 
588 			rc = spdk_accel_batch_submit(worker->ch, batch, batch_done, task);
589 			if (rc) {
590 				fprintf(stderr, "error ending batch %d\n", rc);
591 				goto error;
592 			}
593 			/* We can't build a batch unless it has 2 descriptors (per spec). */
594 		} while (remaining > 1);
595 
596 		/* If there are no more left, we're done. */
597 		if (remaining == 0) {
598 			return;
599 		}
600 	}
601 
602 	/* For engines that don't support batch or for the odd event that
603 	 * a batch ends with only one descriptor left.
604 	 */
605 	for (i = 0; i < remaining; i++) {
606 
607 		if (!TAILQ_EMPTY(&worker->tasks)) {
608 			task = TAILQ_FIRST(&worker->tasks);
609 			TAILQ_REMOVE(&worker->tasks, task, link);
610 		} else {
611 			fprintf(stderr, "Unable to get accel_task\n");
612 			goto error;
613 		}
614 
615 		if (_get_task_data_bufs(task)) {
616 			fprintf(stderr, "Unable to get data bufs\n");
617 			goto error;
618 		}
619 
620 		_submit_single(worker, task);
621 	}
622 	return;
623 error:
624 	/* TODO clean exit */
625 	raise(SIGINT);
626 	while (!TAILQ_EMPTY(&worker->tasks)) {
627 		task = TAILQ_FIRST(&worker->tasks);
628 		TAILQ_REMOVE(&worker->tasks, task, link);
629 		free(task);
630 	}
631 	free(worker);
632 	spdk_app_stop(-1);
633 }
634 
635 static void
636 accel_done(void *cb_arg, int status)
637 {
638 	struct ap_task *task = (struct ap_task *)cb_arg;
639 	struct worker_thread *worker = task->worker;
640 
641 	assert(worker);
642 
643 	task->status = status;
644 	spdk_thread_send_msg(worker->thread, _accel_done, task);
645 }
646 
647 static void
648 accel_perf_start(void *arg1)
649 {
650 	struct spdk_io_channel *accel_ch;
651 
652 	accel_ch = spdk_accel_engine_get_io_channel();
653 	g_capabilites = spdk_accel_get_capabilities(accel_ch);
654 	spdk_put_io_channel(accel_ch);
655 
656 	if ((g_capabilites & g_workload_selection) != g_workload_selection) {
657 		SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n");
658 		SPDK_WARNLOG("The software engine will be used instead.\n\n");
659 	}
660 
661 	g_tsc_rate = spdk_get_ticks_hz();
662 	g_tsc_us_rate = g_tsc_rate / (1000 * 1000);
663 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
664 
665 	printf("Running for %d seconds...\n", g_time_in_sec);
666 	fflush(stdout);
667 
668 	spdk_for_each_thread(_init_thread, NULL, _init_thread_done);
669 }
670 
671 int
672 main(int argc, char **argv)
673 {
674 	struct spdk_app_opts opts = {};
675 	struct worker_thread *worker, *tmp;
676 	int rc = 0;
677 
678 	pthread_mutex_init(&g_workers_lock, NULL);
679 	spdk_app_opts_init(&opts);
680 	opts.reactor_mask = "0x1";
681 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "o:q:t:yw:P:f:", NULL, parse_args,
682 				      usage)) != SPDK_APP_PARSE_ARGS_SUCCESS) {
683 		rc = -1;
684 		goto cleanup;
685 	}
686 
687 	if ((g_workload_selection != ACCEL_COPY) &&
688 	    (g_workload_selection != ACCEL_FILL) &&
689 	    (g_workload_selection != ACCEL_CRC32C) &&
690 	    (g_workload_selection != ACCEL_COMPARE) &&
691 	    (g_workload_selection != ACCEL_DUALCAST)) {
692 		usage();
693 		rc = -1;
694 		goto cleanup;
695 	}
696 
697 	dump_user_config(&opts);
698 	rc = spdk_app_start(&opts, accel_perf_start, NULL);
699 	if (rc) {
700 		SPDK_ERRLOG("ERROR starting application\n");
701 	} else {
702 		dump_result();
703 	}
704 
705 	pthread_mutex_destroy(&g_workers_lock);
706 
707 	worker = g_workers;
708 	while (worker) {
709 		tmp = worker->next;
710 		free(worker);
711 		worker = tmp;
712 	}
713 cleanup:
714 	spdk_app_fini();
715 	return rc;
716 }
717