/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2020 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"
#include "spdk/xor.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000
#define COMP_BUF_PAD_PERCENTAGE 1.1L

static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static uint32_t g_xor_src_count = 2;
static bool g_verify = false;
static const char *g_workload_type = NULL;
/* ACCEL_OPC_LAST means "no workload selected yet", so a missing -w fails validation. */
static enum accel_opcode g_workload_selection = ACCEL_OPC_LAST;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void *uncompressed_data;
	uint32_t uncompressed_len;
	struct iovec *uncompressed_iovs;
	uint32_t uncompressed_iovcnt;

	void *compressed_data;
	uint32_t compressed_len;
	uint32_t compressed_len_padded;
	struct iovec *compressed_iovs;
	uint32_t compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg) link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void *src;
	struct iovec *src_iovs;
	uint32_t src_iovcnt;
	void **sources;
	struct iovec *dst_iovs;
	uint32_t dst_iovcnt;
	void *dst;
	void *dst2;
	uint32_t crc_dst;
	uint32_t compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread *worker;
	int expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task) link;
};

struct worker_thread {
	struct spdk_io_channel *ch;
	uint64_t xfer_completed;
	uint64_t xfer_failed;
	uint64_t injected_miscompares;
	uint64_t current_queue_depth;
	TAILQ_HEAD(, ap_task) tasks_pool;
	struct worker_thread *next;
	unsigned core;
	struct spdk_thread *thread;
	bool is_draining;
	struct spdk_poller *is_draining_poller;
	struct spdk_poller *stop_poller;
	void *task_base;
	struct display_info display;
	enum accel_opcode workload;
};

static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask: %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type: %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed: %u\n", g_crc32c_seed);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern: 0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		printf("Source buffers: %u\n", g_xor_src_count);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size: %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count: %u\n", g_chained_count);
	printf("Module: %s\n", module_name);
	if (g_workload_selection == ACCEL_OPC_COMPRESS || g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		printf("File Name: %s\n", g_cd_file_in_name);
	}
	printf("Queue depth: %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time: %u seconds\n", g_time_in_sec);
	printf("Verify: %s\n\n", g_verify ? "Yes" : "No");
}
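
/*
 * Example invocation (illustrative values): run the crc32c workload for
 * 10 seconds at queue depth 64 with 4 KiB transfers split across two
 * iovecs, verifying every result in software:
 *
 *   ./build/examples/accel_perf -w crc32c -t 10 -q 64 -o 4096 -C 2 -y
 *
 * Framework-level options such as -m (core mask) are handled by
 * spdk_app_parse_args() in addition to the app options listed below.
 */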
static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
	case 'x':
		argval = spdk_strtol(arg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = arg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'x':
		g_xor_src_count = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = arg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = ACCEL_OPC_DECOMPRESS;
		} else if (!strcmp(g_workload_type, "xor")) {
			g_workload_selection = ACCEL_OPC_XOR;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}
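
/*
 * Workers unregister themselves after their queues drain. The last worker to
 * unregister dumps the aggregated results and stops the application.
 */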
static int dump_result(void);
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}
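
/*
 * Split a single buffer into iovcnt iovecs of near-equal size. For example,
 * sz = 4096 and iovcnt = 3 yields element lengths of 1366, 1366 and 1364
 * bytes: each element gets the rounded-up average, clamped to whatever
 * remains for the final element.
 */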
static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}

static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS ||
	    g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);

		if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			dst_buff_len = task->cur_seg->compressed_len_padded;
		}

		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (!task->dst_iovs) {
			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
			return -ENOMEM;
		}
		task->dst_iovcnt = g_chained_count;
		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);

		return 0;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C ||
	    g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		assert(g_xor_src_count > 1);
		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
		if (!task->sources) {
			return -ENOMEM;
		}

		for (i = 0; i < g_xor_src_count; i++) {
			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (!task->sources[i]) {
				return -ENOMEM;
			}
			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so it can be
		 * used for verification.
		 */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* For dualcast, 2 destination buffers are needed for the operation. */
	if (g_workload_selection == ACCEL_OPC_DUALCAST ||
	    (g_workload_selection == ACCEL_OPC_XOR && g_verify)) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}
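
/*
 * Tasks are allocated up front (g_allocate_depth per worker) and recycled
 * through the worker's tasks_pool for the whole run, so no allocations
 * happen on the I/O path.
 */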
inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill use the first byte of the task->src buffer */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs, task->src_iovcnt, &task->compressed_sz,
						flags, accel_done, task);
		break;
	case ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS || g_workload_selection == ACCEL_OPC_COMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == ACCEL_OPC_CRC32C ||
		   g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else if (g_workload_selection == ACCEL_OPC_XOR) {
		if (task->sources) {
			for (i = 0; i < g_xor_src_count; i++) {
				spdk_dma_free(task->sources[i]);
			}
			free(task->sources);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_XOR) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);
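
/*
 * Completion callback for every submitted operation: optionally verify the
 * result in software, account for intentionally injected miscompares, then
 * recycle the task and submit a new operation unless the worker is draining.
 */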
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			break;
		case ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_XOR:
			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
					 g_xfer_size_bytes) != 0) {
				SPDK_ERRLOG("Failed to generate xor for verification\n");
			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == ACCEL_OPC_COMPRESS || worker->workload == ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}
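
/*
 * Aggregate the per-worker statistics. Bandwidth is derived as
 * completed_ops * g_xfer_size_bytes / g_time_in_sec, reported in MiB/s.
 */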
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}
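
/*
 * Shutdown path: stop_poller fires once g_time_in_sec has elapsed and marks
 * the worker as draining; _check_draining then polls until every outstanding
 * operation has completed before freeing task buffers and unregistering.
 */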
static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}
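
/*
 * Spawn g_threads_per_core SPDK threads on every core in the reactor mask,
 * each pinned to its core with a single-CPU cpuset. Every thread becomes an
 * independent worker with its own accel channel and task pool.
 */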
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

struct accel_perf_prep_ctx {
	FILE *file;
	long remaining;
	struct spdk_io_channel *ch;
	struct ap_compress_seg *cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}
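
/*
 * Read the next segment of the input file, compress it through the accel
 * framework and loop via the completion callback above until the whole file
 * has been segmented; only then does the benchmark itself start.
 */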
static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read, sz_padded;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
	 * would likely either deal with a too-small destination buffer by submitting another
	 * operation with a larger one, or, like the vbdev module does, accept the error and
	 * use the data uncompressed, marking it as such in its own metadata so that it never
	 * tries to decompress uncompressed data later.
	 */
	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		/* fread() does not reliably set errno, so report -EIO rather than -errno. */
		rc = -EIO;
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;
	seg->compressed_len_padded = sz_padded;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance
	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
	 * to hold the compressed data. This example app simply adds 10% buffer for compressed data
	 * but real applications may want to consider a more sophisticated method.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}
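
/*
 * First callback after spdk_app_start(): compress/decompress workloads
 * segment (and pre-compress) the input file before the benchmark begins;
 * all other workloads start immediately.
 */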
static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != ACCEL_OPC_COMPRESS &&
	    g_workload_selection != ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* A size of 0 means "the whole file at a time". */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:x:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_OPC_COPY) &&
	    (g_workload_selection != ACCEL_OPC_FILL) &&
	    (g_workload_selection != ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != ACCEL_OPC_DUALCAST) &&
	    (g_workload_selection != ACCEL_OPC_XOR)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_selection == ACCEL_OPC_XOR && g_xor_src_count < 2) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}