xref: /spdk/examples/accel/perf/accel_perf.c (revision 33f97fa33ad89651d75bafb5fb87dc4cd28dde6a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 #include "spdk/thread.h"
36 #include "spdk/env.h"
37 #include "spdk/event.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/accel_engine.h"
41 
/* Tick-rate bookkeeping, filled in by accel_perf_start(). */
static uint64_t g_tsc_rate;
static uint64_t g_tsc_us_rate;
static uint64_t g_tsc_end;

/* Workload parameters; the defaults below can be overridden on the command line. */
static const char *g_workload_type = NULL;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
static int g_time_in_sec = 5;
static bool g_verify = false;

/*
 * Singly linked list of workers and the number of workers still running.
 * Updates to the count are made while holding g_workers_lock.
 */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
53 
/* Per-thread benchmark state; one instance is created by _init_thread(). */
struct worker_thread {
	/* Accel engine channel the worker submits its operations on. */
	struct spdk_io_channel *ch;
	/* Number of operations that have completed. */
	uint64_t xfer_completed;
	/* Number of operations counted as failed. */
	uint64_t xfer_failed;
	/* Operations submitted but not yet completed. */
	uint64_t current_queue_depth;
	/* Pool of source/destination data buffers. */
	struct spdk_mempool *data_pool;
	/* Pool of task objects (ap_task plus accel engine task storage). */
	struct spdk_mempool *task_pool;
	/* Next worker in the g_workers list. */
	struct worker_thread *next;
	/* Core the worker was created on. */
	unsigned core;
	/* SPDK thread that completions are forwarded to. */
	struct spdk_thread *thread;
	/* Set once the run time has elapsed; completed tasks are no longer resubmitted. */
	bool is_draining;
	/* Poller that waits for outstanding operations to finish. */
	struct spdk_poller *is_draining_poller;
	/* Poller that ends the run after g_time_in_sec seconds. */
	struct spdk_poller *stop_poller;
};
68 
/*
 * Benchmark bookkeeping for a single operation. Each task_pool element holds
 * one of these, immediately followed by the accel engine's own task storage.
 */
struct ap_task {
	void *src;			/* source data buffer */
	void *dst;			/* destination data buffer */
	struct worker_thread *worker;	/* worker that submitted the task */
};
74 
75 inline static struct ap_task *
76 __ap_task_from_accel_task(struct spdk_accel_task *at)
77 {
78 	return (struct ap_task *)((uintptr_t)at - sizeof(struct ap_task));
79 }
80 
81 inline static struct spdk_accel_task *
82 __accel_task_from_ap_task(struct ap_task *ap)
83 {
84 	return (struct spdk_accel_task *)((uintptr_t)ap + sizeof(struct ap_task));
85 }
86 
87 static void
88 dump_user_config(struct spdk_app_opts *opts)
89 {
90 	printf("SPDK Configuration:\n");
91 	printf("Core mask:      %s\n\n", opts->reactor_mask);
92 	printf("Accel Perf Configuration:\n");
93 	printf("Workload Type:  %s\n", g_workload_type);
94 	printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
95 	printf("Queue depth:    %u\n", g_queue_depth);
96 	printf("Run time:       %u seconds\n", g_time_in_sec);
97 	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
98 }
99 
/* Print the tool-specific command line help to stdout. */
static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill]\n");
	printf("\t[-y verify result if this switch is on]\n");
}
112 
113 static int
114 parse_args(int argc, char *argv)
115 {
116 	switch (argc) {
117 	case 'o':
118 		g_xfer_size_bytes = spdk_strtol(optarg, 10);
119 		break;
120 	case 'q':
121 		g_queue_depth = spdk_strtol(optarg, 10);
122 		break;
123 	case 't':
124 		g_time_in_sec = spdk_strtol(optarg, 10);
125 		break;
126 	case 'y':
127 		g_verify = true;
128 		break;
129 	case 'w':
130 		g_workload_type = optarg;
131 		break;
132 	default:
133 		usage();
134 		return 1;
135 	}
136 	return 0;
137 }
138 
139 static void
140 unregister_worker(void *arg1)
141 {
142 	struct worker_thread *worker = arg1;
143 
144 	spdk_mempool_free(worker->data_pool);
145 	spdk_mempool_free(worker->task_pool);
146 	spdk_put_io_channel(worker->ch);
147 	pthread_mutex_lock(&g_workers_lock);
148 	assert(g_num_workers >= 1);
149 	if (--g_num_workers == 0) {
150 		pthread_mutex_unlock(&g_workers_lock);
151 		spdk_app_stop(0);
152 	}
153 	pthread_mutex_unlock(&g_workers_lock);
154 }
155 
156 static void accel_done(void *ref, int status);
157 
158 static void
159 _submit_single(void *arg1, void *arg2)
160 {
161 	struct worker_thread *worker = arg1;
162 	struct ap_task *task = arg2;
163 
164 	assert(worker);
165 
166 	if (g_verify) {
167 		memset(task->src, 0x5a, g_xfer_size_bytes);
168 		memset(task->dst, 0xa5, g_xfer_size_bytes);
169 	}
170 	task->worker = worker;
171 	task->worker->current_queue_depth++;
172 	if (!strcmp(g_workload_type, "copy")) {
173 		spdk_accel_submit_copy(__accel_task_from_ap_task(task),
174 				       worker->ch, task->dst,
175 				       task->src, g_xfer_size_bytes, accel_done);
176 	} else if (!strcmp(g_workload_type, "fill")) {
177 		/* For fill use the first byte of the task->dst buffer */
178 		spdk_accel_submit_fill(__accel_task_from_ap_task(task),
179 				       worker->ch, task->dst, *(uint8_t *)task->src,
180 				       g_xfer_size_bytes, accel_done);
181 	} else {
182 		assert(false);
183 	}
184 }
185 
186 static void
187 _accel_done(void *arg1)
188 {
189 	struct ap_task *task = arg1;
190 	struct worker_thread *worker = task->worker;
191 
192 	assert(worker);
193 	assert(worker->current_queue_depth > 0);
194 
195 	if (g_verify) {
196 		if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
197 			SPDK_NOTICELOG("Data miscompare\n");
198 			worker->xfer_failed++;
199 			/* TODO: cleanup */
200 			exit(-1);
201 		}
202 	}
203 	worker->xfer_completed++;
204 	worker->current_queue_depth--;
205 
206 	if (!worker->is_draining) {
207 		_submit_single(worker, task);
208 	} else {
209 		spdk_mempool_put(worker->data_pool, task->src);
210 		spdk_mempool_put(worker->data_pool, task->dst);
211 		spdk_mempool_put(worker->task_pool, task);
212 	}
213 }
214 
215 static int
216 dump_result(void)
217 {
218 	uint64_t total_completed = 0;
219 	uint64_t total_failed = 0;
220 	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
221 	struct worker_thread *worker = g_workers;
222 
223 	printf("\nCore           Transfers     Bandwidth     Failed\n");
224 	printf("-------------------------------------------------\n");
225 	while (worker != NULL) {
226 
227 		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
228 		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
229 				       (g_time_in_sec * 1024 * 1024);
230 
231 		total_completed += worker->xfer_completed;
232 		total_failed += worker->xfer_failed;
233 
234 		if (xfer_per_sec) {
235 			printf("%10d%12" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 "\n",
236 			       worker->core, xfer_per_sec,
237 			       bw_in_MiBps, worker->xfer_failed);
238 		}
239 
240 		worker = worker->next;
241 	}
242 
243 	total_xfer_per_sec = total_completed / g_time_in_sec;
244 	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
245 			    (g_time_in_sec * 1024 * 1024);
246 
247 	printf("=================================================\n");
248 	printf("Total:%16" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 "\n\n",
249 	       total_xfer_per_sec, total_bw_in_MiBps, total_failed);
250 
251 	return total_failed ? 1 : 0;
252 }
253 
254 static int
255 _check_draining(void *arg)
256 {
257 	struct worker_thread *worker = arg;
258 
259 	assert(worker);
260 
261 	if (worker->current_queue_depth == 0) {
262 		spdk_poller_unregister(&worker->is_draining_poller);
263 		unregister_worker(worker);
264 	}
265 
266 	return -1;
267 }
268 
269 static int
270 _worker_stop(void *arg)
271 {
272 	struct worker_thread *worker = arg;
273 
274 	assert(worker);
275 
276 	spdk_poller_unregister(&worker->stop_poller);
277 
278 	/* now let the worker drain and check it's outstanding IO with a poller */
279 	worker->is_draining = true;
280 	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);
281 
282 	return 0;
283 }
284 
/*
 * Completion callback given to spdk_for_each_thread(); intentionally does
 * nothing.
 */
static void
_init_thread_done(void *ctx)
{
	(void)ctx;
}
289 
290 static void
291 _init_thread(void *arg1)
292 {
293 	struct worker_thread *worker;
294 	char buf_pool_name[30], task_pool_name[30];
295 	struct ap_task *task;
296 	int i;
297 
298 	worker = calloc(1, sizeof(*worker));
299 	if (worker == NULL) {
300 		fprintf(stderr, "Unable to allocate worker\n");
301 		return;
302 	}
303 
304 	worker->core = spdk_env_get_current_core();
305 	worker->thread = spdk_get_thread();
306 	worker->next = g_workers;
307 	worker->ch = spdk_accel_engine_get_io_channel();
308 	snprintf(buf_pool_name, sizeof(buf_pool_name), "buf_pool_%d", g_num_workers);
309 	snprintf(task_pool_name, sizeof(task_pool_name), "task_pool_%d", g_num_workers);
310 	worker->data_pool = spdk_mempool_create(buf_pool_name,
311 						g_queue_depth * 2, /* src + dst */
312 						g_xfer_size_bytes,
313 						SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
314 						SPDK_ENV_SOCKET_ID_ANY);
315 	worker->task_pool = spdk_mempool_create(task_pool_name,
316 						g_queue_depth,
317 						spdk_accel_task_size() + sizeof(struct ap_task),
318 						SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
319 						SPDK_ENV_SOCKET_ID_ANY);
320 	if (!worker->data_pool || !worker->task_pool) {
321 		fprintf(stderr, "Could not allocate buffer pool.\n");
322 		spdk_mempool_free(worker->data_pool);
323 		spdk_mempool_free(worker->task_pool);
324 		free(worker);
325 		return;
326 	}
327 
328 	/* Register a poller that will stop the worker at time elapsed */
329 	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
330 			      g_time_in_sec * 1000000ULL);
331 
332 	g_workers = worker;
333 	pthread_mutex_lock(&g_workers_lock);
334 	g_num_workers++;
335 	pthread_mutex_unlock(&g_workers_lock);
336 
337 	for (i = 0; i < g_queue_depth; i++) {
338 		task = spdk_mempool_get(worker->task_pool);
339 		if (!task) {
340 			fprintf(stderr, "Unable to get accel_task\n");
341 			return;
342 		}
343 		task->src = spdk_mempool_get(worker->data_pool);
344 		task->dst = spdk_mempool_get(worker->data_pool);
345 		_submit_single(worker, task);
346 	}
347 }
348 
349 static void
350 accel_done(void *ref, int status)
351 {
352 	struct ap_task *task = __ap_task_from_accel_task(ref);
353 	struct worker_thread *worker = task->worker;
354 
355 	assert(worker);
356 
357 	spdk_thread_send_msg(worker->thread, _accel_done, task);
358 }
359 
360 static void
361 accel_perf_start(void *arg1)
362 {
363 	g_tsc_rate = spdk_get_ticks_hz();
364 	g_tsc_us_rate = g_tsc_rate / (1000 * 1000);
365 	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;
366 
367 	printf("Running for %d seconds...\n", g_time_in_sec);
368 	fflush(stdout);
369 
370 	spdk_for_each_thread(_init_thread, NULL, _init_thread_done);
371 }
372 
373 int
374 main(int argc, char **argv)
375 {
376 	struct spdk_app_opts opts = {};
377 	struct worker_thread *worker, *tmp;
378 	int rc = 0;
379 
380 	pthread_mutex_init(&g_workers_lock, NULL);
381 	spdk_app_opts_init(&opts);
382 	opts.reactor_mask = "0x1";
383 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "o:q:t:yw:", NULL, parse_args,
384 				      usage)) != SPDK_APP_PARSE_ARGS_SUCCESS) {
385 		rc = -1;
386 		goto cleanup;
387 	}
388 
389 	if (g_workload_type == NULL ||
390 	    (strcmp(g_workload_type, "copy") &&
391 	     strcmp(g_workload_type, "fill"))) {
392 		usage();
393 		rc = -1;
394 		goto cleanup;
395 	}
396 
397 	dump_user_config(&opts);
398 	rc = spdk_app_start(&opts, accel_perf_start, NULL);
399 	if (rc) {
400 		SPDK_ERRLOG("ERROR starting application\n");
401 	} else {
402 		dump_result();
403 	}
404 
405 	pthread_mutex_destroy(&g_workers_lock);
406 
407 	worker = g_workers;
408 	while (worker) {
409 		tmp = worker->next;
410 		free(worker);
411 		worker = tmp;
412 	}
413 cleanup:
414 	spdk_app_fini();
415 	return rc;
416 }
417