/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2020 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"
#include "spdk/xor.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000
#define COMP_BUF_PAD_PERCENTAGE 1.1L

static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static uint32_t g_xor_src_count = 2;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void *uncompressed_data;
	uint32_t uncompressed_len;
	struct iovec *uncompressed_iovs;
	uint32_t uncompressed_iovcnt;

	void *compressed_data;
	uint32_t compressed_len;
	uint32_t compressed_len_padded;
	struct iovec *compressed_iovs;
	uint32_t compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg) link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void *src;
	struct iovec *src_iovs;
	uint32_t src_iovcnt;
	void **sources;
	struct iovec *dst_iovs;
	uint32_t dst_iovcnt;
	void *dst;
	void *dst2;
	uint32_t crc_dst;
	uint32_t compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread *worker;
	int expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task) link;
};

struct worker_thread {
	struct spdk_io_channel *ch;
	struct spdk_accel_opcode_stats stats;
	uint64_t xfer_failed;
	uint64_t injected_miscompares;
	uint64_t current_queue_depth;
	TAILQ_HEAD(, ap_task) tasks_pool;
	struct worker_thread *next;
	unsigned core;
	struct spdk_thread *thread;
	bool is_draining;
	struct spdk_poller *is_draining_poller;
	struct spdk_poller *stop_poller;
	void *task_base;
	struct display_info display;
	enum accel_opcode workload;
};
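
/*
 * Task lifecycle overview (descriptive summary of the code below): each worker
 * allocates g_allocate_depth ap_tasks up front (at least as many as
 * g_queue_depth) and parks them in tasks_pool. The worker keeps g_queue_depth
 * operations in flight; every completion recycles its task and immediately
 * submits another, until the stop poller fires and the worker drains.
 */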
static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask: %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type: %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed: %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern: 0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		printf("Source buffers: %u\n", g_xor_src_count);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size: %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count: %u\n", g_chained_count);
	printf("Module: %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name: %s\n", g_cd_file_in_name);
	}
	printf("Queue depth: %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time: %u seconds\n", g_time_in_sec);
	printf("Verify: %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}
static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	/* First pass: parse and validate the numeric options. */
	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
	case 'x':
		argval = spdk_strtol(arg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	/* Second pass: store the values. */
	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = arg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'x':
		g_xor_src_count = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = arg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else if (!strcmp(g_workload_type, "xor")) {
			g_workload_selection = ACCEL_OPC_XOR;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}
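
/*
 * Example invocation (illustrative; the binary path depends on the build
 * setup, typically build/examples/accel_perf in an SPDK tree):
 *
 *   ./build/examples/accel_perf -w crc32c -q 64 -o 4096 -C 4 -t 10 -y
 *
 * This runs a chained crc32c workload (4 iovecs per operation) at queue
 * depth 64 with 4 KiB source buffers for 10 seconds, verifying every result
 * against a software CRC.
 */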
static int dump_result(void);

static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	spdk_accel_get_opcode_stats(worker->ch, worker->workload,
				    &worker->stats, sizeof(worker->stats));
	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

/* Split a buffer of sz bytes into iovcnt roughly equal iovec elements. */
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}
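
/*
 * Worked example for accel_perf_construct_iovs(): sz = 10 and iovcnt = 3
 * gives ele_size = spdk_divide_round_up(10, 3) = 4, so the iovecs come out
 * as lengths 4, 4 and 2; the last element is clamped to the bytes remaining.
 */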
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses, but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);

		if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			dst_buff_len = task->cur_seg->compressed_len_padded;
		}

		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (!task->dst_iovs) {
			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
			return -ENOMEM;
		}
		task->dst_iovcnt = g_chained_count;
		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);

		return 0;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C ||
	    g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		assert(g_xor_src_count > 1);
		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
		if (!task->sources) {
			return -ENOMEM;
		}

		for (i = 0; i < g_xor_src_count; i++) {
			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (!task->sources[i]) {
				return -ENOMEM;
			}
			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so the
		 * destination can be verified against it.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* Dualcast needs a second destination buffer; for XOR one is only needed
	 * to hold the software-computed result when verifying.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST ||
	    (g_workload_selection == ACCEL_OPC_XOR && g_verify)) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

static inline struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill, use the first byte of the task->src buffer as the fill value. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs, task->src_iovcnt, &task->compressed_sz,
						flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}
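
/*
 * Note that _submit_single() increments current_queue_depth before checking
 * the submission return code: on failure the task is completed inline via
 * accel_done(task, rc), which decrements the counter again and recycles the
 * task, so the depth accounting stays balanced on both paths.
 */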
static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS || g_workload_selection == ACCEL_OPC_COMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		if (task->sources) {
			for (i = 0; i < g_xor_src_count; i++) {
				spdk_dma_free(task->sources[i]);
			}
			free(task->sources);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_XOR) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_XOR:
			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
					 g_xfer_size_bytes) != 0) {
				SPDK_ERRLOG("Failed to generate xor for verification\n");
			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || worker->workload == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment. */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread      Transfers      Bandwidth      Failed    Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
		uint64_t bw_in_MiBps = worker->stats.num_bytes /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->stats.executed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}
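
/*
 * Example of the math above: 1,000,000 completed 4 KiB transfers in a
 * 5 second run are reported as 1000000 / 5 = 200000/s and
 * (1000000 * 4096) / (5 * 1024 * 1024) = 781 MiB/s (integer division,
 * fractions are dropped).
 */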
static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}
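
/*
 * Shutdown flow: the one-shot stop_poller fires after g_time_in_sec (or
 * shutdown_cb invokes _worker_stop early on app shutdown), sets is_draining
 * and hands off to is_draining_poller, which waits for current_queue_depth
 * to reach zero before freeing the task buffers and unregistering the worker.
 */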
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker when the run time elapses. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}

static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			if (thread == NULL) {
				fprintf(stderr, "Unable to create thread\n");
				spdk_app_stop(-1);
				return;
			}
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

struct accel_perf_prep_ctx {
	FILE *file;
	long remaining;
	struct spdk_io_channel *ch;
	struct ap_compress_seg *cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}

static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read, sz_padded;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
	/* Add a 10% pad to the compress buffer for incompressible data. Note that a
	 * real app would likely either handle a too-small destination buffer by
	 * resubmitting the operation with a larger one, or, like the vbdev module
	 * does, accept the error and store the data uncompressed, marking it as such
	 * in its own metadata so that it doesn't later try to decompress
	 * uncompressed data.
	 */
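	/* With the default 4 KiB transfer size: sz_padded = 4096 * 1.1 = 4505
	 * (truncated to int), i.e. roughly 10% of headroom for incompressible
	 * input.
	 */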
	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -errno;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;
	seg->compressed_len_padded = sz_padded;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that any time a call is made to spdk_accel_submit_compress() there is
	 * a chance it will fail with -ENOMEM in the event that the destination buffer
	 * is not large enough to hold the compressed data. This example app simply
	 * adds a 10% pad for compressed data, but real applications may want to
	 * consider a more sophisticated approach.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}

static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* A size of 0 means "process the whole file at a time". */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

static void
worker_shutdown(void *ctx)
{
	_worker_stop(ctx);
}

static void
shutdown_cb(void)
{
	struct worker_thread *worker;

	pthread_mutex_lock(&g_workers_lock);
	worker = g_workers;
	while (worker) {
		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
		worker = worker->next;
	}
	pthread_mutex_unlock(&g_workers_lock);
}

int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	g_opts.shutdown_cb = shutdown_cb;
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:x:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	/* -w is required; without this check a missing workload type would silently
	 * default to copy (opcode 0) and dump_user_config() would print a NULL string.
	 */
	if (g_workload_type == NULL) {
		fprintf(stderr, "-w workload type must be specified\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST) &&
	    (g_workload_selection != ACCEL_OPC_XOR)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_selection == ACCEL_OPC_XOR && g_xor_src_count < 2) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}