xref: /spdk/examples/accel/perf/accel_perf.c (revision 5ee049eeecb4bfa60a4fbc85fe82ec74dc97e07b)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/thread.h"
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/accel_engine.h"
41 #include "spdk/crc32.h"
42 #include "spdk/util.h"
43 
44 #define DATA_PATTERN 0x5a
45 #define ALIGN_4K 0x1000
46 
47 static uint64_t	g_tsc_rate;
48 static uint64_t g_tsc_end;
49 static int g_rc;
50 static int g_xfer_size_bytes = 4096;
51 static int g_queue_depth = 32;
52 static int g_ops_per_batch = 0;
53 static int g_threads_per_core = 1;
54 static int g_time_in_sec = 5;
55 static uint32_t g_crc32c_seed = 0;
56 static uint32_t g_crc32c_chained_count = 1;
57 static int g_fail_percent_goal = 0;
58 static uint8_t g_fill_pattern = 255;
59 static bool g_verify = false;
60 static const char *g_workload_type = NULL;
61 static enum accel_capability g_workload_selection;
62 static struct worker_thread *g_workers = NULL;
63 static int g_num_workers = 0;
64 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
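/* Bitmask of capabilities reported by the accel engine; compared against the selected workload at startup. */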
65 uint64_t g_capabilites;
66 
67 struct worker_thread;
68 static void accel_done(void *ref, int status);
69 
70 struct display_info {
71 	int core;
72 	int thread;
73 };
74 
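/* Context for one outstanding accel operation: its buffers, owning worker and completion status. */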
75 struct ap_task {
76 	void			*src;
77 	struct iovec		*iovs;
78 	uint32_t		iov_cnt;
79 	void			*dst;
80 	void			*dst2;
81 	struct worker_thread	*worker;
82 	int			status;
83 	int			expected_status; /* used for the compare operation */
84 	TAILQ_ENTRY(ap_task)	link;
85 };
86 
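/* Wrapper for one spdk_accel_batch plus the number of commands prepped into it; lives on one of the worker's three batch lists. */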
87 struct accel_batch {
88 	int				status;
89 	int				cmd_count;
90 	struct spdk_accel_batch		*batch;
91 	struct worker_thread		*worker;
92 	TAILQ_ENTRY(accel_batch)	link;
93 };
94 
95 struct worker_thread {
96 	struct spdk_io_channel		*ch;
97 	uint64_t			xfer_completed;
98 	uint64_t			xfer_failed;
99 	uint64_t			injected_miscompares;
100 	uint64_t			current_queue_depth;
101 	TAILQ_HEAD(, ap_task)		tasks_pool;
102 	struct worker_thread		*next;
103 	unsigned			core;
104 	struct spdk_thread		*thread;
105 	bool				is_draining;
106 	struct spdk_poller		*is_draining_poller;
107 	struct spdk_poller		*stop_poller;
108 	void				*task_base;
109 	struct accel_batch		*batch_base;
110 	struct display_info		display;
111 	TAILQ_HEAD(, accel_batch)	in_prep_batches;
112 	TAILQ_HEAD(, accel_batch)	in_use_batches;
113 	TAILQ_HEAD(, accel_batch)	to_submit_batches;
114 };
115 
116 static void
117 dump_user_config(struct spdk_app_opts *opts)
118 {
119 	printf("SPDK Configuration:\n");
120 	printf("Core mask:      %s\n\n", opts->reactor_mask);
121 	printf("Accel Perf Configuration:\n");
122 	printf("Workload Type:  %s\n", g_workload_type);
123 	if (g_workload_selection == ACCEL_CRC32C) {
124 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
125 		printf("vector size:    %u\n", g_crc32c_chained_count);
126 	} else if (g_workload_selection == ACCEL_FILL) {
127 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
128 	} else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) {
129 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
130 	}
131 	printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
132 	printf("Queue depth:    %u\n", g_queue_depth);
133 	printf("# threads/core: %u\n", g_threads_per_core);
134 	printf("Run time:       %u seconds\n", g_time_in_sec);
135 	if (g_ops_per_batch > 0) {
136 		printf("Batching:       %u operations\n", g_ops_per_batch);
137 	} else {
138 		printf("Batching:       Disabled\n");
139 	}
140 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
141 }
142 
143 static void
144 usage(void)
145 {
146 	printf("accel_perf options:\n");
147 	printf("\t[-h help message]\n");
148 	printf("\t[-q queue depth per core]\n");
149 	printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)]\n");
150 	printf("\t[-T number of threads per core]\n");
152 	printf("\t[-o transfer size in bytes]\n");
153 	printf("\t[-t time in seconds]\n");
154 	printf("\t[-w workload type must be one of these: copy, fill, crc32c, compare, dualcast]\n");
155 	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
156 	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
157 	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
158 	printf("\t[-y verify result if this switch is on]\n");
159 	printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n");
160 }
161 
162 static int
163 parse_args(int ch, char *arg)
164 {
165 	switch (ch) {
166 	case 'b':
167 		g_ops_per_batch = spdk_strtol(optarg, 10);
168 		break;
169 	case 'C':
170 		g_crc32c_chained_count = spdk_strtol(optarg, 10);
171 		break;
172 	case 'f':
173 		g_fill_pattern = (uint8_t)spdk_strtol(optarg, 10);
174 		break;
175 	case 'T':
176 		g_threads_per_core = spdk_strtol(optarg, 10);
177 		break;
178 	case 'o':
179 		g_xfer_size_bytes = spdk_strtol(optarg, 10);
180 		break;
181 	case 'P':
182 		g_fail_percent_goal = spdk_strtol(optarg, 10);
183 		break;
184 	case 'q':
185 		g_queue_depth = spdk_strtol(optarg, 10);
186 		break;
187 	case 's':
188 		g_crc32c_seed = spdk_strtol(optarg, 10);
189 		break;
190 	case 't':
191 		g_time_in_sec = spdk_strtol(optarg, 10);
192 		break;
193 	case 'y':
194 		g_verify = true;
195 		break;
196 	case 'w':
197 		g_workload_type = optarg;
198 		if (!strcmp(g_workload_type, "copy")) {
199 			g_workload_selection = ACCEL_COPY;
200 		} else if (!strcmp(g_workload_type, "fill")) {
201 			g_workload_selection = ACCEL_FILL;
202 		} else if (!strcmp(g_workload_type, "crc32c")) {
203 			g_workload_selection = ACCEL_CRC32C;
204 		} else if (!strcmp(g_workload_type, "compare")) {
205 			g_workload_selection = ACCEL_COMPARE;
206 		} else if (!strcmp(g_workload_type, "dualcast")) {
207 			g_workload_selection = ACCEL_DUALCAST;
208 		}
209 		break;
210 	default:
211 		usage();
212 		return 1;
213 	}
214 
215 	return 0;
216 }
217 
218 static int dump_result(void);
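/* Tear down one worker; the last worker to exit dumps the results and stops the app. */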
219 static void
220 unregister_worker(void *arg1)
221 {
222 	struct worker_thread *worker = arg1;
223 
224 	free(worker->task_base);
225 	free(worker->batch_base);
226 	spdk_put_io_channel(worker->ch);
227 	pthread_mutex_lock(&g_workers_lock);
228 	assert(g_num_workers >= 1);
229 	if (--g_num_workers == 0) {
230 		pthread_mutex_unlock(&g_workers_lock);
231 		g_rc = dump_result();
232 		spdk_app_stop(0);
233 	}
234 	pthread_mutex_unlock(&g_workers_lock);
235 }
236 
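/* Allocate and pattern the DMA buffers a task needs for the selected workload type. */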
237 static int
238 _get_task_data_bufs(struct ap_task *task)
239 {
240 	uint32_t align = 0;
241 	uint32_t i = 0;
242 
243 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
244 	 * we do this for all engines to keep it simple.
245 	 */
246 	if (g_workload_selection == ACCEL_DUALCAST) {
247 		align = ALIGN_4K;
248 	}
249 
250 	if (g_workload_selection == ACCEL_CRC32C) {
251 		assert(g_crc32c_chained_count > 0);
252 		task->iov_cnt = g_crc32c_chained_count;
253 		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
254 		if (!task->iovs) {
255 			fprintf(stderr, "cannot allocate task->iovs for task=%p\n", task);
256 			return -ENOMEM;
257 		}
258 
259 		for (i = 0; i < task->iov_cnt; i++) {
260 			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
261 			if (task->iovs[i].iov_base == NULL) {
262 				return -ENOMEM;
263 			}
264 			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
265 			task->iovs[i].iov_len = g_xfer_size_bytes;
266 		}
267 
268 	} else {
269 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
270 		if (task->src == NULL) {
271 			fprintf(stderr, "Unable to alloc src buffer\n");
272 			return -ENOMEM;
273 		}
274 
275 		/* For fill, set the entire src buffer to the fill pattern so it can be checked when verify is enabled. */
276 		if (g_workload_selection == ACCEL_FILL) {
277 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
278 		} else {
279 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
280 		}
281 	}
282 
283 	task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
284 	if (task->dst == NULL) {
285 		fprintf(stderr, "Unable to alloc dst buffer\n");
286 		return -ENOMEM;
287 	}
288 
289 	/* For compare we want the buffers to match, otherwise not. */
290 	if (g_workload_selection == ACCEL_COMPARE) {
291 		memset(task->dst, DATA_PATTERN, g_xfer_size_bytes);
292 	} else {
293 		memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes);
294 	}
295 
296 	if (g_workload_selection == ACCEL_DUALCAST) {
297 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
298 		if (task->dst2 == NULL) {
299 			fprintf(stderr, "Unable to alloc dst2 buffer\n");
300 			return -ENOMEM;
301 		}
302 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
303 	}
304 
305 	return 0;
306 }
307 
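/* Take a task from the worker's pre-allocated pool and count it against the current queue depth. */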
308 inline static struct ap_task *
309 _get_task(struct worker_thread *worker)
310 {
311 	struct ap_task *task;
312 
313 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
314 		task = TAILQ_FIRST(&worker->tasks_pool);
315 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
316 	} else {
317 		fprintf(stderr, "Unable to get ap_task\n");
318 		return NULL;
319 	}
320 
321 	task->worker = worker;
322 	task->worker->current_queue_depth++;
323 	return task;
324 }
325 
326 /* Submit one operation using the same ap task that just completed. */
327 static void
328 _submit_single(struct worker_thread *worker, struct ap_task *task)
329 {
330 	int random_num;
331 	int rc = 0;
332 
333 	assert(worker);
334 
335 	switch (g_workload_selection) {
336 	case ACCEL_COPY:
337 		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
338 					    g_xfer_size_bytes, accel_done, task);
339 		break;
340 	case ACCEL_FILL:
341 		/* For fill, use the first byte of the task->src buffer as the fill value */
342 		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
343 					    g_xfer_size_bytes, accel_done, task);
344 		break;
345 	case ACCEL_CRC32C:
346 		rc = spdk_accel_submit_crc32cv(worker->ch, (uint32_t *)task->dst,
347 					       task->iovs, task->iov_cnt, g_crc32c_seed,
348 					       accel_done, task);
349 		break;
350 	case ACCEL_COMPARE:
351 		random_num = rand() % 100;
352 		if (random_num < g_fail_percent_goal) {
353 			task->expected_status = -EILSEQ;
354 			*(uint8_t *)task->dst = ~DATA_PATTERN;
355 		} else {
356 			task->expected_status = 0;
357 			*(uint8_t *)task->dst = DATA_PATTERN;
358 		}
359 		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
360 					       g_xfer_size_bytes, accel_done, task);
361 		break;
362 	case ACCEL_DUALCAST:
363 		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
364 						task->src, g_xfer_size_bytes, accel_done, task);
365 		break;
366 	default:
367 		assert(false);
368 		break;
369 
370 	}
371 
372 	if (rc) {
373 		accel_done(task, rc);
374 	}
375 }
376 
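/* Prep one more operation of the selected workload type into the given batch. */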
377 static int
378 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task,
379 		struct accel_batch *worker_batch)
380 {
381 	struct spdk_accel_batch *batch = worker_batch->batch;
382 	int rc = 0;
383 
384 	worker_batch->cmd_count++;
385 	assert(worker_batch->cmd_count <= g_ops_per_batch);
386 
387 	switch (g_workload_selection) {
388 	case ACCEL_COPY:
389 		rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst,
390 						task->src, g_xfer_size_bytes, accel_done, task);
391 		break;
392 	case ACCEL_DUALCAST:
393 		rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2,
394 						    task->src, g_xfer_size_bytes, accel_done, task);
395 		break;
396 	case ACCEL_COMPARE:
397 		rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src,
398 						   g_xfer_size_bytes, accel_done, task);
399 		break;
400 	case ACCEL_FILL:
401 		rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst,
402 						*(uint8_t *)task->src,
403 						g_xfer_size_bytes, accel_done, task);
404 		break;
405 	case ACCEL_CRC32C:
406 		rc = spdk_accel_batch_prep_crc32cv(worker->ch, batch, (uint32_t *)task->dst,
407 						   task->iovs, task->iov_cnt, g_crc32c_seed, accel_done, task);
408 		break;
409 	default:
410 		assert(false);
411 		break;
412 	}
413 
414 	return rc;
415 }
416 
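/* Free the DMA buffers (and the iovec array for crc32c) owned by a task. */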
417 static void
418 _free_task_buffers(struct ap_task *task)
419 {
420 	uint32_t i;
421 
422 	if (g_workload_selection == ACCEL_CRC32C) {
423 		if (task->iovs) {
424 			for (i = 0; i < task->iov_cnt; i++) {
425 				if (task->iovs[i].iov_base) {
426 					spdk_dma_free(task->iovs[i].iov_base);
427 				}
428 			}
429 			free(task->iovs);
430 		}
431 	} else {
432 		spdk_dma_free(task->src);
433 	}
434 
435 	spdk_dma_free(task->dst);
436 	if (g_workload_selection == ACCEL_DUALCAST) {
437 		spdk_dma_free(task->dst2);
438 	}
439 }
440 
441 static void _batch_done(void *cb_arg);
442 static void
443 _build_batch(struct worker_thread *worker, struct ap_task *task)
444 {
445 	struct accel_batch *worker_batch = NULL;
446 	int rc;
447 
448 	assert(!TAILQ_EMPTY(&worker->in_prep_batches));
449 
450 	worker_batch = TAILQ_FIRST(&worker->in_prep_batches);
451 
452 	/* If an accel batch hasn't been created yet do so now. */
453 	if (worker_batch->batch == NULL) {
454 		worker_batch->batch = spdk_accel_batch_create(worker->ch);
455 		if (worker_batch->batch == NULL) {
456 			fprintf(stderr, "error unable to create new batch\n");
457 			return;
458 		}
459 	}
460 
461 	/* Prep the command re-using the last completed command's task */
462 	rc = _batch_prep_cmd(worker, task, worker_batch);
463 	if (rc) {
464 		fprintf(stderr, "error prepping command for batch\n");
465 		goto error;
466 	}
467 
468 	/* If this batch is full move it to the to_submit list so it gets
469 	 * submitted as batches complete.
470 	 */
471 	if (worker_batch->cmd_count == g_ops_per_batch) {
472 		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
473 		TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link);
474 	}
475 
476 	return;
477 error:
478 	spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
479 
480 }
481 
482 static void batch_done(void *cb_arg, int status);
483 static void
484 _drain_batch(struct worker_thread *worker)
485 {
486 	struct accel_batch *worker_batch, *tmp;
487 	int rc;
488 
489 	/* submit any batches that were being built up. */
490 	TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) {
491 		if (worker_batch->cmd_count == 0) {
492 			continue;
493 		}
494 		worker->current_queue_depth += worker_batch->cmd_count + 1;
495 
496 		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
497 		TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
498 		rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
499 		if (rc == 0) {
500 			worker_batch->cmd_count = 0;
501 		} else {
502 			fprintf(stderr, "error sending final batch\n");
503 			worker->current_queue_depth -= worker_batch->cmd_count + 1;
504 			break;
505 		}
506 	}
507 }
508 
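/* Batch completion handler, run on the worker's SPDK thread: recycle the finished batch, then submit the next full batch or drain. */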
509 static void
510 _batch_done(void *cb_arg)
511 {
512 	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;
513 	struct worker_thread *worker = worker_batch->worker;
514 	int rc;
515 
516 	assert(TAILQ_EMPTY(&worker->in_use_batches) == 0);
517 
518 	if (worker_batch->status) {
519 		SPDK_ERRLOG("error %d\n", worker_batch->status);
520 	}
521 
522 	worker->current_queue_depth--;
523 	TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
524 	TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
525 	worker_batch->batch = NULL;
526 	worker_batch->cmd_count = 0;
527 
528 	if (!worker->is_draining) {
529 		worker_batch = TAILQ_FIRST(&worker->to_submit_batches);
530 		if (worker_batch != NULL) {
531 
532 			assert(worker_batch->cmd_count == g_ops_per_batch);
533 
534 			/* Add one for the batch command itself. */
535 			worker->current_queue_depth += g_ops_per_batch + 1;
536 			TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link);
537 			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
538 
539 			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
540 			if (rc) {
541 				fprintf(stderr, "error submitting batch\n");
542 				worker->current_queue_depth -= g_ops_per_batch + 1;
543 				return;
544 			}
545 		}
546 	} else {
547 		_drain_batch(worker);
548 	}
549 }
550 
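/* Batch completion callback from the accel framework; record status and bounce to the owning worker's thread. */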
551 static void
552 batch_done(void *cb_arg, int status)
553 {
554 	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;
555 
556 	assert(worker_batch->worker);
557 
558 	worker_batch->status = status;
559 	spdk_thread_send_msg(worker_batch->worker->thread, _batch_done, worker_batch);
560 }
561 
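/* Compute a software CRC-32C over an iovec chain, used to verify the engine's result. */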
562 static uint32_t
563 _update_crc32c_iov(struct iovec *iov, int iovcnt, uint32_t crc32c)
564 {
565 	int i;
566 
567 	for (i = 0; i < iovcnt; i++) {
568 		assert(iov[i].iov_base != NULL);
569 		assert(iov[i].iov_len != 0);
570 		crc32c = spdk_crc32c_update(iov[i].iov_base, iov[i].iov_len, crc32c);
571 
572 	}
573 	return crc32c;
574 }
575 
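/* Operation completion handler, run on the worker's SPDK thread: optionally verify the result, update stats, then resubmit or drain. */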
576 static void
577 _accel_done(void *arg1)
578 {
579 	struct ap_task *task = arg1;
580 	struct worker_thread *worker = task->worker;
581 	uint32_t sw_crc32c;
582 
583 	assert(worker);
584 	assert(worker->current_queue_depth > 0);
585 
586 	if (g_verify && task->status == 0) {
587 		switch (g_workload_selection) {
588 		case ACCEL_CRC32C:
589 			sw_crc32c = _update_crc32c_iov(task->iovs, task->iov_cnt, ~g_crc32c_seed);
590 			if (*(uint32_t *)task->dst != sw_crc32c) {
591 				SPDK_NOTICELOG("CRC-32C miscompare\n");
592 				worker->xfer_failed++;
593 			}
594 			break;
595 		case ACCEL_COPY:
596 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
597 				SPDK_NOTICELOG("Data miscompare\n");
598 				worker->xfer_failed++;
599 			}
600 			break;
601 		case ACCEL_DUALCAST:
602 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
603 				SPDK_NOTICELOG("Data miscompare, first destination\n");
604 				worker->xfer_failed++;
605 			}
606 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
607 				SPDK_NOTICELOG("Data miscompare, second destination\n");
608 				worker->xfer_failed++;
609 			}
610 			break;
611 		case ACCEL_FILL:
612 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
613 				SPDK_NOTICELOG("Data miscompare\n");
614 				worker->xfer_failed++;
615 			}
616 			break;
617 		case ACCEL_COMPARE:
618 			break;
619 		default:
620 			assert(false);
621 			break;
622 		}
623 	}
624 
625 	if (task->expected_status == -EILSEQ) {
626 		assert(task->status != 0);
627 		worker->injected_miscompares++;
628 	} else if (task->status) {
629 		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
630 		worker->xfer_failed++;
631 	}
632 
633 	worker->xfer_completed++;
634 	worker->current_queue_depth--;
635 
636 	if (!worker->is_draining) {
637 		if (g_ops_per_batch == 0) {
638 			_submit_single(worker, task);
639 			worker->current_queue_depth++;
640 		} else {
641 			_build_batch(worker, task);
642 		}
643 	} else if (g_ops_per_batch > 0) {
644 		_drain_batch(worker);
645 	} else {
646 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
647 	}
648 }
649 
650 static int
651 dump_result(void)
652 {
653 	uint64_t total_completed = 0;
654 	uint64_t total_failed = 0;
655 	uint64_t total_miscompared = 0;
656 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
657 	struct worker_thread *worker = g_workers;
658 
659 	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
660 	printf("------------------------------------------------------------------------\n");
661 	while (worker != NULL) {
662 
663 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
664 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
665 				       (g_time_in_sec * 1024 * 1024);
666 
667 		total_completed += worker->xfer_completed;
668 		total_failed += worker->xfer_failed;
669 		total_miscompared += worker->injected_miscompares;
670 
671 		if (xfer_per_sec) {
672 			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
673 			       worker->display.core, worker->display.thread, xfer_per_sec,
674 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
675 		}
676 
677 		worker = worker->next;
678 	}
679 
680 	total_xfer_per_sec = total_completed / g_time_in_sec;
681 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
682 			    (g_time_in_sec * 1024 * 1024);
683 
684 	printf("=========================================================================\n");
685 	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
686 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
687 
688 	return total_failed ? 1 : 0;
689 }
690 
691 static inline void
692 _free_task_buffers_in_pool(struct worker_thread *worker)
693 {
694 	struct ap_task *task;
695 
696 	assert(worker);
697 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
698 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
699 		_free_task_buffers(task);
700 	}
701 }
702 
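/* Poller that waits for the worker's outstanding operations to reach zero before unregistering it. */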
703 static int
704 _check_draining(void *arg)
705 {
706 	struct worker_thread *worker = arg;
707 
708 	assert(worker);
709 
710 	if (worker->current_queue_depth == 0) {
711 		_free_task_buffers_in_pool(worker);
712 		spdk_poller_unregister(&worker->is_draining_poller);
713 		unregister_worker(worker);
714 	}
715 
716 	return -1;
717 }
718 
719 static int
720 _worker_stop(void *arg)
721 {
722 	struct worker_thread *worker = arg;
723 
724 	assert(worker);
725 
726 	spdk_poller_unregister(&worker->stop_poller);
727 
728 	/* now let the worker drain and check its outstanding IO with a poller */
729 	worker->is_draining = true;
730 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
731 
732 	return 0;
733 }
734 
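/* Per SPDK thread setup: allocate the worker, its task pool and batch contexts, then prime the queue to full depth. */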
735 static void
736 _init_thread(void *arg1)
737 {
738 	struct worker_thread *worker;
739 	struct ap_task *task;
740 	int i, rc, num_batches;
741 	int max_per_batch;
742 	int remaining = g_queue_depth;
743 	int num_tasks = g_queue_depth;
744 	struct accel_batch *tmp;
745 	struct accel_batch *worker_batch = NULL;
746 	struct display_info *display = arg1;
747 
748 	worker = calloc(1, sizeof(*worker));
749 	if (worker == NULL) {
750 		fprintf(stderr, "Unable to allocate worker\n");
751 		free(display);
752 		return;
753 	}
754 
755 	worker->display.core = display->core;
756 	worker->display.thread = display->thread;
757 	free(display);
758 	worker->core = spdk_env_get_current_core();
759 	worker->thread = spdk_get_thread();
760 	pthread_mutex_lock(&g_workers_lock);
761 	g_num_workers++;
762 	worker->next = g_workers;
763 	g_workers = worker;
764 	pthread_mutex_unlock(&g_workers_lock);
765 	worker->ch = spdk_accel_engine_get_io_channel();
766 
767 	TAILQ_INIT(&worker->tasks_pool);
768 
769 	if (g_ops_per_batch > 0) {
770 
771 		max_per_batch = spdk_accel_batch_get_max(worker->ch);
772 		assert(max_per_batch > 0);
773 
774 		if (g_ops_per_batch > max_per_batch) {
775 			fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch);
776 			g_ops_per_batch = max_per_batch;
777 		}
778 
779 		if (g_ops_per_batch > g_queue_depth) {
780 			fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth);
781 			g_ops_per_batch = g_queue_depth;
782 		}
783 
784 		TAILQ_INIT(&worker->in_prep_batches);
785 		TAILQ_INIT(&worker->to_submit_batches);
786 		TAILQ_INIT(&worker->in_use_batches);
787 
788 		/* A worker_batch will live on one of 3 lists:
789 		 * IN_PREP: as individual IOs complete, new ones are built up on a
790 		 *          worker_batch on this list until it reaches g_ops_per_batch.
791 		 * TO_SUBMIT: as batches are built up on IO completion they are moved
792 		 *	      to this list once they are full.  This list is used in
793 		 *	      batch completion to start new batches.
794 		 * IN_USE: the worker_batch is outstanding and will be moved to in prep
795 		 *         list when the batch is completed.
796 		 *
797 		 * So we need enough batches to cover loading the full Q depth, the same
798 		 * number again to build up replacements while those are outstanding, and
799 		 * one extra to build up while the last batch has finished its IO but has
800 		 * not yet completed the batch command itself.
801 		 */
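		/* For example, -q 32 with -b 8 gives (32 / 8) * 2 + 1 = 9 batch contexts. */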
802 		num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1;
803 		worker->batch_base = calloc(num_batches, sizeof(struct accel_batch));
		if (worker->batch_base == NULL) {
			fprintf(stderr, "Could not allocate batch base.\n");
			goto error;
		}
804 		worker_batch = worker->batch_base;
805 		for (i = 0; i < num_batches; i++) {
806 			worker_batch->worker = worker;
807 			TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
808 			worker_batch++;
809 		}
810 	}
811 
812 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
813 	if (worker->task_base == NULL) {
814 		fprintf(stderr, "Could not allocate task base.\n");
815 		goto error;
816 	}
817 
818 	task = worker->task_base;
819 	for (i = 0; i < num_tasks; i++) {
820 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
821 		if (_get_task_data_bufs(task)) {
822 			fprintf(stderr, "Unable to get data bufs\n");
823 			goto error;
824 		}
825 		task++;
826 	}
827 
828 	/* Register a poller that will stop the worker at time elapsed */
829 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
830 			      g_time_in_sec * 1000000ULL);
831 
832 	/* If batching is enabled load up to the full Q depth before
833 	 * processing any completions, then ping pong between two batches,
834 	 * one processing and one being built up for when the other completes.
835 	 */
836 	if (g_ops_per_batch > 0) {
837 		do {
838 			worker_batch = TAILQ_FIRST(&worker->in_prep_batches);
839 			if (worker_batch == NULL) {
840 				goto error;
841 			}
842 
843 			worker_batch->batch = spdk_accel_batch_create(worker->ch);
844 			if (worker_batch->batch == NULL) {
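				/* Batch creation failed during the initial load; raise SIGINT so the app framework shuts down. */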
845 				raise(SIGINT);
846 				break;
847 			}
848 
849 			for (i = 0; i < g_ops_per_batch; i++) {
850 				task = _get_task(worker);
851 				if (task == NULL) {
852 					goto error;
853 				}
854 
855 				rc = _batch_prep_cmd(worker, task, worker_batch);
856 				if (rc) {
857 					fprintf(stderr, "error prepping command\n");
858 					goto error;
859 				}
860 			}
861 
862 			/* Add one to the queue depth for the batch command itself. */
863 			task->worker->current_queue_depth++;
864 			TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
865 			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
866 
867 			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
868 			if (rc) {
869 				fprintf(stderr, "error submitting batch\n");
870 				goto error;
871 			}
872 			assert(remaining >= g_ops_per_batch);
873 			remaining -= g_ops_per_batch;
874 		} while (remaining > 0);
875 	}
876 
877 	/* Submit as singles when no batching is enabled or we ran out of batches. */
878 	for (i = 0; i < remaining; i++) {
879 		task = _get_task(worker);
880 		if (task == NULL) {
881 			goto error;
882 		}
883 
884 		_submit_single(worker, task);
885 	}
886 	return;
887 error:
888 	if (worker_batch && worker_batch->batch) {
889 		TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) {
890 			spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
891 			TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
892 		}
893 	}
894 
895 	_free_task_buffers_in_pool(worker);
896 	free(worker->batch_base);
897 	free(worker->task_base);
898 	free(worker);
899 	spdk_app_stop(-1);
900 }
901 
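/* Operation completion callback from the accel framework; record status and forward to the worker's thread. */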
902 static void
903 accel_done(void *cb_arg, int status)
904 {
905 	struct ap_task *task = (struct ap_task *)cb_arg;
906 	struct worker_thread *worker = task->worker;
907 
908 	assert(worker);
909 
910 	task->status = status;
911 	spdk_thread_send_msg(worker->thread, _accel_done, task);
912 }
913 
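/* Application start callback: check engine capabilities and spawn one SPDK thread per (core, thread) pair. */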
914 static void
915 accel_perf_start(void *arg1)
916 {
917 	struct spdk_io_channel *accel_ch;
918 	struct spdk_cpuset tmp_cpumask = {};
919 	char thread_name[32];
920 	uint32_t i;
921 	int j;
922 	struct spdk_thread *thread;
923 	struct display_info *display;
924 
925 	accel_ch = spdk_accel_engine_get_io_channel();
926 	g_capabilites = spdk_accel_get_capabilities(accel_ch);
927 	spdk_put_io_channel(accel_ch);
928 
929 	if ((g_capabilites & g_workload_selection) != g_workload_selection) {
930 		SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n");
931 		SPDK_WARNLOG("The software engine will be used instead.\n\n");
932 	}
933 
934 	g_tsc_rate = spdk_get_ticks_hz();
935 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
936 
937 	printf("Running for %d seconds...\n", g_time_in_sec);
938 	fflush(stdout);
939 
940 	/* Create worker threads for each core that was specified. */
941 	SPDK_ENV_FOREACH_CORE(i) {
942 		for (j = 0; j < g_threads_per_core; j++) {
943 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
944 			spdk_cpuset_zero(&tmp_cpumask);
945 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
946 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
947 			display = calloc(1, sizeof(*display));
948 			if (display == NULL) {
949 				fprintf(stderr, "Unable to allocate memory\n");
950 				spdk_app_stop(-1);
951 				return;
952 			}
953 			display->core = i;
954 			display->thread = j;
955 			spdk_thread_send_msg(thread, _init_thread, display);
956 		}
957 	}
958 }
959 
960 int
961 main(int argc, char **argv)
962 {
963 	struct spdk_app_opts opts = {};
964 	struct worker_thread *worker, *tmp;
965 
966 	pthread_mutex_init(&g_workers_lock, NULL);
967 	spdk_app_opts_init(&opts, sizeof(opts));
968 	opts.reactor_mask = "0x1";
969 	if (spdk_app_parse_args(argc, argv, &opts, "C:o:q:t:yw:P:f:b:T:", NULL, parse_args,
970 				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
971 		g_rc = -1;
972 		goto cleanup;
973 	}
974 
975 	if ((g_workload_selection != ACCEL_COPY) &&
976 	    (g_workload_selection != ACCEL_FILL) &&
977 	    (g_workload_selection != ACCEL_CRC32C) &&
978 	    (g_workload_selection != ACCEL_COMPARE) &&
979 	    (g_workload_selection != ACCEL_DUALCAST)) {
980 		usage();
981 		g_rc = -1;
982 		goto cleanup;
983 	}
984 
985 	if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) {
986 		fprintf(stdout, "batch size must be a multiple of queue depth\n");
987 		usage();
988 		g_rc = -1;
989 		goto cleanup;
990 	}
991 
992 	if (g_workload_selection == ACCEL_CRC32C &&
993 	    g_crc32c_chained_count == 0) {
994 		usage();
995 		g_rc = -1;
996 		goto cleanup;
997 	}
998 
999 	dump_user_config(&opts);
1000 	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
1001 	if (g_rc) {
1002 		SPDK_ERRLOG("ERROR starting application\n");
1003 	}
1004 
1005 	pthread_mutex_destroy(&g_workers_lock);
1006 
1007 	worker = g_workers;
1008 	while (worker) {
1009 		tmp = worker->next;
1010 		free(worker);
1011 		worker = tmp;
1012 	}
1013 cleanup:
1014 	spdk_app_fini();
1015 	return g_rc;
1016 }
1017