1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/thread.h" 36 #include "spdk/env.h" 37 #include "spdk/event.h" 38 #include "spdk/log.h" 39 #include "spdk/string.h" 40 #include "spdk/accel_engine.h" 41 #include "spdk/crc32.h" 42 43 static uint64_t g_tsc_rate; 44 static uint64_t g_tsc_us_rate; 45 static uint64_t g_tsc_end; 46 static int g_xfer_size_bytes = 4096; 47 static int g_queue_depth = 32; 48 static int g_time_in_sec = 5; 49 static uint32_t g_crc32c_seed = 0; 50 static bool g_verify = false; 51 static const char *g_workload_type = NULL; 52 static enum accel_capability g_workload_selection; 53 static struct worker_thread *g_workers = NULL; 54 static int g_num_workers = 0; 55 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 56 57 struct worker_thread { 58 struct spdk_io_channel *ch; 59 uint64_t xfer_completed; 60 uint64_t xfer_failed; 61 uint64_t current_queue_depth; 62 struct spdk_mempool *data_pool; 63 struct spdk_mempool *task_pool; 64 struct worker_thread *next; 65 unsigned core; 66 struct spdk_thread *thread; 67 bool is_draining; 68 struct spdk_poller *is_draining_poller; 69 struct spdk_poller *stop_poller; 70 }; 71 72 struct ap_task { 73 void *src; 74 void *dst; 75 struct worker_thread *worker; 76 }; 77 78 inline static struct ap_task * 79 __ap_task_from_accel_task(struct spdk_accel_task *at) 80 { 81 return (struct ap_task *)((uintptr_t)at - sizeof(struct ap_task)); 82 } 83 84 inline static struct spdk_accel_task * 85 __accel_task_from_ap_task(struct ap_task *ap) 86 { 87 return (struct spdk_accel_task *)((uintptr_t)ap + sizeof(struct ap_task)); 88 } 89 90 static void 91 dump_user_config(struct spdk_app_opts *opts) 92 { 93 printf("SPDK Configuration:\n"); 94 printf("Core mask: %s\n\n", opts->reactor_mask); 95 printf("Accel Perf Configuration:\n"); 96 printf("Workload Type: %s\n", g_workload_type); 97 if (!strcmp(g_workload_type, "crc32c")) { 98 printf("CRC-32C seed: %u", g_crc32c_seed); 99 } 100 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 101 printf("Queue depth: %u\n", g_queue_depth); 102 printf("Run time: %u seconds\n", g_time_in_sec); 103 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 104 } 105 106 static void 107 usage(void) 108 { 109 printf("accel_perf options:\n"); 110 printf("\t[-h help message]\n"); 111 printf("\t[-q queue depth]\n"); 112 printf("\t[-n number of channels]\n"); 113 printf("\t[-o transfer size in bytes]\n"); 114 printf("\t[-t time in seconds]\n"); 115 printf("\t[-w workload type must be one of these: copy, fill, crc32c\n"); 116 printf("\t[-s for crc32c workload, use this seed value (default 0)\n"); 117 printf("\t[-y verify result if this switch is on]\n"); 118 } 119 120 static int 121 parse_args(int argc, char *argv) 122 { 123 switch (argc) { 124 case 'o': 125 g_xfer_size_bytes = spdk_strtol(optarg, 10); 126 break; 127 case 'q': 128 g_queue_depth = spdk_strtol(optarg, 10); 129 break; 130 case 's': 131 g_crc32c_seed = spdk_strtol(optarg, 10); 132 break; 133 case 't': 134 g_time_in_sec = spdk_strtol(optarg, 10); 135 break; 136 case 'y': 137 g_verify = true; 138 break; 139 case 'w': 140 g_workload_type = optarg; 141 if (!strcmp(g_workload_type, "copy")) { 142 g_workload_selection = ACCEL_COPY; 143 } else if (!strcmp(g_workload_type, "fill")) { 144 g_workload_selection = ACCEL_FILL; 145 } else if (!strcmp(g_workload_type, "crc32c")) { 146 g_workload_selection = ACCEL_CRC32C; 147 } 148 break; 149 default: 150 usage(); 151 return 1; 152 } 153 return 0; 154 } 155 156 static void 157 unregister_worker(void *arg1) 158 { 159 struct worker_thread *worker = arg1; 160 161 spdk_mempool_free(worker->data_pool); 162 spdk_mempool_free(worker->task_pool); 163 spdk_put_io_channel(worker->ch); 164 pthread_mutex_lock(&g_workers_lock); 165 assert(g_num_workers >= 1); 166 if (--g_num_workers == 0) { 167 pthread_mutex_unlock(&g_workers_lock); 168 spdk_app_stop(0); 169 } 170 pthread_mutex_unlock(&g_workers_lock); 171 } 172 173 static void accel_done(void *ref, int status); 174 175 static void 176 _submit_single(void *arg1, void *arg2) 177 { 178 struct worker_thread *worker = arg1; 179 struct ap_task *task = arg2; 180 181 assert(worker); 182 183 if (g_verify) { 184 memset(task->src, 0x5a, g_xfer_size_bytes); 185 memset(task->dst, 0xa5, g_xfer_size_bytes); 186 } 187 task->worker = worker; 188 task->worker->current_queue_depth++; 189 switch (g_workload_selection) { 190 case ACCEL_COPY: 191 spdk_accel_submit_copy(__accel_task_from_ap_task(task), 192 worker->ch, task->dst, 193 task->src, g_xfer_size_bytes, accel_done); 194 break; 195 case ACCEL_FILL: 196 /* For fill use the first byte of the task->dst buffer */ 197 spdk_accel_submit_fill(__accel_task_from_ap_task(task), 198 worker->ch, task->dst, *(uint8_t *)task->src, 199 g_xfer_size_bytes, accel_done); 200 break; 201 case ACCEL_CRC32C: 202 spdk_accel_submit_crc32c(__accel_task_from_ap_task(task), 203 worker->ch, (uint32_t *)task->dst, task->src, g_crc32c_seed, 204 g_xfer_size_bytes, accel_done); 205 break; 206 default: 207 assert(false); 208 break; 209 210 } 211 } 212 213 static void 214 _accel_done(void *arg1) 215 { 216 struct ap_task *task = arg1; 217 struct worker_thread *worker = task->worker; 218 uint32_t sw_crc32c; 219 220 assert(worker); 221 assert(worker->current_queue_depth > 0); 222 223 if (g_verify) { 224 if (!strcmp(g_workload_type, "crc32c")) { 225 /* calculate sw CRC-32C and compare to sw aceel result. */ 226 sw_crc32c = spdk_crc32c_update(task->src, g_xfer_size_bytes, ~g_crc32c_seed); 227 if (*(uint32_t *)task->dst != sw_crc32c) { 228 SPDK_NOTICELOG("CRC-32C miscompare\n"); 229 worker->xfer_failed++; 230 /* TODO: cleanup */ 231 exit(-1); 232 } 233 } else if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 234 SPDK_NOTICELOG("Data miscompare\n"); 235 worker->xfer_failed++; 236 /* TODO: cleanup */ 237 exit(-1); 238 } 239 } 240 worker->xfer_completed++; 241 worker->current_queue_depth--; 242 243 if (!worker->is_draining) { 244 _submit_single(worker, task); 245 } else { 246 spdk_mempool_put(worker->data_pool, task->src); 247 spdk_mempool_put(worker->data_pool, task->dst); 248 spdk_mempool_put(worker->task_pool, task); 249 } 250 } 251 252 static int 253 dump_result(void) 254 { 255 uint64_t total_completed = 0; 256 uint64_t total_failed = 0; 257 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 258 struct worker_thread *worker = g_workers; 259 260 printf("\nCore Transfers Bandwidth Failed\n"); 261 printf("-------------------------------------------------\n"); 262 while (worker != NULL) { 263 264 uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec; 265 uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) / 266 (g_time_in_sec * 1024 * 1024); 267 268 total_completed += worker->xfer_completed; 269 total_failed += worker->xfer_failed; 270 271 if (xfer_per_sec) { 272 printf("%10d%12" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 "\n", 273 worker->core, xfer_per_sec, 274 bw_in_MiBps, worker->xfer_failed); 275 } 276 277 worker = worker->next; 278 } 279 280 total_xfer_per_sec = total_completed / g_time_in_sec; 281 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 282 (g_time_in_sec * 1024 * 1024); 283 284 printf("=================================================\n"); 285 printf("Total:%16" PRIu64 "/s%8" PRIu64 " MiB/s%11" PRIu64 "\n\n", 286 total_xfer_per_sec, total_bw_in_MiBps, total_failed); 287 288 return total_failed ? 1 : 0; 289 } 290 291 static int 292 _check_draining(void *arg) 293 { 294 struct worker_thread *worker = arg; 295 296 assert(worker); 297 298 if (worker->current_queue_depth == 0) { 299 spdk_poller_unregister(&worker->is_draining_poller); 300 unregister_worker(worker); 301 } 302 303 return -1; 304 } 305 306 static int 307 _worker_stop(void *arg) 308 { 309 struct worker_thread *worker = arg; 310 311 assert(worker); 312 313 spdk_poller_unregister(&worker->stop_poller); 314 315 /* now let the worker drain and check it's outstanding IO with a poller */ 316 worker->is_draining = true; 317 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 318 319 return 0; 320 } 321 322 static void 323 _init_thread_done(void *ctx) 324 { 325 } 326 327 static void 328 _init_thread(void *arg1) 329 { 330 struct worker_thread *worker; 331 char buf_pool_name[30], task_pool_name[30]; 332 struct ap_task *task; 333 int i; 334 335 worker = calloc(1, sizeof(*worker)); 336 if (worker == NULL) { 337 fprintf(stderr, "Unable to allocate worker\n"); 338 return; 339 } 340 341 worker->core = spdk_env_get_current_core(); 342 worker->thread = spdk_get_thread(); 343 worker->next = g_workers; 344 worker->ch = spdk_accel_engine_get_io_channel(); 345 snprintf(buf_pool_name, sizeof(buf_pool_name), "buf_pool_%d", g_num_workers); 346 snprintf(task_pool_name, sizeof(task_pool_name), "task_pool_%d", g_num_workers); 347 worker->data_pool = spdk_mempool_create(buf_pool_name, 348 g_queue_depth * 2, /* src + dst */ 349 g_xfer_size_bytes, 350 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 351 SPDK_ENV_SOCKET_ID_ANY); 352 worker->task_pool = spdk_mempool_create(task_pool_name, 353 g_queue_depth, 354 spdk_accel_task_size() + sizeof(struct ap_task), 355 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 356 SPDK_ENV_SOCKET_ID_ANY); 357 if (!worker->data_pool || !worker->task_pool) { 358 fprintf(stderr, "Could not allocate buffer pool.\n"); 359 spdk_mempool_free(worker->data_pool); 360 spdk_mempool_free(worker->task_pool); 361 free(worker); 362 return; 363 } 364 365 /* Register a poller that will stop the worker at time elapsed */ 366 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 367 g_time_in_sec * 1000000ULL); 368 369 g_workers = worker; 370 pthread_mutex_lock(&g_workers_lock); 371 g_num_workers++; 372 pthread_mutex_unlock(&g_workers_lock); 373 374 for (i = 0; i < g_queue_depth; i++) { 375 task = spdk_mempool_get(worker->task_pool); 376 if (!task) { 377 fprintf(stderr, "Unable to get accel_task\n"); 378 return; 379 } 380 task->src = spdk_mempool_get(worker->data_pool); 381 task->dst = spdk_mempool_get(worker->data_pool); 382 _submit_single(worker, task); 383 } 384 } 385 386 static void 387 accel_done(void *ref, int status) 388 { 389 struct ap_task *task = __ap_task_from_accel_task(ref); 390 struct worker_thread *worker = task->worker; 391 392 assert(worker); 393 394 spdk_thread_send_msg(worker->thread, _accel_done, task); 395 } 396 397 static void 398 accel_perf_start(void *arg1) 399 { 400 uint64_t capabilites; 401 struct spdk_io_channel *accel_ch; 402 403 accel_ch = spdk_accel_engine_get_io_channel(); 404 capabilites = spdk_accel_get_capabilities(accel_ch); 405 spdk_put_io_channel(accel_ch); 406 407 if ((capabilites & g_workload_selection) != g_workload_selection) { 408 SPDK_ERRLOG("Selected workload is not supported by the current engine\n"); 409 SPDK_NOTICELOG("Software engine is selected by default, enable a HW engine via RPC\n\n"); 410 spdk_app_stop(-1); 411 return; 412 } 413 414 g_tsc_rate = spdk_get_ticks_hz(); 415 g_tsc_us_rate = g_tsc_rate / (1000 * 1000); 416 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 417 418 printf("Running for %d seconds...\n", g_time_in_sec); 419 fflush(stdout); 420 421 spdk_for_each_thread(_init_thread, NULL, _init_thread_done); 422 } 423 424 int 425 main(int argc, char **argv) 426 { 427 struct spdk_app_opts opts = {}; 428 struct worker_thread *worker, *tmp; 429 int rc = 0; 430 431 pthread_mutex_init(&g_workers_lock, NULL); 432 spdk_app_opts_init(&opts); 433 opts.reactor_mask = "0x1"; 434 if ((rc = spdk_app_parse_args(argc, argv, &opts, "o:q:t:yw:", NULL, parse_args, 435 usage)) != SPDK_APP_PARSE_ARGS_SUCCESS) { 436 rc = -1; 437 goto cleanup; 438 } 439 440 if (g_workload_type == NULL || 441 (strcmp(g_workload_type, "copy") && 442 strcmp(g_workload_type, "fill") && 443 strcmp(g_workload_type, "crc32c"))) { 444 usage(); 445 rc = -1; 446 goto cleanup; 447 } 448 449 dump_user_config(&opts); 450 rc = spdk_app_start(&opts, accel_perf_start, NULL); 451 if (rc) { 452 SPDK_ERRLOG("ERROR starting application\n"); 453 } else { 454 dump_result(); 455 } 456 457 pthread_mutex_destroy(&g_workers_lock); 458 459 worker = g_workers; 460 while (worker) { 461 tmp = worker->next; 462 free(worker); 463 worker = tmp; 464 } 465 cleanup: 466 spdk_app_fini(); 467 return rc; 468 } 469