xref: /spdk/examples/accel/perf/accel_perf.c (revision 13ca6e52d3c7d413a244c67cfbb419e55f3293da)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/thread.h"
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/accel_engine.h"
41 #include "spdk/crc32.h"
42 #include "spdk/util.h"
43 
/* Byte value used to initialize source buffers so results can be verified. */
#define DATA_PATTERN 0x5a
/* 4 KiB alignment required by DSA hardware for dualcast destinations. */
#define ALIGN_4K 0x1000

static uint64_t	g_tsc_rate;			/* timestamp-counter ticks per second */
static uint64_t g_tsc_end;			/* run end tick; NOTE(review): set in accel_perf_start() but not read anywhere in this view — confirm */
static int g_rc;				/* process exit code, set by workers/main */
static int g_xfer_size_bytes = 4096;		/* -o: per-operation transfer size */
static int g_queue_depth = 32;			/* -q: outstanding operations per worker */
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;		/* -T: SPDK threads spawned per core */
static int g_time_in_sec = 5;			/* -t: test duration */
static uint32_t g_crc32c_seed = 0;		/* -s: seed for crc32c workloads */
static uint32_t g_crc32c_chained_count = 1;	/* -C: iovec entries per crc32c op */
static int g_fail_percent_goal = 0;		/* -P: injected miscompare percentage */
static uint8_t g_fill_pattern = 255;		/* -f: byte written by the fill workload */
static bool g_verify = false;			/* -y: software-verify each completion */
static const char *g_workload_type = NULL;	/* -w: workload name as passed in */
static enum accel_opcode g_workload_selection;	/* opcode parsed from g_workload_type */
static struct worker_thread *g_workers = NULL;	/* singly-linked list of workers */
static int g_num_workers = 0;			/* list length; guarded by g_workers_lock */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
68 
69 struct worker_thread;
70 static void accel_done(void *ref, int status);
71 
/* Identifies a worker in the results table: its core and thread index on that core. */
struct display_info {
	int core;
	int thread;
};
76 
/* Per-operation context: the data buffers plus bookkeeping for one in-flight accel op. */
struct ap_task {
	void			*src;		/* source buffer (non-crc32c workloads) */
	struct iovec		*iovs;		/* source iovec array (crc32c / copy_crc32c) */
	uint32_t		iov_cnt;	/* number of entries in iovs */
	void			*dst;		/* destination buffer */
	void			*dst2;		/* second destination, dualcast only */
	uint32_t		crc_dst;	/* crc32c result written by the engine */
	struct worker_thread	*worker;	/* owning worker, set once at init */
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;		/* linkage in worker->tasks_pool */
};
88 
/* Per-SPDK-thread state: one of these exists for every thread spawned by
 * accel_perf_start(); linked into the global g_workers list.
 */
struct worker_thread {
	struct spdk_io_channel		*ch;		/* accel engine channel for this thread */
	uint64_t			xfer_completed;	/* total ops completed */
	uint64_t			xfer_failed;	/* ops that failed or miscompared */
	uint64_t			injected_miscompares; /* deliberate compare failures (-P) */
	uint64_t			current_queue_depth;  /* ops currently in flight */
	TAILQ_HEAD(, ap_task)		tasks_pool;	/* free tasks ready for submission */
	struct worker_thread		*next;		/* next worker in g_workers */
	unsigned			core;		/* core this worker runs on */
	struct spdk_thread		*thread;	/* owning SPDK thread */
	bool				is_draining;	/* true once run time has elapsed */
	struct spdk_poller		*is_draining_poller; /* polls for queue depth == 0 */
	struct spdk_poller		*stop_poller;	/* one-shot timer ending the run */
	void				*task_base;	/* backing allocation for all ap_tasks */
	struct display_info		display;	/* core/thread ids for reporting */
};
105 
106 static void
107 dump_user_config(struct spdk_app_opts *opts)
108 {
109 	printf("SPDK Configuration:\n");
110 	printf("Core mask:      %s\n\n", opts->reactor_mask);
111 	printf("Accel Perf Configuration:\n");
112 	printf("Workload Type:  %s\n", g_workload_type);
113 	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
114 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
115 		printf("vector count    %u\n", g_crc32c_chained_count);
116 	} else if (g_workload_selection == ACCEL_OPC_FILL) {
117 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
118 	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
119 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
120 	}
121 	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
122 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
123 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count);
124 	} else {
125 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
126 	}
127 	printf("Queue depth:    %u\n", g_queue_depth);
128 	printf("Allocate depth: %u\n", g_allocate_depth);
129 	printf("# threads/core: %u\n", g_threads_per_core);
130 	printf("Run time:       %u seconds\n", g_time_in_sec);
131 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
132 }
133 
/* Print command-line help.
 *
 * Fix: several option lines opened a '[' without closing it (-C, -T, -w, -s,
 * -P, -f); brackets are now balanced on every line.
 */
static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}
153 
154 static int
155 parse_args(int argc, char *argv)
156 {
157 	int argval = 0;
158 
159 	switch (argc) {
160 	case 'a':
161 	case 'C':
162 	case 'f':
163 	case 'T':
164 	case 'o':
165 	case 'P':
166 	case 'q':
167 	case 's':
168 	case 't':
169 		argval = spdk_strtol(optarg, 10);
170 		if (argval < 0) {
171 			fprintf(stderr, "-%c option must be non-negative.\n", argc);
172 			usage();
173 			return 1;
174 		}
175 		break;
176 	default:
177 		break;
178 	};
179 
180 	switch (argc) {
181 	case 'a':
182 		g_allocate_depth = argval;
183 		break;
184 	case 'C':
185 		g_crc32c_chained_count = argval;
186 		break;
187 	case 'f':
188 		g_fill_pattern = (uint8_t)argval;
189 		break;
190 	case 'T':
191 		g_threads_per_core = argval;
192 		break;
193 	case 'o':
194 		g_xfer_size_bytes = argval;
195 		break;
196 	case 'P':
197 		g_fail_percent_goal = argval;
198 		break;
199 	case 'q':
200 		g_queue_depth = argval;
201 		break;
202 	case 's':
203 		g_crc32c_seed = argval;
204 		break;
205 	case 't':
206 		g_time_in_sec = argval;
207 		break;
208 	case 'y':
209 		g_verify = true;
210 		break;
211 	case 'w':
212 		g_workload_type = optarg;
213 		if (!strcmp(g_workload_type, "copy")) {
214 			g_workload_selection = ACCEL_OPC_COPY;
215 		} else if (!strcmp(g_workload_type, "fill")) {
216 			g_workload_selection = ACCEL_OPC_FILL;
217 		} else if (!strcmp(g_workload_type, "crc32c")) {
218 			g_workload_selection = ACCEL_OPC_CRC32C;
219 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
220 			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
221 		} else if (!strcmp(g_workload_type, "compare")) {
222 			g_workload_selection = ACCEL_OPC_COMPARE;
223 		} else if (!strcmp(g_workload_type, "dualcast")) {
224 			g_workload_selection = ACCEL_OPC_DUALCAST;
225 		}
226 		break;
227 	default:
228 		usage();
229 		return 1;
230 	}
231 
232 	return 0;
233 }
234 
235 static int dump_result(void);
236 static void
237 unregister_worker(void *arg1)
238 {
239 	struct worker_thread *worker = arg1;
240 
241 	free(worker->task_base);
242 	spdk_put_io_channel(worker->ch);
243 	pthread_mutex_lock(&g_workers_lock);
244 	assert(g_num_workers >= 1);
245 	if (--g_num_workers == 0) {
246 		pthread_mutex_unlock(&g_workers_lock);
247 		g_rc = dump_result();
248 		spdk_app_stop(0);
249 	}
250 	pthread_mutex_unlock(&g_workers_lock);
251 }
252 
253 static int
254 _get_task_data_bufs(struct ap_task *task)
255 {
256 	uint32_t align = 0;
257 	uint32_t i = 0;
258 	int dst_buff_len = g_xfer_size_bytes;
259 
260 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
261 	 * we do this for all engines to keep it simple.
262 	 */
263 	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
264 		align = ALIGN_4K;
265 	}
266 
267 	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
268 		assert(g_crc32c_chained_count > 0);
269 		task->iov_cnt = g_crc32c_chained_count;
270 		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
271 		if (!task->iovs) {
272 			fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task);
273 			return -ENOMEM;
274 		}
275 
276 		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
277 			dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count;
278 		}
279 
280 		for (i = 0; i < task->iov_cnt; i++) {
281 			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
282 			if (task->iovs[i].iov_base == NULL) {
283 				return -ENOMEM;
284 			}
285 			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
286 			task->iovs[i].iov_len = g_xfer_size_bytes;
287 		}
288 
289 	} else {
290 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
291 		if (task->src == NULL) {
292 			fprintf(stderr, "Unable to alloc src buffer\n");
293 			return -ENOMEM;
294 		}
295 
296 		/* For fill, set the entire src buffer so we can check if verify is enabled. */
297 		if (g_workload_selection == ACCEL_OPC_FILL) {
298 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
299 		} else {
300 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
301 		}
302 	}
303 
304 	if (g_workload_selection != ACCEL_OPC_CRC32C) {
305 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
306 		if (task->dst == NULL) {
307 			fprintf(stderr, "Unable to alloc dst buffer\n");
308 			return -ENOMEM;
309 		}
310 
311 		/* For compare we want the buffers to match, otherwise not. */
312 		if (g_workload_selection == ACCEL_OPC_COMPARE) {
313 			memset(task->dst, DATA_PATTERN, dst_buff_len);
314 		} else {
315 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
316 		}
317 	}
318 
319 	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
320 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
321 		if (task->dst2 == NULL) {
322 			fprintf(stderr, "Unable to alloc dst buffer\n");
323 			return -ENOMEM;
324 		}
325 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
326 	}
327 
328 	return 0;
329 }
330 
331 inline static struct ap_task *
332 _get_task(struct worker_thread *worker)
333 {
334 	struct ap_task *task;
335 
336 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
337 		task = TAILQ_FIRST(&worker->tasks_pool);
338 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
339 	} else {
340 		fprintf(stderr, "Unable to get ap_task\n");
341 		return NULL;
342 	}
343 
344 	return task;
345 }
346 
/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	/* Dispatch to the accel API matching the selected workload; every
	 * submission completes asynchronously via accel_done(task).
	 */
	switch (g_workload_selection) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill use the first byte of the task->dst buffer */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->iovs, task->iov_cnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		/* Deliberately corrupt the first byte of dst for roughly
		 * g_fail_percent_goal percent of submissions (-P); accel_done()
		 * checks expected_status to count these as injected miscompares.
		 */
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	default:
		assert(false);
		break;

	}

	/* On submission failure, complete the task inline with the error so the
	 * queue-depth accounting in accel_done() stays balanced.
	 */
	if (rc) {
		accel_done(task, rc);
	}
}
402 
403 static void
404 _free_task_buffers(struct ap_task *task)
405 {
406 	uint32_t i;
407 
408 	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
409 		if (task->iovs) {
410 			for (i = 0; i < task->iov_cnt; i++) {
411 				if (task->iovs[i].iov_base) {
412 					spdk_dma_free(task->iovs[i].iov_base);
413 				}
414 			}
415 			free(task->iovs);
416 		}
417 	} else {
418 		spdk_dma_free(task->src);
419 	}
420 
421 	spdk_dma_free(task->dst);
422 	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
423 		spdk_dma_free(task->dst2);
424 	}
425 }
426 
427 static int
428 _vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
429 {
430 	uint32_t i;
431 	uint32_t ttl_len = 0;
432 	uint8_t *dst = (uint8_t *)_dst;
433 
434 	for (i = 0; i < iovcnt; i++) {
435 		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
436 			return -1;
437 		}
438 		dst += src_iovs[i].iov_len;
439 		ttl_len += src_iovs[i].iov_len;
440 	}
441 
442 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
443 		return -1;
444 	}
445 
446 	return 0;
447 }
448 
/* Completion callback for every submitted operation: optionally software-
 * verify the result, update worker statistics, then recycle the task and
 * resubmit unless the worker is draining.
 */
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	/* -y: recompute/compare results in software; only meaningful when the
	 * engine reported success.
	 */
	if (g_verify && status == 0) {
		switch (g_workload_selection) {
		case ACCEL_OPC_COPY_CRC32C:
			/* spdk_crc32c_iov_update takes the seed pre-inverted. */
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			/* Both destinations must match the source. */
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			/* src holds the fill pattern (see _get_task_data_bufs). */
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			/* The engine itself reports compare mismatches via status. */
			break;
		default:
			assert(false);
			break;
		}
	}

	/* A deliberately-corrupted compare (see _submit_single) must fail;
	 * count it as injected rather than as a real failure.
	 */
	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
	} else if (status) {
		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining) {
		/* Return the task to the pool, then pull one (possibly the same)
		 * and resubmit so the queue depth stays constant.
		 */
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
		worker->current_queue_depth++;
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}
529 
530 static int
531 dump_result(void)
532 {
533 	uint64_t total_completed = 0;
534 	uint64_t total_failed = 0;
535 	uint64_t total_miscompared = 0;
536 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
537 	struct worker_thread *worker = g_workers;
538 
539 	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
540 	printf("------------------------------------------------------------------------\n");
541 	while (worker != NULL) {
542 
543 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
544 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
545 				       (g_time_in_sec * 1024 * 1024);
546 
547 		total_completed += worker->xfer_completed;
548 		total_failed += worker->xfer_failed;
549 		total_miscompared += worker->injected_miscompares;
550 
551 		if (xfer_per_sec) {
552 			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
553 			       worker->display.core, worker->display.thread, xfer_per_sec,
554 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
555 		}
556 
557 		worker = worker->next;
558 	}
559 
560 	total_xfer_per_sec = total_completed / g_time_in_sec;
561 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
562 			    (g_time_in_sec * 1024 * 1024);
563 
564 	printf("=========================================================================\n");
565 	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
566 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
567 
568 	return total_failed ? 1 : 0;
569 }
570 
571 static inline void
572 _free_task_buffers_in_pool(struct worker_thread *worker)
573 {
574 	struct ap_task *task;
575 
576 	assert(worker);
577 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
578 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
579 		_free_task_buffers(task);
580 	}
581 }
582 
583 static int
584 _check_draining(void *arg)
585 {
586 	struct worker_thread *worker = arg;
587 
588 	assert(worker);
589 
590 	if (worker->current_queue_depth == 0) {
591 		_free_task_buffers_in_pool(worker);
592 		spdk_poller_unregister(&worker->is_draining_poller);
593 		unregister_worker(worker);
594 	}
595 
596 	return SPDK_POLLER_BUSY;
597 }
598 
/* One-shot timer poller fired when the configured run time (-t) elapses:
 * switch the worker into draining mode and start polling for completions.
 */
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check its outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}
614 
615 static void
616 _init_thread(void *arg1)
617 {
618 	struct worker_thread *worker;
619 	struct ap_task *task;
620 	int i, num_tasks = g_allocate_depth;
621 	struct display_info *display = arg1;
622 
623 	worker = calloc(1, sizeof(*worker));
624 	if (worker == NULL) {
625 		fprintf(stderr, "Unable to allocate worker\n");
626 		free(display);
627 		return;
628 	}
629 
630 	worker->display.core = display->core;
631 	worker->display.thread = display->thread;
632 	free(display);
633 	worker->core = spdk_env_get_current_core();
634 	worker->thread = spdk_get_thread();
635 	pthread_mutex_lock(&g_workers_lock);
636 	g_num_workers++;
637 	worker->next = g_workers;
638 	g_workers = worker;
639 	pthread_mutex_unlock(&g_workers_lock);
640 	worker->ch = spdk_accel_engine_get_io_channel();
641 
642 	TAILQ_INIT(&worker->tasks_pool);
643 
644 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
645 	if (worker->task_base == NULL) {
646 		fprintf(stderr, "Could not allocate task base.\n");
647 		goto error;
648 	}
649 
650 	task = worker->task_base;
651 	for (i = 0; i < num_tasks; i++) {
652 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
653 		task->worker = worker;
654 		if (_get_task_data_bufs(task)) {
655 			fprintf(stderr, "Unable to get data bufs\n");
656 			goto error;
657 		}
658 		task++;
659 	}
660 
661 	/* Register a poller that will stop the worker at time elapsed */
662 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
663 			      g_time_in_sec * 1000000ULL);
664 
665 	/* Load up queue depth worth of operations. */
666 	for (i = 0; i < g_queue_depth; i++) {
667 		task = _get_task(worker);
668 		worker->current_queue_depth++;
669 		if (task == NULL) {
670 			goto error;
671 		}
672 
673 		_submit_single(worker, task);
674 	}
675 	return;
676 error:
677 
678 	_free_task_buffers_in_pool(worker);
679 	free(worker->task_base);
680 	free(worker);
681 	spdk_app_stop(-1);
682 }
683 
684 static void
685 accel_perf_start(void *arg1)
686 {
687 	struct spdk_cpuset tmp_cpumask = {};
688 	char thread_name[32];
689 	uint32_t i;
690 	int j;
691 	struct spdk_thread *thread;
692 	struct display_info *display;
693 
694 	g_tsc_rate = spdk_get_ticks_hz();
695 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
696 
697 	printf("Running for %d seconds...\n", g_time_in_sec);
698 	fflush(stdout);
699 
700 	/* Create worker threads for each core that was specified. */
701 	SPDK_ENV_FOREACH_CORE(i) {
702 		for (j = 0; j < g_threads_per_core; j++) {
703 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
704 			spdk_cpuset_zero(&tmp_cpumask);
705 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
706 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
707 			display = calloc(1, sizeof(*display));
708 			if (display == NULL) {
709 				fprintf(stderr, "Unable to allocate memory\n");
710 				spdk_app_stop(-1);
711 				return;
712 			}
713 			display->core = i;
714 			display->thread = j;
715 			spdk_thread_send_msg(thread, _init_thread, display);
716 		}
717 	}
718 }
719 
720 int
721 main(int argc, char **argv)
722 {
723 	struct spdk_app_opts opts = {};
724 	struct worker_thread *worker, *tmp;
725 
726 	pthread_mutex_init(&g_workers_lock, NULL);
727 	spdk_app_opts_init(&opts, sizeof(opts));
728 	opts.reactor_mask = "0x1";
729 	if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:t:yw:P:f:T:", NULL, parse_args,
730 				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
731 		g_rc = -1;
732 		goto cleanup;
733 	}
734 
735 	if ((g_workload_selection != ACCEL_OPC_COPY) &&
736 	    (g_workload_selection != ACCEL_OPC_FILL) &&
737 	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
738 	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
739 	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
740 	    (g_workload_selection != ACCEL_OPC_DUALCAST)) {
741 		usage();
742 		g_rc = -1;
743 		goto cleanup;
744 	}
745 
746 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
747 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
748 		usage();
749 		g_rc = -1;
750 		goto cleanup;
751 	}
752 
753 	if (g_allocate_depth == 0) {
754 		g_allocate_depth = g_queue_depth;
755 	}
756 
757 	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
758 	    g_crc32c_chained_count == 0) {
759 		usage();
760 		g_rc = -1;
761 		goto cleanup;
762 	}
763 
764 	dump_user_config(&opts);
765 	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
766 	if (g_rc) {
767 		SPDK_ERRLOG("ERROR starting application\n");
768 	}
769 
770 	pthread_mutex_destroy(&g_workers_lock);
771 
772 	worker = g_workers;
773 	while (worker) {
774 		tmp = worker->next;
775 		free(worker);
776 		worker = tmp;
777 	}
778 cleanup:
779 	spdk_app_fini();
780 	return g_rc;
781 }
782