/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2020 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void *uncompressed_data;
	uint32_t uncompressed_len;
	struct iovec *uncompressed_iovs;
	uint32_t uncompressed_iovcnt;

	void *compressed_data;
	uint32_t compressed_len;
	struct iovec *compressed_iovs;
	uint32_t compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg) link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void *src;
	struct iovec *src_iovs;
	uint32_t src_iovcnt;
	struct iovec *dst_iovs;
	uint32_t dst_iovcnt;
	void *dst;
	void *dst2;
	uint32_t crc_dst;
	uint32_t compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread *worker;
	int expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task) link;
};

struct worker_thread {
	struct spdk_io_channel *ch;
	uint64_t xfer_completed;
	uint64_t xfer_failed;
	uint64_t injected_miscompares;
	uint64_t current_queue_depth;
	TAILQ_HEAD(, ap_task) tasks_pool;
	struct worker_thread *next;
	unsigned core;
	struct spdk_thread *thread;
	bool is_draining;
	struct spdk_poller *is_draining_poller;
	struct spdk_poller *stop_poller;
	void *task_base;
	struct display_info display;
	enum accel_opcode workload;
};
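
/*
 * Each worker owns a TAILQ-based pool of tasks that are allocated once, up front.
 * A minimal sketch of the recycle pattern used throughout this file (hypothetical
 * pool `p` and task `t`):
 *
 *	t = TAILQ_FIRST(&p);			// borrow a task from the pool
 *	TAILQ_REMOVE(&p, t, link);
 *	...					// submit; wait for the completion callback
 *	TAILQ_INSERT_TAIL(&p, t, link);		// return the task on completion
 *
 * Pre-allocating -a tasks keeps the hot path free of malloc/free and lets the tool
 * spread operations across a wider range of memory than -q alone would touch.
 */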

static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask: %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type: %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed: %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern: 0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size: %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count: %u\n", g_chained_count);
	printf("Module: %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name: %s\n", g_cd_file_in_name);
	}
	printf("Queue depth: %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time: %u seconds\n", g_time_in_sec);
	printf("Verify: %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}
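
/*
 * parse_args() below is called back by spdk_app_parse_args() once per
 * application-specific option; SPDK's common options (core mask, config file, etc.)
 * are consumed by the framework first. A minimal sketch of the contract, with a
 * hypothetical app that takes only -x <int> (names `my_parse` and `g_x` are
 * illustrative, not part of this file):
 *
 *	static int
 *	my_parse(int ch, char *arg)	// ch = option letter, arg = its optarg
 *	{
 *		if (ch == 'x') {
 *			g_x = spdk_strtol(arg, 10);
 *			return g_x < 0 ? 1 : 0;	// non-zero should make parsing fail
 *		}
 *		return 1;
 *	}
 */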

static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
		argval = spdk_strtol(arg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = arg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = arg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);

static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}
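
/*
 * Worked example for accel_perf_construct_iovs(): with sz = 10 and iovcnt = 3,
 * spdk_divide_round_up(10, 3) gives an element size of 4, so the iovecs cover
 * 4 + 4 + 2 bytes. The last element is clamped by spdk_min(ele_size, sz) and the
 * trailing assert verifies the whole buffer was consumed:
 *
 *	struct iovec iovs[3];
 *	accel_perf_construct_iovs(buf, 10, iovs, 3);
 *	// iovs[0].iov_len == 4, iovs[1].iov_len == 4, iovs[2].iov_len == 2
 */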

static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so that
		 * the result can be checked against it when verify is enabled.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}

		if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
			task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
			if (!task->dst_iovs) {
				fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
				return -ENOMEM;
			}
			task->dst_iovcnt = g_chained_count;
			accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
		}
	}

	/* For dualcast 2 buffers are needed for the operation. */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst2 buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

static inline struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}
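
/*
 * Every submission below follows the same asynchronous pattern: queue the operation
 * on the worker's io_channel, count it against current_queue_depth, and let
 * accel_done() run later from poller context. A sketch of one vectored submission
 * (argument names abbreviated from the code below):
 *
 *	rc = spdk_accel_submit_crc32cv(ch, &crc_dst, iovs, iovcnt, seed, accel_done, task);
 *	if (rc) {
 *		accel_done(task, rc);	// a failed submit is handled like a failed completion
 *	}
 */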

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, use the first byte of the src buffer as the fill value. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, g_xfer_size_bytes, task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);
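
/*
 * Verification detail (an inference about the helper's convention, not stated in the
 * original): spdk_crc32c_iov_update() takes the running CRC in its inverted form, so
 * the software check below passes ~g_crc32c_seed. With the default seed of 0 that is:
 *
 *	sw = spdk_crc32c_iov_update(iovs, iovcnt, ~0U);	// 0xFFFFFFFF, the standard CRC-32C init
 *
 * A mismatch against task->crc_dst is counted in xfer_failed rather than aborting
 * the run.
 */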

static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || worker->workload == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment. */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}
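
/*
 * dump_result() derives its rates with plain integer math. For example, at the
 * defaults (g_xfer_size_bytes = 4096, g_time_in_sec = 5), a worker that completed
 * 1310720 transfers reports:
 *
 *	xfer_per_sec = 1310720 / 5                          = 262144 ops/s
 *	bw_in_MiBps  = (1310720 * 4096) / (5 * 1024 * 1024) = 1024 MiB/s
 *
 * Note that failed transfers still count toward xfer_completed, so the bandwidth
 * column reflects completions, not verified data.
 */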

static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}
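
/*
 * Shutdown sequence: each worker arms stop_poller with a period of
 * g_time_in_sec * 1000000ULL microseconds, so it fires once and is immediately
 * unregistered in _worker_stop(). From there the teardown chains as:
 *
 *	_worker_stop()      -> sets is_draining, arms is_draining_poller
 *	_check_draining()   -> waits for current_queue_depth to reach 0
 *	unregister_worker() -> frees tasks, releases the channel, exits the thread
 *
 * The last worker to unregister prints the results and calls spdk_app_stop().
 */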

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}
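
/*
 * Thread placement: accel_perf_start() creates g_threads_per_core SPDK threads on
 * every core in the reactor mask, each pinned with a single-CPU cpuset. The core of
 * the pattern, for a core index `i` from SPDK_ENV_FOREACH_CORE:
 *
 *	spdk_cpuset_zero(&tmp_cpumask);
 *	spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
 *	thread = spdk_thread_create(thread_name, &tmp_cpumask);
 *	spdk_thread_send_msg(thread, _init_thread, display);
 *
 * _init_thread() then runs on the new thread, so per-worker state needs no locking
 * beyond the shared g_workers list.
 */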

static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

struct accel_perf_prep_ctx {
	FILE *file;
	long remaining;
	struct spdk_io_channel *ch;
	struct ap_compress_seg *cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}
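
/*
 * Input preparation is itself asynchronous: accel_perf_prep_process_seg() reads one
 * g_xfer_size_bytes-sized chunk of the input file and submits a compress of it; the
 * completion callback above queues the segment and recurses into the next chunk:
 *
 *	prep_process_seg -> spdk_accel_submit_compress -> prep_process_seg_cpl
 *	       ^                                                  |
 *	       +------------------- next chunk -------------------+
 *
 * Only when ctx->remaining reaches zero does the benchmark proper start via
 * accel_perf_start().
 */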

static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		/* fread() does not reliably set errno on a short read, so report -EIO. */
		rc = -EIO;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that any time a call is made to spdk_accel_submit_compress() there is a chance
	 * it will fail with -ENOMEM if the destination buffer is not large enough to hold
	 * the compressed data. This example app simply uses the same size as the input
	 * buffer, which works for example purposes. When using the API in your own
	 * application, be sure to allocate enough room in the destination buffer: when the
	 * data is not compressible, the added header information will make the output larger
	 * than the original input.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}

static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* A size of 0 means "a whole file at a time". */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}
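
/*
 * Application lifecycle: main() parses options, then spdk_app_start() brings up the
 * reactors and invokes accel_perf_prep() on the first one; spdk_app_stop() (called
 * from unregister_worker() or any error path) makes spdk_app_start() return. A
 * hedged sketch of the skeleton this file follows:
 *
 *	spdk_app_opts_init(&opts, sizeof(opts));
 *	opts.name = "accel_perf";
 *	rc = spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:s:t:yw:P:f:T:l:", NULL,
 *				 parse_args, usage);
 *	rc = spdk_app_start(&opts, accel_perf_prep, NULL);	// blocks until app_stop
 *	spdk_app_fini();
 */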

int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:s:t:yw:P:f:T:l:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}