/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2020 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

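/*
 * accel_perf measures the performance of SPDK accel framework operations
 * (copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast)
 * by keeping a configurable queue depth of operations in flight on one or
 * more threads per core. An illustrative invocation (option values here are
 * examples only) might be:
 *
 *   ./accel_perf -w crc32c -C 2 -q 64 -t 5 -y
 */
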
#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static uint64_t	g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as large as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void		*uncompressed_data;
	uint32_t	uncompressed_len;
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	void		*compressed_data;
	uint32_t	compressed_len;
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg)	link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void			*src;
	struct iovec		*src_iovs;
	uint32_t		src_iovcnt;
	struct iovec		*dst_iovs;
	uint32_t		dst_iovcnt;
	void			*dst;
	void			*dst2;
	uint32_t		crc_dst;
	uint32_t		compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};

struct worker_thread {
	struct spdk_io_channel		*ch;
	uint64_t			xfer_completed;
	uint64_t			xfer_failed;
	uint64_t			injected_miscompares;
	uint64_t			current_queue_depth;
	TAILQ_HEAD(, ap_task)		tasks_pool;
	struct worker_thread		*next;
	unsigned			core;
	struct spdk_thread		*thread;
	bool				is_draining;
	struct spdk_poller		*is_draining_poller;
	struct spdk_poller		*stop_poller;
	void				*task_base;
	struct display_info		display;
	enum accel_opcode		workload;
};

static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask:      %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count:   %u\n", g_chained_count);
	printf("Module:         %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name:      %s\n", g_cd_file_in_name);
	}
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

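/*
 * Option callback passed to spdk_app_parse_args(). Despite the argc/argv
 * naming, the first argument is the option character being parsed and option
 * values are read from the getopt global optarg.
 */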
static int
parse_args(int argc, char *argv)
{
	int argval = 0;

	switch (argc) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
		argval = spdk_strtol(optarg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", argc);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (argc) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = optarg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = optarg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
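/* Tear down a worker on its own thread. The last worker to exit dumps the
 * results and stops the application.
 */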
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

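/* Split a contiguous buffer of sz bytes into iovcnt iovecs of roughly equal
 * size; the final iovec is shortened so the total exactly covers sz.
 */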
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}

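/* Allocate and initialize the source/destination buffers a task needs for the
 * selected workload: iovec chains for crc32c/copy_crc32c, pre-built segments
 * for compress/decompress, and flat buffers for everything else.
 */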
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}

	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so the
		 * result can be checked when verification is enabled.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}

		if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
			task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
			if (!task->dst_iovs) {
				fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
				return -ENOMEM;
			}
			task->dst_iovcnt = g_chained_count;
			accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
		}
	}

	/* For dualcast, two destination buffers are needed for the operation. */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, use the first byte of the task->src buffer as the fill value */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, g_xfer_size_bytes, task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		spdk_dma_free(task->dst2);
	}
}

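/* Compare a flat buffer against the concatenated contents of an iovec array.
 * Returns 0 on match, -1 on any miscompare or total-length mismatch.
 */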
static int
_vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_src_iovs[i].iov_len;
		ttl_len += src_src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

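/* Completion callback for every submitted operation. Optionally verifies the
 * result, accounts for injected compare failures, then either resubmits a
 * task or returns it to the pool once the worker starts draining.
 */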
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || worker->workload == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

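/* Print per-worker and aggregate transfer rate and bandwidth. Returns
 * non-zero if any transfer failed, which becomes the process exit code.
 */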
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

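/* Poller that waits for a draining worker's outstanding operations to
 * complete before freeing its task buffers and unregistering it.
 */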
static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

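/* Per-thread initialization: link a new worker into the global list, get an
 * accel channel, carve the task pool out of a single allocation, arm the
 * stop poller, then prime the channel with g_queue_depth operations.
 */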
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker once the run time has elapsed. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}

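/* Create g_threads_per_core SPDK threads on every core in the mask and run
 * _init_thread on each of them.
 */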
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

struct accel_perf_prep_ctx {
	FILE			*file;
	long			remaining;
	struct spdk_io_channel	*ch;
	struct ap_compress_seg	*cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

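/* Completion callback for the initial compression of one file segment. For
 * decompress workloads it also builds the compressed-side iovecs, then queues
 * the segment and moves on to the next chunk of the input file.
 */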
static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}

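/* Read the next chunk of the input file, allocate its buffers and submit the
 * initial compression. Once the whole file has been consumed, start the
 * benchmark.
 */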
static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompressed buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compressed buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -errno;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that any time a call is made to spdk_accel_submit_compress() there's a chance
	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
	 * to hold the compressed data. This example app simply uses the same size as the input
	 * buffer, which works for example purposes, but when using the API in your application
	 * be sure to allocate enough room in the destination buffer for cases where the data is
	 * not compressible; the added header information will make the output larger than the
	 * original input.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}

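/* Entry point invoked by spdk_app_start(). Compress/decompress workloads
 * first read the input file and compress it chunk by chunk; all other
 * workloads start immediately.
 */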
static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* size of 0 means "file at a time" */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

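	/* Blocks until spdk_app_stop() is called, either by the last exiting
	 * worker or by an error path.
	 */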
	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}