1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/thread.h" 36 #include "spdk/env.h" 37 #include "spdk/event.h" 38 #include "spdk/log.h" 39 #include "spdk/string.h" 40 #include "spdk/accel_engine.h" 41 42 static uint64_t g_tsc_rate; 43 static uint64_t g_tsc_us_rate; 44 static uint64_t g_tsc_end; 45 static int g_xfer_size_bytes = 4096; 46 static int g_queue_depth = 32; 47 static int g_time_in_sec = 5; 48 static bool g_verify = false; 49 static const char *g_workload_type = NULL; 50 static enum accel_capability g_workload_selection; 51 static struct worker_thread *g_workers = NULL; 52 static int g_num_workers = 0; 53 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 54 55 struct worker_thread { 56 struct spdk_io_channel *ch; 57 uint64_t xfer_completed; 58 uint64_t xfer_failed; 59 uint64_t current_queue_depth; 60 struct spdk_mempool *data_pool; 61 struct spdk_mempool *task_pool; 62 struct worker_thread *next; 63 unsigned core; 64 struct spdk_thread *thread; 65 bool is_draining; 66 struct spdk_poller *is_draining_poller; 67 struct spdk_poller *stop_poller; 68 }; 69 70 struct ap_task { 71 void *src; 72 void *dst; 73 struct worker_thread *worker; 74 }; 75 76 inline static struct ap_task * 77 __ap_task_from_accel_task(struct spdk_accel_task *at) 78 { 79 return (struct ap_task *)((uintptr_t)at - sizeof(struct ap_task)); 80 } 81 82 inline static struct spdk_accel_task * 83 __accel_task_from_ap_task(struct ap_task *ap) 84 { 85 return (struct spdk_accel_task *)((uintptr_t)ap + sizeof(struct ap_task)); 86 } 87 88 static void 89 dump_user_config(struct spdk_app_opts *opts) 90 { 91 printf("SPDK Configuration:\n"); 92 printf("Core mask: %s\n\n", opts->reactor_mask); 93 printf("Accel Perf Configuration:\n"); 94 printf("Workload Type: %s\n", g_workload_type); 95 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 96 printf("Queue depth: %u\n", g_queue_depth); 97 printf("Run time: %u seconds\n", g_time_in_sec); 98 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 99 } 100 101 static void 102 usage(void) 103 { 104 printf("accel_perf options:\n"); 105 printf("\t[-h help message]\n"); 106 printf("\t[-q queue depth]\n"); 107 printf("\t[-n number of channels]\n"); 108 printf("\t[-o transfer size in bytes]\n"); 109 printf("\t[-t time in seconds]\n"); 110 printf("\t[-w workload type must be one of these: copy, fill\n"); 111 printf("\t[-y verify result if this switch is on]\n"); 112 } 113 114 static int 115 parse_args(int argc, char *argv) 116 { 117 switch (argc) { 118 case 'o': 119 g_xfer_size_bytes = spdk_strtol(optarg, 10); 120 break; 121 case 'q': 122 g_queue_depth = spdk_strtol(optarg, 10); 123 break; 124 case 't': 125 g_time_in_sec = spdk_strtol(optarg, 10); 126 break; 127 case 'y': 128 g_verify = true; 129 break; 130 case 'w': 131 g_workload_type = optarg; 132 if (!strcmp(g_workload_type, "copy")) { 133 g_workload_selection = ACCEL_COPY; 134 } else if (!strcmp(g_workload_type, "fill")) { 135 g_workload_selection = ACCEL_FILL; 136 } 137 break; 138 default: 139 usage(); 140 return 1; 141 } 142 return 0; 143 } 144 145 static void 146 unregister_worker(void *arg1) 147 { 148 struct worker_thread *worker = arg1; 149 150 spdk_mempool_free(worker->data_pool); 151 spdk_mempool_free(worker->task_pool); 152 spdk_put_io_channel(worker->ch); 153 pthread_mutex_lock(&g_workers_lock); 154 assert(g_num_workers >= 1); 155 if (--g_num_workers == 0) { 156 pthread_mutex_unlock(&g_workers_lock); 157 spdk_app_stop(0); 158 } 159 pthread_mutex_unlock(&g_workers_lock); 160 } 161 162 static void accel_done(void *ref, int status); 163 164 static void 165 _submit_single(void *arg1, void *arg2) 166 { 167 struct worker_thread *worker = arg1; 168 struct ap_task *task = arg2; 169 170 assert(worker); 171 172 if (g_verify) { 173 memset(task->src, 0x5a, g_xfer_size_bytes); 174 memset(task->dst, 0xa5, g_xfer_size_bytes); 175 } 176 task->worker = worker; 177 task->worker->current_queue_depth++; 178 if (!strcmp(g_workload_type, "copy")) { 179 spdk_accel_submit_copy(__accel_task_from_ap_task(task), 180 worker->ch, task->dst, 181 task->src, g_xfer_size_bytes, accel_done); 182 } else if (!strcmp(g_workload_type, "fill")) { 183 /* For fill use the first byte of the task->dst buffer */ 184 spdk_accel_submit_fill(__accel_task_from_ap_task(task), 185 worker->ch, task->dst, *(uint8_t *)task->src, 186 g_xfer_size_bytes, accel_done); 187 } else { 188 assert(false); 189 } 190 } 191 192 static void 193 _accel_done(void *arg1) 194 { 195 struct ap_task *task = arg1; 196 struct worker_thread *worker = task->worker; 197 198 assert(worker); 199 assert(worker->current_queue_depth > 0); 200 201 if (g_verify) { 202 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 203 SPDK_NOTICELOG("Data miscompare\n"); 204 worker->xfer_failed++; 205 /* TODO: cleanup */ 206 exit(-1); 207 } 208 } 209 worker->xfer_completed++; 210 worker->current_queue_depth--; 211 212 if (!worker->is_draining) { 213 _submit_single(worker, task); 214 } else { 215 spdk_mempool_put(worker->data_pool, task->src); 216 spdk_mempool_put(worker->data_pool, task->dst); 217 spdk_mempool_put(worker->task_pool, task); 218 } 219 } 220 221 static int 222 dump_result(void) 223 { 224 uint64_t total_completed = 0; 225 uint64_t total_failed = 0; 226 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 227 struct worker_thread *worker = g_workers; 228 229 printf("\nCore Transfers Bandwidth Failed\n"); 230 printf("-------------------------------------------------\n"); 231 while (worker != NULL) { 232 233 uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec; 234 uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) / 235 (g_time_in_sec * 1024 * 1024); 236 237 total_completed += worker->xfer_completed; 238 total_failed += worker->xfer_failed; 239 240 if (xfer_per_sec) { 241 printf("%10d%12" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 "\n", 242 worker->core, xfer_per_sec, 243 bw_in_MiBps, worker->xfer_failed); 244 } 245 246 worker = worker->next; 247 } 248 249 total_xfer_per_sec = total_completed / g_time_in_sec; 250 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 251 (g_time_in_sec * 1024 * 1024); 252 253 printf("=================================================\n"); 254 printf("Total:%16" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 "\n\n", 255 total_xfer_per_sec, total_bw_in_MiBps, total_failed); 256 257 return total_failed ? 1 : 0; 258 } 259 260 static int 261 _check_draining(void *arg) 262 { 263 struct worker_thread *worker = arg; 264 265 assert(worker); 266 267 if (worker->current_queue_depth == 0) { 268 spdk_poller_unregister(&worker->is_draining_poller); 269 unregister_worker(worker); 270 } 271 272 return -1; 273 } 274 275 static int 276 _worker_stop(void *arg) 277 { 278 struct worker_thread *worker = arg; 279 280 assert(worker); 281 282 spdk_poller_unregister(&worker->stop_poller); 283 284 /* now let the worker drain and check it's outstanding IO with a poller */ 285 worker->is_draining = true; 286 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 287 288 return 0; 289 } 290 291 static void 292 _init_thread_done(void *ctx) 293 { 294 } 295 296 static void 297 _init_thread(void *arg1) 298 { 299 struct worker_thread *worker; 300 char buf_pool_name[30], task_pool_name[30]; 301 struct ap_task *task; 302 int i; 303 304 worker = calloc(1, sizeof(*worker)); 305 if (worker == NULL) { 306 fprintf(stderr, "Unable to allocate worker\n"); 307 return; 308 } 309 310 worker->core = spdk_env_get_current_core(); 311 worker->thread = spdk_get_thread(); 312 worker->next = g_workers; 313 worker->ch = spdk_accel_engine_get_io_channel(); 314 snprintf(buf_pool_name, sizeof(buf_pool_name), "buf_pool_%d", g_num_workers); 315 snprintf(task_pool_name, sizeof(task_pool_name), "task_pool_%d", g_num_workers); 316 worker->data_pool = spdk_mempool_create(buf_pool_name, 317 g_queue_depth * 2, /* src + dst */ 318 g_xfer_size_bytes, 319 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 320 SPDK_ENV_SOCKET_ID_ANY); 321 worker->task_pool = spdk_mempool_create(task_pool_name, 322 g_queue_depth, 323 spdk_accel_task_size() + sizeof(struct ap_task), 324 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 325 SPDK_ENV_SOCKET_ID_ANY); 326 if (!worker->data_pool || !worker->task_pool) { 327 fprintf(stderr, "Could not allocate buffer pool.\n"); 328 spdk_mempool_free(worker->data_pool); 329 spdk_mempool_free(worker->task_pool); 330 free(worker); 331 return; 332 } 333 334 /* Register a poller that will stop the worker at time elapsed */ 335 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 336 g_time_in_sec * 1000000ULL); 337 338 g_workers = worker; 339 pthread_mutex_lock(&g_workers_lock); 340 g_num_workers++; 341 pthread_mutex_unlock(&g_workers_lock); 342 343 for (i = 0; i < g_queue_depth; i++) { 344 task = spdk_mempool_get(worker->task_pool); 345 if (!task) { 346 fprintf(stderr, "Unable to get accel_task\n"); 347 return; 348 } 349 task->src = spdk_mempool_get(worker->data_pool); 350 task->dst = spdk_mempool_get(worker->data_pool); 351 _submit_single(worker, task); 352 } 353 } 354 355 static void 356 accel_done(void *ref, int status) 357 { 358 struct ap_task *task = __ap_task_from_accel_task(ref); 359 struct worker_thread *worker = task->worker; 360 361 assert(worker); 362 363 spdk_thread_send_msg(worker->thread, _accel_done, task); 364 } 365 366 static void 367 accel_perf_start(void *arg1) 368 { 369 uint64_t capabilites; 370 struct spdk_io_channel *accel_ch; 371 372 accel_ch = spdk_accel_engine_get_io_channel(); 373 capabilites = spdk_accel_get_capabilities(accel_ch); 374 spdk_put_io_channel(accel_ch); 375 376 if ((capabilites & g_workload_selection) != g_workload_selection) { 377 SPDK_ERRLOG("Selected workload is not supported by the current engine\n"); 378 SPDK_NOTICELOG("Software engine is selected by default, enable a HW engine via RPC\n\n"); 379 spdk_app_stop(-1); 380 return; 381 } 382 383 g_tsc_rate = spdk_get_ticks_hz(); 384 g_tsc_us_rate = g_tsc_rate / (1000 * 1000); 385 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 386 387 printf("Running for %d seconds...\n", g_time_in_sec); 388 fflush(stdout); 389 390 spdk_for_each_thread(_init_thread, NULL, _init_thread_done); 391 } 392 393 int 394 main(int argc, char **argv) 395 { 396 struct spdk_app_opts opts = {}; 397 struct worker_thread *worker, *tmp; 398 int rc = 0; 399 400 pthread_mutex_init(&g_workers_lock, NULL); 401 spdk_app_opts_init(&opts); 402 opts.reactor_mask = "0x1"; 403 if ((rc = spdk_app_parse_args(argc, argv, &opts, "o:q:t:yw:", NULL, parse_args, 404 usage)) != SPDK_APP_PARSE_ARGS_SUCCESS) { 405 rc = -1; 406 goto cleanup; 407 } 408 409 if (g_workload_type == NULL || 410 (strcmp(g_workload_type, "copy") && 411 strcmp(g_workload_type, "fill"))) { 412 usage(); 413 rc = -1; 414 goto cleanup; 415 } 416 417 dump_user_config(&opts); 418 rc = spdk_app_start(&opts, accel_perf_start, NULL); 419 if (rc) { 420 SPDK_ERRLOG("ERROR starting application\n"); 421 } else { 422 dump_result(); 423 } 424 425 pthread_mutex_destroy(&g_workers_lock); 426 427 worker = g_workers; 428 while (worker) { 429 tmp = worker->next; 430 free(worker); 431 worker = tmp; 432 } 433 cleanup: 434 spdk_app_fini(); 435 return rc; 436 } 437