xref: /spdk/examples/accel/perf/accel_perf.c (revision 975852a079578816478a906717d1cf45fc97ddf3)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/thread.h"
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/accel_engine.h"
41 #include "spdk/crc32.h"
42 #include "spdk/util.h"
43 
44 #define DATA_PATTERN 0x5a
45 #define ALIGN_4K 0x1000
46 
47 static bool g_using_sw_engine = false;
48 static uint64_t	g_tsc_rate;
49 static uint64_t g_tsc_end;
50 static int g_rc;
51 static int g_xfer_size_bytes = 4096;
52 static int g_queue_depth = 32;
53 /* g_allocate_depth indicates how many tasks we allocate per worker. It will
54  * be at least as much as the queue depth.
55  */
56 static int g_allocate_depth = 0;
57 static int g_ops_per_batch = 0;
58 static int g_threads_per_core = 1;
59 static int g_time_in_sec = 5;
60 static uint32_t g_crc32c_seed = 0;
61 static uint32_t g_crc32c_chained_count = 1;
62 static int g_fail_percent_goal = 0;
63 static uint8_t g_fill_pattern = 255;
64 static bool g_verify = false;
65 static const char *g_workload_type = NULL;
66 static enum accel_capability g_workload_selection;
67 static struct worker_thread *g_workers = NULL;
68 static int g_num_workers = 0;
69 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
70 
71 struct worker_thread;
72 static void accel_done(void *ref, int status);
73 
74 struct display_info {
75 	int core;
76 	int thread;
77 };
78 
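/* Context for one outstanding accel operation: its source/destination buffers,
 * the returned CRC value, completion status and the worker that owns it.
 */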
79 struct ap_task {
80 	void			*src;
81 	struct iovec		*iovs;
82 	uint32_t		iov_cnt;
83 	void			*dst;
84 	void			*dst2;
85 	uint32_t		crc_dst;
86 	struct worker_thread	*worker;
87 	int			status;
88 	int			expected_status; /* used for the compare operation */
89 	TAILQ_ENTRY(ap_task)	link;
90 };
91 
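/* Tracks one spdk_accel_batch being built up or in flight, the number of
 * operations prepped into it and the worker that owns it.
 */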
92 struct accel_batch {
93 	int				status;
94 	int				cmd_count;
95 	struct spdk_accel_batch		*batch;
96 	struct worker_thread		*worker;
97 	TAILQ_ENTRY(accel_batch)	link;
98 };
99 
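/* Per SPDK-thread state: the accel channel, statistics, the free task pool and
 * the batch lists used when batching is enabled.
 */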
100 struct worker_thread {
101 	struct spdk_io_channel		*ch;
102 	uint64_t			xfer_completed;
103 	uint64_t			xfer_failed;
104 	uint64_t			injected_miscompares;
105 	uint64_t			current_queue_depth;
106 	TAILQ_HEAD(, ap_task)		tasks_pool;
107 	struct worker_thread		*next;
108 	unsigned			core;
109 	struct spdk_thread		*thread;
110 	bool				is_draining;
111 	struct spdk_poller		*is_draining_poller;
112 	struct spdk_poller		*stop_poller;
113 	void				*task_base;
114 	struct accel_batch		*batch_base;
115 	struct display_info		display;
116 	TAILQ_HEAD(, accel_batch)	in_prep_batches;
117 	TAILQ_HEAD(, accel_batch)	in_use_batches;
118 	TAILQ_HEAD(, accel_batch)	to_submit_batches;
119 };
120 
121 static void
122 dump_user_config(struct spdk_app_opts *opts)
123 {
124 	printf("SPDK Configuration:\n");
125 	printf("Core mask:      %s\n\n", opts->reactor_mask);
126 	printf("Accel Perf Configuration:\n");
127 	printf("Workload Type:  %s\n", g_workload_type);
128 	if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) {
129 		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
130 		printf("Vector count:   %u\n", g_crc32c_chained_count);
131 	} else if (g_workload_selection == ACCEL_FILL) {
132 		printf("Fill pattern:   0x%x\n", g_fill_pattern);
133 	} else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) {
134 		printf("Failure inject: %u percent\n", g_fail_percent_goal);
135 	}
136 	if (g_workload_selection == ACCEL_COPY_CRC32C) {
137 		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
138 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count);
139 	} else {
140 		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
141 	}
142 	printf("Queue depth:    %u\n", g_queue_depth);
143 	printf("Allocate depth: %u\n", g_allocate_depth);
144 	printf("# threads/core: %u\n", g_threads_per_core);
145 	printf("Run time:       %u seconds\n", g_time_in_sec);
146 	if (g_ops_per_batch > 0) {
147 		printf("Batching:       %u operations\n", g_ops_per_batch);
148 	} else {
149 		printf("Batching:       Disabled\n");
150 	}
151 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
152 }
153 
154 static void
155 usage(void)
156 {
157 	printf("accel_perf options:\n");
158 	printf("\t[-h help message]\n");
159 	printf("\t[-q queue depth per core]\n");
160 	printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)]\n");
161 	printf("\t[-T number of threads per core]\n");
162 	printf("\t[-n number of channels]\n");
163 	printf("\t[-o transfer size in bytes]\n");
164 	printf("\t[-t time in seconds]\n");
165 	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast]\n");
166 	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
167 	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
168 	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
169 	printf("\t[-y verify result if this switch is on]\n");
170 	printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n");
171 	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
172 	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
173 }
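/* Example invocation (binary path may differ depending on the build):
 *   ./accel_perf -q 64 -o 4096 -t 5 -w crc32c -C 4 -y
 */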
174 
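/* Option callback for spdk_app_parse_args(): despite its name, "argc" holds the
 * option character and "argv" is unused; option values are read from optarg.
 */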
175 static int
176 parse_args(int argc, char *argv)
177 {
178 	int argval = 0;
179 
180 	switch (argc) {
181 	case 'a':
182 	case 'b':
183 	case 'C':
184 	case 'f':
185 	case 'T':
186 	case 'o':
187 	case 'P':
188 	case 'q':
189 	case 's':
190 	case 't':
191 		argval = spdk_strtol(optarg, 10);
192 		if (argval < 0) {
193 			fprintf(stderr, "-%c option must be non-negative.\n", argc);
194 			usage();
195 			return 1;
196 		}
197 		break;
198 	default:
199 		break;
200 	};
201 
202 	switch (argc) {
203 	case 'a':
204 		g_allocate_depth = argval;
205 		break;
206 	case 'b':
207 		g_ops_per_batch = argval;
208 		break;
209 	case 'C':
210 		g_crc32c_chained_count = argval;
211 		break;
212 	case 'f':
213 		g_fill_pattern = (uint8_t)argval;
214 		break;
215 	case 'T':
216 		g_threads_per_core = argval;
217 		break;
218 	case 'o':
219 		g_xfer_size_bytes = argval;
220 		break;
221 	case 'P':
222 		g_fail_percent_goal = argval;
223 		break;
224 	case 'q':
225 		g_queue_depth = argval;
226 		break;
227 	case 's':
228 		g_crc32c_seed = argval;
229 		break;
230 	case 't':
231 		g_time_in_sec = argval;
232 		break;
233 	case 'y':
234 		g_verify = true;
235 		break;
236 	case 'w':
237 		g_workload_type = optarg;
238 		if (!strcmp(g_workload_type, "copy")) {
239 			g_workload_selection = ACCEL_COPY;
240 		} else if (!strcmp(g_workload_type, "fill")) {
241 			g_workload_selection = ACCEL_FILL;
242 		} else if (!strcmp(g_workload_type, "crc32c")) {
243 			g_workload_selection = ACCEL_CRC32C;
244 		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
245 			g_workload_selection = ACCEL_COPY_CRC32C;
246 		} else if (!strcmp(g_workload_type, "compare")) {
247 			g_workload_selection = ACCEL_COMPARE;
248 		} else if (!strcmp(g_workload_type, "dualcast")) {
249 			g_workload_selection = ACCEL_DUALCAST;
250 		}
251 		break;
252 	default:
253 		usage();
254 		return 1;
255 	}
256 
257 	return 0;
258 }
259 
260 static int dump_result(void);
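/* Called on the worker's thread once it has fully drained; releases its
 * resources and stops the app when the last worker exits.
 */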
261 static void
262 unregister_worker(void *arg1)
263 {
264 	struct worker_thread *worker = arg1;
265 
266 	free(worker->task_base);
267 	free(worker->batch_base);
268 	spdk_put_io_channel(worker->ch);
269 	pthread_mutex_lock(&g_workers_lock);
270 	assert(g_num_workers >= 1);
271 	if (--g_num_workers == 0) {
272 		pthread_mutex_unlock(&g_workers_lock);
273 		g_rc = dump_result();
274 		spdk_app_stop(0);
275 	}
276 	pthread_mutex_unlock(&g_workers_lock);
277 }
278 
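/* Allocate and initialize the buffers a task needs for the selected workload:
 * an iovec chain for crc32c/copy_crc32c, otherwise a single src buffer, plus a
 * dst buffer (and dst2 for dualcast) where required.
 */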
279 static int
280 _get_task_data_bufs(struct ap_task *task)
281 {
282 	uint32_t align = 0;
283 	uint32_t i = 0;
284 	int dst_buff_len = g_xfer_size_bytes;
285 
286 	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
287 	 * we do this for all engines to keep it simple.
288 	 */
289 	if (g_workload_selection == ACCEL_DUALCAST) {
290 		align = ALIGN_4K;
291 	}
292 
293 	if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) {
294 		assert(g_crc32c_chained_count > 0);
295 		task->iov_cnt = g_crc32c_chained_count;
296 		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
297 		if (!task->iovs) {
298 			fprintf(stderr, "cannot allocate task->iovs for task=%p\n", task);
299 			return -ENOMEM;
300 		}
301 
302 		if (g_workload_selection == ACCEL_COPY_CRC32C) {
303 			dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count;
304 		}
305 
306 		for (i = 0; i < task->iov_cnt; i++) {
307 			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
308 			if (task->iovs[i].iov_base == NULL) {
309 				return -ENOMEM;
310 			}
311 			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
312 			task->iovs[i].iov_len = g_xfer_size_bytes;
313 		}
314 
315 	} else {
316 		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
317 		if (task->src == NULL) {
318 			fprintf(stderr, "Unable to alloc src buffer\n");
319 			return -ENOMEM;
320 		}
321 
322 		/* For fill, set the entire src buffer to the fill pattern so it can be verified later. */
323 		if (g_workload_selection == ACCEL_FILL) {
324 			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
325 		} else {
326 			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
327 		}
328 	}
329 
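	/* Plain crc32c has no separate destination buffer; every other workload,
	 * including copy_crc32c, needs one.
	 */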
330 	if (g_workload_selection != ACCEL_CRC32C) {
331 		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
332 		if (task->dst == NULL) {
333 			fprintf(stderr, "Unable to alloc dst buffer\n");
334 			return -ENOMEM;
335 		}
336 
337 		/* For compare we want the buffers to match; for all other workloads they should differ. */
338 		if (g_workload_selection == ACCEL_COMPARE) {
339 			memset(task->dst, DATA_PATTERN, dst_buff_len);
340 		} else {
341 			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
342 		}
343 	}
344 
345 	if (g_workload_selection == ACCEL_DUALCAST) {
346 		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
347 		if (task->dst2 == NULL) {
348 			fprintf(stderr, "Unable to alloc dst2 buffer\n");
349 			return -ENOMEM;
350 		}
351 		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
352 	}
353 
354 	return 0;
355 }
356 
357 inline static struct ap_task *
358 _get_task(struct worker_thread *worker)
359 {
360 	struct ap_task *task;
361 
362 	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
363 		task = TAILQ_FIRST(&worker->tasks_pool);
364 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
365 	} else {
366 		fprintf(stderr, "Unable to get ap_task\n");
367 		return NULL;
368 	}
369 
370 	return task;
371 }
372 
373 /* Submit one operation using the same ap task that just completed. */
374 static void
375 _submit_single(struct worker_thread *worker, struct ap_task *task)
376 {
377 	int random_num;
378 	int rc = 0;
379 
380 	assert(worker);
381 
382 	switch (g_workload_selection) {
383 	case ACCEL_COPY:
384 		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
385 					    g_xfer_size_bytes, accel_done, task);
386 		break;
387 	case ACCEL_FILL:
388 		/* For fill, the first byte of task->src holds the fill value */
389 		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
390 					    g_xfer_size_bytes, accel_done, task);
391 		break;
392 	case ACCEL_CRC32C:
393 		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
394 					       task->iovs, task->iov_cnt, g_crc32c_seed,
395 					       accel_done, task);
396 		break;
397 	case ACCEL_COPY_CRC32C:
398 		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt,
399 						    &task->crc_dst, g_crc32c_seed, accel_done, task);
400 		break;
401 	case ACCEL_COMPARE:
402 		random_num = rand() % 100;
403 		if (random_num < g_fail_percent_goal) {
404 			task->expected_status = -EILSEQ;
405 			*(uint8_t *)task->dst = ~DATA_PATTERN;
406 		} else {
407 			task->expected_status = 0;
408 			*(uint8_t *)task->dst = DATA_PATTERN;
409 		}
410 		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
411 					       g_xfer_size_bytes, accel_done, task);
412 		break;
413 	case ACCEL_DUALCAST:
414 		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
415 						task->src, g_xfer_size_bytes, accel_done, task);
416 		break;
417 	default:
418 		assert(false);
419 		break;
420 
421 	}
422 
423 	if (rc) {
424 		accel_done(task, rc);
425 	}
426 }
427 
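/* Prep one operation for the selected workload onto the given batch; the batch
 * itself is submitted separately once it is full or the worker is draining.
 */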
428 static int
429 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task,
430 		struct accel_batch *worker_batch)
431 {
432 	struct spdk_accel_batch *batch = worker_batch->batch;
433 	int rc = 0;
434 
435 	worker_batch->cmd_count++;
436 	assert(worker_batch->cmd_count <= g_ops_per_batch);
437 
438 	switch (g_workload_selection) {
439 	case ACCEL_COPY:
440 		rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst,
441 						task->src, g_xfer_size_bytes, accel_done, task);
442 		break;
443 	case ACCEL_DUALCAST:
444 		rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2,
445 						    task->src, g_xfer_size_bytes, accel_done, task);
446 		break;
447 	case ACCEL_COMPARE:
448 		rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src,
449 						   g_xfer_size_bytes, accel_done, task);
450 		break;
451 	case ACCEL_FILL:
452 		rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst,
453 						*(uint8_t *)task->src,
454 						g_xfer_size_bytes, accel_done, task);
455 		break;
456 	case ACCEL_COPY_CRC32C:
457 		rc = spdk_accel_batch_prep_copy_crc32c(worker->ch, batch, task->dst, task->src, &task->crc_dst,
458 						       g_crc32c_seed, g_xfer_size_bytes, accel_done, task);
459 		break;
460 	case ACCEL_CRC32C:
461 		rc = spdk_accel_batch_prep_crc32cv(worker->ch, batch, &task->crc_dst,
462 						   task->iovs, task->iov_cnt, g_crc32c_seed, accel_done, task);
463 		break;
464 	default:
465 		assert(false);
466 		break;
467 	}
468 
469 	return rc;
470 }
471 
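/* Release the buffers allocated for this task by _get_task_data_bufs(). */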
472 static void
473 _free_task_buffers(struct ap_task *task)
474 {
475 	uint32_t i;
476 
477 	if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) {
478 		if (task->iovs) {
479 			for (i = 0; i < task->iov_cnt; i++) {
480 				if (task->iovs[i].iov_base) {
481 					spdk_dma_free(task->iovs[i].iov_base);
482 				}
483 			}
484 			free(task->iovs);
485 		}
486 	} else {
487 		spdk_dma_free(task->src);
488 	}
489 
490 	spdk_dma_free(task->dst);
491 	if (g_workload_selection == ACCEL_DUALCAST) {
492 		spdk_dma_free(task->dst2);
493 	}
494 }
495 
496 static void _batch_done(void *cb_arg);
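/* Batching path for a completed task: prep it onto the current in-prep batch
 * and, once that batch holds g_ops_per_batch operations, move it to the
 * to_submit list.
 */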
497 static void
498 _build_batch(struct worker_thread *worker, struct ap_task *task)
499 {
500 	struct accel_batch *worker_batch = NULL;
501 	int rc;
502 
503 	assert(!TAILQ_EMPTY(&worker->in_prep_batches));
504 
505 	worker_batch = TAILQ_FIRST(&worker->in_prep_batches);
506 
507 	/* If an accel batch hasn't been created yet do so now. */
508 	if (worker_batch->batch == NULL) {
509 		worker_batch->batch = spdk_accel_batch_create(worker->ch);
510 		if (worker_batch->batch == NULL) {
511 			fprintf(stderr, "error unable to create new batch\n");
512 			return;
513 		}
514 	}
515 
516 	/* Prep the command re-using the last completed command's task */
517 	rc = _batch_prep_cmd(worker, task, worker_batch);
518 	if (rc) {
519 		fprintf(stderr, "error prepping command for batch\n");
520 		goto error;
521 	}
522 
523 	/* If this batch is full move it to the to_submit list so it gets
524 	 * submitted as batches complete.
525 	 */
526 	if (worker_batch->cmd_count == g_ops_per_batch) {
527 		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
528 		TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link);
529 	}
530 
531 	return;
532 error:
533 	spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
534 
535 }
536 
537 static void batch_done(void *cb_arg, int status);
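/* While draining, submit any partially built batches so their operations can
 * complete and the queue depth can drop to zero.
 */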
538 static void
539 _drain_batch(struct worker_thread *worker)
540 {
541 	struct accel_batch *worker_batch, *tmp;
542 	int rc;
543 
544 	/* submit any batches that were being built up. */
545 	TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) {
546 		if (worker_batch->cmd_count == 0) {
547 			continue;
548 		}
549 		worker->current_queue_depth += worker_batch->cmd_count + 1;
550 
551 		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
552 		TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
553 		rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
554 		if (rc == 0) {
555 			worker_batch->cmd_count = 0;
556 		} else {
557 			fprintf(stderr, "error sending final batch\n");
558 			worker->current_queue_depth -= worker_batch->cmd_count + 1;
559 			break;
560 		}
561 	}
562 }
563 
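/* Runs on the worker's thread after a batch completes: recycle the batch onto
 * the in-prep list and, unless draining, submit the next full batch waiting on
 * the to_submit list.
 */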
564 static void
565 _batch_done(void *cb_arg)
566 {
567 	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;
568 	struct worker_thread *worker = worker_batch->worker;
569 	int rc;
570 
571 	assert(TAILQ_EMPTY(&worker->in_use_batches) == 0);
572 
573 	if (worker_batch->status) {
574 		SPDK_ERRLOG("error %d\n", worker_batch->status);
575 	}
576 
577 	worker->current_queue_depth--;
578 	TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
579 	TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
580 	worker_batch->batch = NULL;
581 	worker_batch->cmd_count = 0;
582 
583 	if (!worker->is_draining) {
584 		worker_batch = TAILQ_FIRST(&worker->to_submit_batches);
585 		if (worker_batch != NULL) {
586 
587 			assert(worker_batch->cmd_count == g_ops_per_batch);
588 
589 			/* Add one for the batch command itself. */
590 			worker->current_queue_depth += g_ops_per_batch + 1;
591 			TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link);
592 			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
593 
594 			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
595 			if (rc) {
596 				fprintf(stderr, "error submitting batch\n");
597 				worker->current_queue_depth -= g_ops_per_batch + 1;
598 				return;
599 			}
600 		}
601 	} else {
602 		_drain_batch(worker);
603 	}
604 }
605 
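/* Batch completion callback from the accel framework; forwards processing to
 * the owning worker's thread.
 */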
606 static void
607 batch_done(void *cb_arg, int status)
608 {
609 	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;
610 
611 	assert(worker_batch->worker);
612 
613 	worker_batch->status = status;
614 	spdk_thread_send_msg(worker_batch->worker->thread, _batch_done, worker_batch);
615 }
616 
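/* Compare a flat buffer against the concatenated contents of an iovec chain;
 * returns 0 only if the data and total length match.
 */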
617 static int
618 _vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
619 {
620 	uint32_t i;
621 	uint32_t ttl_len = 0;
622 	uint8_t *dst = (uint8_t *)_dst;
623 
624 	for (i = 0; i < iovcnt; i++) {
625 		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
626 			return -1;
627 		}
628 		dst += src_iovs[i].iov_len;
629 		ttl_len += src_iovs[i].iov_len;
630 	}
631 
632 	if (ttl_len != iovcnt * g_xfer_size_bytes) {
633 		return -1;
634 	}
635 
636 	return 0;
637 }
638 
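/* Per-operation completion handling on the worker's thread: optional CRC/data
 * verification, statistics accounting, then resubmission of the task (or
 * draining once the run time has elapsed).
 */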
639 static void
640 _accel_done(void *arg1)
641 {
642 	struct ap_task *task = arg1;
643 	struct worker_thread *worker = task->worker;
644 	uint32_t sw_crc32c;
645 
646 	assert(worker);
647 	assert(worker->current_queue_depth > 0);
648 
649 	if (g_verify && task->status == 0) {
650 		switch (g_workload_selection) {
651 		case ACCEL_COPY_CRC32C:
652 			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
653 			if (task->crc_dst != sw_crc32c) {
654 				SPDK_NOTICELOG("CRC-32C miscompare\n");
655 				worker->xfer_failed++;
656 			}
657 			if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) {
658 				SPDK_NOTICELOG("Data miscompare\n");
659 				worker->xfer_failed++;
660 			}
661 			break;
662 		case ACCEL_CRC32C:
663 			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
664 			if (task->crc_dst != sw_crc32c) {
665 				SPDK_NOTICELOG("CRC-32C miscompare\n");
666 				worker->xfer_failed++;
667 			}
668 			break;
669 		case ACCEL_COPY:
670 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
671 				SPDK_NOTICELOG("Data miscompare\n");
672 				worker->xfer_failed++;
673 			}
674 			break;
675 		case ACCEL_DUALCAST:
676 			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
677 				SPDK_NOTICELOG("Data miscompare, first destination\n");
678 				worker->xfer_failed++;
679 			}
680 			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
681 				SPDK_NOTICELOG("Data miscompare, second destination\n");
682 				worker->xfer_failed++;
683 			}
684 			break;
685 		case ACCEL_FILL:
686 			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
687 				SPDK_NOTICELOG("Data miscompare\n");
688 				worker->xfer_failed++;
689 			}
690 			break;
691 		case ACCEL_COMPARE:
692 			break;
693 		default:
694 			assert(false);
695 			break;
696 		}
697 	}
698 
699 	if (task->expected_status == -EILSEQ) {
700 		assert(task->status != 0);
701 		worker->injected_miscompares++;
702 	} else if (task->status) {
703 		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
704 		worker->xfer_failed++;
705 	}
706 
707 	worker->xfer_completed++;
708 	worker->current_queue_depth--;
709 
710 	if (!worker->is_draining) {
711 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
712 		task = _get_task(worker);
713 		if (g_ops_per_batch == 0) {
714 			_submit_single(worker, task);
715 			worker->current_queue_depth++;
716 		} else {
717 			_build_batch(worker, task);
718 		}
719 	} else if (g_ops_per_batch > 0) {
720 		_drain_batch(worker);
721 	} else {
722 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
723 	}
724 }
725 
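/* Print per-worker and aggregate throughput; returns non-zero if any transfer
 * failed.
 */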
726 static int
727 dump_result(void)
728 {
729 	uint64_t total_completed = 0;
730 	uint64_t total_failed = 0;
731 	uint64_t total_miscompared = 0;
732 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
733 	struct worker_thread *worker = g_workers;
734 
735 	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
736 	printf("------------------------------------------------------------------------\n");
737 	while (worker != NULL) {
738 
739 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
740 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
741 				       (g_time_in_sec * 1024 * 1024);
742 
743 		total_completed += worker->xfer_completed;
744 		total_failed += worker->xfer_failed;
745 		total_miscompared += worker->injected_miscompares;
746 
747 		if (xfer_per_sec) {
748 			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
749 			       worker->display.core, worker->display.thread, xfer_per_sec,
750 			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
751 		}
752 
753 		worker = worker->next;
754 	}
755 
756 	total_xfer_per_sec = total_completed / g_time_in_sec;
757 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
758 			    (g_time_in_sec * 1024 * 1024);
759 
760 	printf("=========================================================================\n");
761 	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n",
762 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);
763 
764 	return total_failed ? 1 : 0;
765 }
766 
767 static inline void
768 _free_task_buffers_in_pool(struct worker_thread *worker)
769 {
770 	struct ap_task *task;
771 
772 	assert(worker);
773 	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
774 		TAILQ_REMOVE(&worker->tasks_pool, task, link);
775 		_free_task_buffers(task);
776 	}
777 }
778 
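/* Poller that waits for the worker's outstanding operations to reach zero
 * before tearing the worker down.
 */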
779 static int
780 _check_draining(void *arg)
781 {
782 	struct worker_thread *worker = arg;
783 
784 	assert(worker);
785 
786 	if (worker->current_queue_depth == 0) {
787 		_free_task_buffers_in_pool(worker);
788 		spdk_poller_unregister(&worker->is_draining_poller);
789 		unregister_worker(worker);
790 	}
791 
792 	return -1;
793 }
794 
795 static int
796 _worker_stop(void *arg)
797 {
798 	struct worker_thread *worker = arg;
799 
800 	assert(worker);
801 
802 	spdk_poller_unregister(&worker->stop_poller);
803 
804 	/* now let the worker drain and check its outstanding IO with a poller */
805 	worker->is_draining = true;
806 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
807 
808 	return 0;
809 }
810 
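/* Per-thread setup: register the worker, allocate its tasks (and batches when
 * batching is enabled) and submit an initial queue depth worth of operations.
 */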
811 static void
812 _init_thread(void *arg1)
813 {
814 	struct worker_thread *worker;
815 	struct ap_task *task;
816 	int i, rc, num_batches;
817 	int max_per_batch;
818 	int remaining = g_queue_depth;
819 	int num_tasks = g_allocate_depth;
820 	struct accel_batch *tmp;
821 	struct accel_batch *worker_batch = NULL;
822 	struct display_info *display = arg1;
823 	uint64_t capabilities;
824 
825 	worker = calloc(1, sizeof(*worker));
826 	if (worker == NULL) {
827 		fprintf(stderr, "Unable to allocate worker\n");
828 		free(display);
829 		return;
830 	}
831 
832 	worker->display.core = display->core;
833 	worker->display.thread = display->thread;
834 	free(display);
835 	worker->core = spdk_env_get_current_core();
836 	worker->thread = spdk_get_thread();
837 	pthread_mutex_lock(&g_workers_lock);
838 	g_num_workers++;
839 	worker->next = g_workers;
840 	g_workers = worker;
841 	pthread_mutex_unlock(&g_workers_lock);
842 	worker->ch = spdk_accel_engine_get_io_channel();
843 
844 	if (g_num_workers == 1) {
845 		capabilities = spdk_accel_get_capabilities(worker->ch);
846 		if ((capabilities & g_workload_selection) != g_workload_selection) {
847 			g_using_sw_engine = true;
848 			SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n");
849 			SPDK_WARNLOG("The software engine will be used instead.\n\n");
850 		}
851 	}
852 
853 	TAILQ_INIT(&worker->tasks_pool);
854 
855 	if (g_ops_per_batch > 0) {
856 
857 		max_per_batch = spdk_accel_batch_get_max(worker->ch);
858 		assert(max_per_batch > 0);
859 
860 		if (g_ops_per_batch > max_per_batch) {
861 			fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch);
862 			g_ops_per_batch = max_per_batch;
863 		}
864 
865 		if (g_ops_per_batch > g_queue_depth) {
866 			fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth);
867 			g_ops_per_batch = g_queue_depth;
868 		}
869 
870 		TAILQ_INIT(&worker->in_prep_batches);
871 		TAILQ_INIT(&worker->to_submit_batches);
872 		TAILQ_INIT(&worker->in_use_batches);
873 
874 		/* A worker_batch will live on one of 3 lists:
875 	 * IN_PREP: as individual IOs complete, new ones are built up on a
876 		 *          worker_batch on this list until it reaches g_ops_per_batch.
877 		 * TO_SUBMIT: as batches are built up on IO completion they are moved
878 		 *	      to this list once they are full.  This list is used in
879 		 *	      batch completion to start new batches.
880 		 * IN_USE: the worker_batch is outstanding and will be moved to in prep
881 		 *         list when the batch is completed.
882 		 *
883 	 * So we need enough batches to cover queue depth loading, another set to
884 	 * replace each of those as they complete, plus one extra to build up
885 	 * while the last batch has finished its IO but not yet its batch
886 	 * command.
887 		 */
888 		num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1;
889 		worker->batch_base = calloc(num_batches, sizeof(struct accel_batch));
890 		worker_batch = worker->batch_base;
891 		for (i = 0; i < num_batches; i++) {
892 			worker_batch->worker = worker;
893 			TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
894 			worker_batch++;
895 		}
896 	}
897 
898 	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
899 	if (worker->task_base == NULL) {
900 		fprintf(stderr, "Could not allocate task base.\n");
901 		goto error;
902 	}
903 
904 	task = worker->task_base;
905 	for (i = 0; i < num_tasks; i++) {
906 		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
907 		task->worker = worker;
908 		if (_get_task_data_bufs(task)) {
909 			fprintf(stderr, "Unable to get data bufs\n");
910 			goto error;
911 		}
912 		task++;
913 	}
914 
915 	/* Register a poller that will stop the worker at time elapsed */
916 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
917 			      g_time_in_sec * 1000000ULL);
918 
919 	/* If batching is enabled load up to the full Q depth before
920 	 * processing any completions, then ping pong between two batches,
921 	 * one processing and one being built up for when the other completes.
922 	 */
923 	if (g_ops_per_batch > 0) {
924 		do {
925 			worker_batch = TAILQ_FIRST(&worker->in_prep_batches);
926 			if (worker_batch == NULL) {
927 				goto error;
928 			}
929 
930 			worker_batch->batch = spdk_accel_batch_create(worker->ch);
931 			if (worker_batch->batch == NULL) {
932 				raise(SIGINT);
933 				break;
934 			}
935 
936 			for (i = 0; i < g_ops_per_batch; i++) {
937 				task = _get_task(worker);
938 				worker->current_queue_depth++;
939 				if (task == NULL) {
940 					goto error;
941 				}
942 
943 				rc = _batch_prep_cmd(worker, task, worker_batch);
944 				if (rc) {
945 					fprintf(stderr, "error prepping command\n");
946 					goto error;
947 				}
948 			}
949 
950 			/* for the batch operation itself. */
951 			task->worker->current_queue_depth++;
952 			TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
953 			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
954 
955 			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
956 			if (rc) {
957 				fprintf(stderr, "error submitting batch\n");
958 				goto error;
959 			}
960 			assert(remaining >= g_ops_per_batch);
961 			remaining -= g_ops_per_batch;
962 		} while (remaining > 0);
963 	}
964 
965 	/* Submit as singles when no batching is enabled or we ran out of batches. */
966 	for (i = 0; i < remaining; i++) {
967 		task = _get_task(worker);
968 		worker->current_queue_depth++;
969 		if (task == NULL) {
970 			goto error;
971 		}
972 
973 		_submit_single(worker, task);
974 	}
975 	return;
976 error:
977 	if (worker_batch && worker_batch->batch) {
978 		TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) {
979 			spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
980 			TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
981 		}
982 	}
983 
984 	_free_task_buffers_in_pool(worker);
985 	free(worker->batch_base);
986 	free(worker->task_base);
987 	free(worker);
988 	spdk_app_stop(-1);
989 }
990 
991 static void
992 accel_done(void *cb_arg, int status)
993 {
994 	struct ap_task *task = (struct ap_task *)cb_arg;
995 	struct worker_thread *worker = task->worker;
996 
997 	assert(worker);
998 
999 	task->status = status;
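	/* The software engine may complete an operation synchronously inside the
	 * submit call, so defer completion handling to the worker's thread to
	 * avoid recursing through _submit_single(); hardware engine completions
	 * arrive from the channel poller and can be handled inline.
	 */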
1000 	if (g_using_sw_engine == false) {
1001 		_accel_done(task);
1002 	} else {
1003 		spdk_thread_send_msg(worker->thread, _accel_done, task);
1004 	}
1005 }
1006 
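/* App start callback: create one SPDK thread per selected core (times
 * g_threads_per_core) and kick off _init_thread() on each.
 */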
1007 static void
1008 accel_perf_start(void *arg1)
1009 {
1010 	struct spdk_cpuset tmp_cpumask = {};
1011 	char thread_name[32];
1012 	uint32_t i;
1013 	int j;
1014 	struct spdk_thread *thread;
1015 	struct display_info *display;
1016 
1017 	g_tsc_rate = spdk_get_ticks_hz();
1018 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
1019 
1020 	printf("Running for %d seconds...\n", g_time_in_sec);
1021 	fflush(stdout);
1022 
1023 	/* Create worker threads for each core that was specified. */
1024 	SPDK_ENV_FOREACH_CORE(i) {
1025 		for (j = 0; j < g_threads_per_core; j++) {
1026 			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
1027 			spdk_cpuset_zero(&tmp_cpumask);
1028 			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
1029 			thread = spdk_thread_create(thread_name, &tmp_cpumask);
1030 			display = calloc(1, sizeof(*display));
1031 			if (display == NULL) {
1032 				fprintf(stderr, "Unable to allocate memory\n");
1033 				spdk_app_stop(-1);
1034 				return;
1035 			}
1036 			display->core = i;
1037 			display->thread = j;
1038 			spdk_thread_send_msg(thread, _init_thread, display);
1039 		}
1040 	}
1041 }
1042 
1043 int
1044 main(int argc, char **argv)
1045 {
1046 	struct spdk_app_opts opts = {};
1047 	struct worker_thread *worker, *tmp;
1048 
1049 	pthread_mutex_init(&g_workers_lock, NULL);
1050 	spdk_app_opts_init(&opts, sizeof(opts));
1051 	opts.reactor_mask = "0x1";
1052 	if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:s:t:yw:P:f:b:T:", NULL, parse_args,
1053 				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
1054 		g_rc = -1;
1055 		goto cleanup;
1056 	}
1057 
1058 	if ((g_workload_selection != ACCEL_COPY) &&
1059 	    (g_workload_selection != ACCEL_FILL) &&
1060 	    (g_workload_selection != ACCEL_CRC32C) &&
1061 	    (g_workload_selection != ACCEL_COPY_CRC32C) &&
1062 	    (g_workload_selection != ACCEL_COMPARE) &&
1063 	    (g_workload_selection != ACCEL_DUALCAST)) {
1064 		usage();
1065 		g_rc = -1;
1066 		goto cleanup;
1067 	}
1068 
1069 	if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) {
1070 		fprintf(stdout, "queue depth must be a multiple of batch size\n");
1071 		usage();
1072 		g_rc = -1;
1073 		goto cleanup;
1074 	}
1075 
1076 	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
1077 		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
1078 		usage();
1079 		g_rc = -1;
1080 		goto cleanup;
1081 	}
1082 
1083 	if (g_allocate_depth == 0) {
1084 		g_allocate_depth = g_queue_depth;
1085 	}
1086 
1087 	if ((g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) &&
1088 	    g_crc32c_chained_count == 0) {
1089 		usage();
1090 		g_rc = -1;
1091 		goto cleanup;
1092 	}
1093 
1094 	dump_user_config(&opts);
1095 	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
1096 	if (g_rc) {
1097 		SPDK_ERRLOG("ERROR starting application\n");
1098 	}
1099 
1100 	pthread_mutex_destroy(&g_workers_lock);
1101 
1102 	worker = g_workers;
1103 	while (worker) {
1104 		tmp = worker->next;
1105 		free(worker);
1106 		worker = tmp;
1107 	}
1108 cleanup:
1109 	spdk_app_fini();
1110 	return g_rc;
1111 }
1112