/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2020 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void		*uncompressed_data;
	uint32_t	uncompressed_len;
	struct iovec	*uncompressed_iovs;
	uint32_t	uncompressed_iovcnt;

	void		*compressed_data;
	uint32_t	compressed_len;
	struct iovec	*compressed_iovs;
	uint32_t	compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg)	link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void			*src;
	struct iovec		*src_iovs;
	uint32_t		src_iovcnt;
	struct iovec		*dst_iovs;
	uint32_t		dst_iovcnt;
	void			*dst;
	void			*dst2;
	uint32_t		crc_dst;
	uint32_t		compressed_sz;
	struct ap_compress_seg	*cur_seg;
	struct worker_thread	*worker;
	int			expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task)	link;
};

struct worker_thread {
	struct spdk_io_channel	*ch;
	uint64_t		xfer_completed;
	uint64_t		xfer_failed;
	uint64_t		injected_miscompares;
	uint64_t		current_queue_depth;
	TAILQ_HEAD(, ap_task)	tasks_pool;
	struct worker_thread	*next;
	unsigned		core;
	struct spdk_thread	*thread;
	bool			is_draining;
	struct spdk_poller	*is_draining_poller;
	struct spdk_poller	*stop_poller;
	void			*task_base;
	struct display_info	display;
	enum accel_opcode	workload;
};

static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask: %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type: %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed: %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern: 0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size: %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count: %u\n", g_chained_count);
	printf("Module: %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name: %s\n", g_cd_file_in_name);
	}
	printf("Queue depth: %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time: %u seconds\n", g_time_in_sec);
	printf("Verify: %s\n\n", g_verify ? "Yes" : "No");
}
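
/*
 * Command-line handling: spdk_app_parse_args() (called from main() below) consumes
 * the common SPDK options and invokes parse_args() once per application-specific
 * option, passing the option character and its argument. An illustrative
 * invocation (flag values are examples only):
 *
 *   ./accel_perf -w crc32c -C 4 -o 4096 -q 64 -t 10 -y
 *
 * runs a chained (4 iovec) CRC-32C workload at queue depth 64 for 10 seconds
 * with software verification of each result.
 */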

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
		argval = spdk_strtol(arg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = arg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = arg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);

static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}
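
/*
 * Worked example of the iovec construction above: for sz = 10 and iovcnt = 4,
 * spdk_divide_round_up() yields an element size of 3, so the buffer is split
 * into iovecs of 3, 3, 3 and 1 bytes; the final element is clamped by
 * spdk_min(ele_size, sz) and the trailing assert confirms the whole buffer
 * was consumed.
 */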

static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so it
		 * can be compared against dst when verify is enabled.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}

		if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
			task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
			if (!task->dst_iovs) {
				fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
				return -ENOMEM;
			}
			task->dst_iovcnt = g_chained_count;
			accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);
		}
	}

	/* For dualcast, 2 destination buffers are needed for the operation. */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst2 buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill use the first byte of the task->src buffer */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, g_xfer_size_bytes, task->src_iovs,
						task->src_iovcnt, &task->compressed_sz, flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}
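
/*
 * Completion model: every submission above registers accel_done() as its callback,
 * so each worker runs a closed loop of g_queue_depth operations that resubmit
 * from the callback until draining starts. Note that when a submission fails
 * immediately (rc != 0), accel_done() is invoked inline with the error status so
 * the queue-depth accounting stays balanced.
 */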

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || worker->workload == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}
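
/*
 * Shutdown sequence: _worker_stop() fires once per worker after g_time_in_sec
 * (registered in _init_thread() below) and flips is_draining, so completed tasks
 * are parked in tasks_pool instead of being resubmitted. _check_draining() then
 * polls until current_queue_depth reaches zero before freeing buffers and
 * unregistering the worker; the last worker to exit prints the results above.
 */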

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check its outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}
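
/*
 * Thread fan-out: one SPDK thread is created per (core, -T index) pair, pinned to
 * its core via the cpuset, and initialized with an _init_thread() message. With the
 * default reactor mask of 0x1 and -T 1 this is a single worker on core 0.
 */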

static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

struct accel_perf_prep_ctx {
	FILE			*file;
	long			remaining;
	struct spdk_io_channel	*ch;
	struct ap_compress_seg	*cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}
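
/*
 * Input preparation for compress/decompress: the input file is consumed in
 * g_xfer_size_bytes chunks, and each chunk is compressed once up front through
 * the accel framework. The completion callback above chains back into this
 * function to process the next chunk; once ctx->remaining hits zero the measured
 * run begins via accel_perf_start(). Workers then cycle through g_compress_segs
 * as operations complete.
 */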

static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -errno;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that any time a call is made to spdk_accel_submit_compress() there's a chance
	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
	 * to hold the compressed data. This example app simply uses the same size as the input
	 * buffer, which works for example purposes, but when using the API in your application
	 * be sure to allocate enough room in the destination buffer for cases where the data is
	 * not compressible; the addition of header information can make the output larger than
	 * the original input.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}
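
/*
 * Example of preparing a decompress run (the file name is illustrative only):
 *
 *   ./accel_perf -w decompress -l ./input-file.bin -o 0 -t 5
 *
 * With -o 0 the whole file is treated as a single segment (see the
 * g_xfer_size_bytes adjustment below); otherwise it is split into fixed-size
 * segments.
 */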

static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* size of 0 means "file at a time" */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:s:t:yw:P:f:T:l:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}