1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2020 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/thread.h" 8 #include "spdk/env.h" 9 #include "spdk/event.h" 10 #include "spdk/log.h" 11 #include "spdk/string.h" 12 #include "spdk/accel.h" 13 #include "spdk/crc32.h" 14 #include "spdk/util.h" 15 #include "spdk/xor.h" 16 17 #define DATA_PATTERN 0x5a 18 #define ALIGN_4K 0x1000 19 #define COMP_BUF_PAD_PERCENTAGE 1.1L 20 21 static uint64_t g_tsc_rate; 22 static uint64_t g_tsc_end; 23 static int g_rc; 24 static int g_xfer_size_bytes = 4096; 25 static int g_queue_depth = 32; 26 /* g_allocate_depth indicates how many tasks we allocate per worker. It will 27 * be at least as much as the queue depth. 28 */ 29 static int g_allocate_depth = 0; 30 static int g_threads_per_core = 1; 31 static int g_time_in_sec = 5; 32 static uint32_t g_crc32c_seed = 0; 33 static uint32_t g_chained_count = 1; 34 static int g_fail_percent_goal = 0; 35 static uint8_t g_fill_pattern = 255; 36 static uint32_t g_xor_src_count = 2; 37 static bool g_verify = false; 38 static const char *g_workload_type = NULL; 39 static enum spdk_accel_opcode g_workload_selection = SPDK_ACCEL_OPC_LAST; 40 static const char *g_module_name = NULL; 41 static struct worker_thread *g_workers = NULL; 42 static int g_num_workers = 0; 43 static char *g_cd_file_in_name = NULL; 44 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 45 static struct spdk_app_opts g_opts = {}; 46 47 struct ap_compress_seg { 48 void *uncompressed_data; 49 uint32_t uncompressed_len; 50 struct iovec *uncompressed_iovs; 51 uint32_t uncompressed_iovcnt; 52 53 void *compressed_data; 54 uint32_t compressed_len; 55 uint32_t compressed_len_padded; 56 struct iovec *compressed_iovs; 57 uint32_t compressed_iovcnt; 58 59 STAILQ_ENTRY(ap_compress_seg) link; 60 }; 61 62 static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs); 63 64 struct worker_thread; 65 static void accel_done(void *ref, int status); 66 67 struct display_info { 68 int core; 69 int thread; 70 }; 71 72 struct ap_task { 73 void *src; 74 struct iovec *src_iovs; 75 uint32_t src_iovcnt; 76 void **sources; 77 struct iovec *dst_iovs; 78 uint32_t dst_iovcnt; 79 void *dst; 80 void *dst2; 81 uint32_t crc_dst; 82 uint32_t compressed_sz; 83 struct ap_compress_seg *cur_seg; 84 struct worker_thread *worker; 85 int expected_status; /* used for the compare operation */ 86 TAILQ_ENTRY(ap_task) link; 87 }; 88 89 struct worker_thread { 90 struct spdk_io_channel *ch; 91 struct spdk_accel_opcode_stats stats; 92 uint64_t xfer_failed; 93 uint64_t injected_miscompares; 94 uint64_t current_queue_depth; 95 TAILQ_HEAD(, ap_task) tasks_pool; 96 struct worker_thread *next; 97 unsigned core; 98 struct spdk_thread *thread; 99 bool is_draining; 100 struct spdk_poller *is_draining_poller; 101 struct spdk_poller *stop_poller; 102 void *task_base; 103 struct display_info display; 104 enum spdk_accel_opcode workload; 105 }; 106 107 static void 108 dump_user_config(void) 109 { 110 const char *module_name = NULL; 111 int rc; 112 113 rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name); 114 if (rc) { 115 printf("error getting module name (%d)\n", rc); 116 } 117 118 printf("\nSPDK Configuration:\n"); 119 printf("Core mask: %s\n\n", g_opts.reactor_mask); 120 printf("Accel Perf Configuration:\n"); 121 printf("Workload Type: %s\n", g_workload_type); 122 if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 123 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 124 printf("CRC-32C seed: %u\n", g_crc32c_seed); 125 } else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) { 126 printf("Fill pattern: 0x%x\n", g_fill_pattern); 127 } else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) { 128 printf("Failure inject: %u percent\n", g_fail_percent_goal); 129 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 130 printf("Source buffers: %u\n", g_xor_src_count); 131 } 132 if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 133 printf("Vector size: %u bytes\n", g_xfer_size_bytes); 134 printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count); 135 } else { 136 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 137 } 138 printf("vector count %u\n", g_chained_count); 139 printf("Module: %s\n", module_name); 140 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS || 141 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 142 printf("File Name: %s\n", g_cd_file_in_name); 143 } 144 printf("Queue depth: %u\n", g_queue_depth); 145 printf("Allocate depth: %u\n", g_allocate_depth); 146 printf("# threads/core: %u\n", g_threads_per_core); 147 printf("Run time: %u seconds\n", g_time_in_sec); 148 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 149 } 150 151 static void 152 usage(void) 153 { 154 printf("accel_perf options:\n"); 155 printf("\t[-h help message]\n"); 156 printf("\t[-q queue depth per core]\n"); 157 printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)\n"); 158 printf("\t[-T number of threads per core\n"); 159 printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n"); 160 printf("\t[-t time in seconds]\n"); 161 printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor\n"); 162 printf("\t[-M assign module to the operation, not compatible with accel_assign_opc RPC\n"); 163 printf("\t[-l for compress/decompress workloads, name of uncompressed input file\n"); 164 printf("\t[-S for crc32c workload, use this seed value (default 0)\n"); 165 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 166 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 167 printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n"); 168 printf("\t[-y verify result if this switch is on]\n"); 169 printf("\t[-a tasks to allocate per core (default: same value as -q)]\n"); 170 printf("\t\tCan be used to spread operations across a wider range of memory.\n"); 171 } 172 173 static int 174 parse_args(int ch, char *arg) 175 { 176 int argval = 0; 177 178 switch (ch) { 179 case 'a': 180 case 'C': 181 case 'f': 182 case 'T': 183 case 'o': 184 case 'P': 185 case 'q': 186 case 'S': 187 case 't': 188 case 'x': 189 argval = spdk_strtol(optarg, 10); 190 if (argval < 0) { 191 fprintf(stderr, "-%c option must be non-negative.\n", ch); 192 usage(); 193 return 1; 194 } 195 break; 196 default: 197 break; 198 }; 199 200 switch (ch) { 201 case 'a': 202 g_allocate_depth = argval; 203 break; 204 case 'C': 205 g_chained_count = argval; 206 break; 207 case 'l': 208 g_cd_file_in_name = optarg; 209 break; 210 case 'f': 211 g_fill_pattern = (uint8_t)argval; 212 break; 213 case 'T': 214 g_threads_per_core = argval; 215 break; 216 case 'o': 217 g_xfer_size_bytes = argval; 218 break; 219 case 'P': 220 g_fail_percent_goal = argval; 221 break; 222 case 'q': 223 g_queue_depth = argval; 224 break; 225 case 'S': 226 g_crc32c_seed = argval; 227 break; 228 case 't': 229 g_time_in_sec = argval; 230 break; 231 case 'x': 232 g_xor_src_count = argval; 233 break; 234 case 'y': 235 g_verify = true; 236 break; 237 case 'w': 238 g_workload_type = optarg; 239 if (!strcmp(g_workload_type, "copy")) { 240 g_workload_selection = SPDK_ACCEL_OPC_COPY; 241 } else if (!strcmp(g_workload_type, "fill")) { 242 g_workload_selection = SPDK_ACCEL_OPC_FILL; 243 } else if (!strcmp(g_workload_type, "crc32c")) { 244 g_workload_selection = SPDK_ACCEL_OPC_CRC32C; 245 } else if (!strcmp(g_workload_type, "copy_crc32c")) { 246 g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C; 247 } else if (!strcmp(g_workload_type, "compare")) { 248 g_workload_selection = SPDK_ACCEL_OPC_COMPARE; 249 } else if (!strcmp(g_workload_type, "dualcast")) { 250 g_workload_selection = SPDK_ACCEL_OPC_DUALCAST; 251 } else if (!strcmp(g_workload_type, "compress")) { 252 g_workload_selection = SPDK_ACCEL_OPC_COMPRESS; 253 } else if (!strcmp(g_workload_type, "decompress")) { 254 g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS; 255 } else if (!strcmp(g_workload_type, "xor")) { 256 g_workload_selection = SPDK_ACCEL_OPC_XOR; 257 } else { 258 fprintf(stderr, "Unsupported workload type: %s\n", optarg); 259 usage(); 260 return 1; 261 } 262 break; 263 case 'M': 264 g_module_name = optarg; 265 break; 266 267 default: 268 usage(); 269 return 1; 270 } 271 272 return 0; 273 } 274 275 static int dump_result(void); 276 static void 277 unregister_worker(void *arg1) 278 { 279 struct worker_thread *worker = arg1; 280 281 if (worker->ch) { 282 spdk_accel_get_opcode_stats(worker->ch, worker->workload, 283 &worker->stats, sizeof(worker->stats)); 284 spdk_put_io_channel(worker->ch); 285 worker->ch = NULL; 286 } 287 free(worker->task_base); 288 spdk_thread_exit(spdk_get_thread()); 289 pthread_mutex_lock(&g_workers_lock); 290 assert(g_num_workers >= 1); 291 if (--g_num_workers == 0) { 292 pthread_mutex_unlock(&g_workers_lock); 293 /* Only dump results on successful runs */ 294 if (g_rc == 0) { 295 g_rc = dump_result(); 296 } 297 spdk_app_stop(g_rc); 298 } else { 299 pthread_mutex_unlock(&g_workers_lock); 300 } 301 } 302 303 static void 304 accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt) 305 { 306 uint64_t ele_size; 307 uint8_t *data; 308 uint32_t i; 309 310 ele_size = spdk_divide_round_up(sz, iovcnt); 311 312 data = buf; 313 for (i = 0; i < iovcnt; i++) { 314 ele_size = spdk_min(ele_size, sz); 315 assert(ele_size > 0); 316 317 iovs[i].iov_base = data; 318 iovs[i].iov_len = ele_size; 319 320 data += ele_size; 321 sz -= ele_size; 322 } 323 assert(sz == 0); 324 } 325 326 static int 327 _get_task_data_bufs(struct ap_task *task) 328 { 329 uint32_t align = 0; 330 uint32_t i = 0; 331 int dst_buff_len = g_xfer_size_bytes; 332 333 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 334 * we do this for all modules to keep it simple. 335 */ 336 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) { 337 align = ALIGN_4K; 338 } 339 340 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS || 341 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 342 task->cur_seg = STAILQ_FIRST(&g_compress_segs); 343 344 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 345 dst_buff_len = task->cur_seg->compressed_len_padded; 346 } 347 348 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 349 if (task->dst == NULL) { 350 fprintf(stderr, "Unable to alloc dst buffer\n"); 351 return -ENOMEM; 352 } 353 354 task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec)); 355 if (!task->dst_iovs) { 356 fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task); 357 return -ENOMEM; 358 } 359 task->dst_iovcnt = g_chained_count; 360 accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt); 361 362 return 0; 363 } 364 365 if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 366 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 367 assert(g_chained_count > 0); 368 task->src_iovcnt = g_chained_count; 369 task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec)); 370 if (!task->src_iovs) { 371 fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task); 372 return -ENOMEM; 373 } 374 375 if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 376 dst_buff_len = g_xfer_size_bytes * g_chained_count; 377 } 378 379 for (i = 0; i < task->src_iovcnt; i++) { 380 task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 381 if (task->src_iovs[i].iov_base == NULL) { 382 return -ENOMEM; 383 } 384 memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes); 385 task->src_iovs[i].iov_len = g_xfer_size_bytes; 386 } 387 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 388 assert(g_xor_src_count > 1); 389 task->sources = calloc(g_xor_src_count, sizeof(*task->sources)); 390 if (!task->sources) { 391 return -ENOMEM; 392 } 393 394 for (i = 0; i < g_xor_src_count; i++) { 395 task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 396 if (!task->sources[i]) { 397 return -ENOMEM; 398 } 399 memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes); 400 } 401 } else { 402 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 403 if (task->src == NULL) { 404 fprintf(stderr, "Unable to alloc src buffer\n"); 405 return -ENOMEM; 406 } 407 408 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 409 if (g_workload_selection == SPDK_ACCEL_OPC_FILL) { 410 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 411 } else { 412 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 413 } 414 } 415 416 if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) { 417 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 418 if (task->dst == NULL) { 419 fprintf(stderr, "Unable to alloc dst buffer\n"); 420 return -ENOMEM; 421 } 422 423 /* For compare we want the buffers to match, otherwise not. */ 424 if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) { 425 memset(task->dst, DATA_PATTERN, dst_buff_len); 426 } else { 427 memset(task->dst, ~DATA_PATTERN, dst_buff_len); 428 } 429 } 430 431 /* For dualcast 2 buffers are needed for the operation. */ 432 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || 433 (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) { 434 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 435 if (task->dst2 == NULL) { 436 fprintf(stderr, "Unable to alloc dst buffer\n"); 437 return -ENOMEM; 438 } 439 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 440 } 441 442 return 0; 443 } 444 445 inline static struct ap_task * 446 _get_task(struct worker_thread *worker) 447 { 448 struct ap_task *task; 449 450 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 451 task = TAILQ_FIRST(&worker->tasks_pool); 452 TAILQ_REMOVE(&worker->tasks_pool, task, link); 453 } else { 454 fprintf(stderr, "Unable to get ap_task\n"); 455 return NULL; 456 } 457 458 return task; 459 } 460 461 /* Submit one operation using the same ap task that just completed. */ 462 static void 463 _submit_single(struct worker_thread *worker, struct ap_task *task) 464 { 465 int random_num; 466 int rc = 0; 467 int flags = 0; 468 469 assert(worker); 470 471 switch (worker->workload) { 472 case SPDK_ACCEL_OPC_COPY: 473 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 474 g_xfer_size_bytes, flags, accel_done, task); 475 break; 476 case SPDK_ACCEL_OPC_FILL: 477 /* For fill use the first byte of the task->dst buffer */ 478 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 479 g_xfer_size_bytes, flags, accel_done, task); 480 break; 481 case SPDK_ACCEL_OPC_CRC32C: 482 rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst, 483 task->src_iovs, task->src_iovcnt, g_crc32c_seed, 484 accel_done, task); 485 break; 486 case SPDK_ACCEL_OPC_COPY_CRC32C: 487 rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt, 488 &task->crc_dst, g_crc32c_seed, flags, accel_done, task); 489 break; 490 case SPDK_ACCEL_OPC_COMPARE: 491 random_num = rand() % 100; 492 if (random_num < g_fail_percent_goal) { 493 task->expected_status = -EILSEQ; 494 *(uint8_t *)task->dst = ~DATA_PATTERN; 495 } else { 496 task->expected_status = 0; 497 *(uint8_t *)task->dst = DATA_PATTERN; 498 } 499 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 500 g_xfer_size_bytes, accel_done, task); 501 break; 502 case SPDK_ACCEL_OPC_DUALCAST: 503 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 504 task->src, g_xfer_size_bytes, flags, accel_done, task); 505 break; 506 case SPDK_ACCEL_OPC_COMPRESS: 507 task->src_iovs = task->cur_seg->uncompressed_iovs; 508 task->src_iovcnt = task->cur_seg->uncompressed_iovcnt; 509 rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded, 510 task->src_iovs, 511 task->src_iovcnt, &task->compressed_sz, flags, accel_done, task); 512 break; 513 case SPDK_ACCEL_OPC_DECOMPRESS: 514 task->src_iovs = task->cur_seg->compressed_iovs; 515 task->src_iovcnt = task->cur_seg->compressed_iovcnt; 516 rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs, 517 task->src_iovcnt, NULL, flags, accel_done, task); 518 break; 519 case SPDK_ACCEL_OPC_XOR: 520 rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count, 521 g_xfer_size_bytes, accel_done, task); 522 break; 523 default: 524 assert(false); 525 break; 526 527 } 528 529 worker->current_queue_depth++; 530 if (rc) { 531 accel_done(task, rc); 532 } 533 } 534 535 static void 536 _free_task_buffers(struct ap_task *task) 537 { 538 uint32_t i; 539 540 if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS || 541 g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 542 free(task->dst_iovs); 543 } else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 544 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 545 if (task->src_iovs) { 546 for (i = 0; i < task->src_iovcnt; i++) { 547 if (task->src_iovs[i].iov_base) { 548 spdk_dma_free(task->src_iovs[i].iov_base); 549 } 550 } 551 free(task->src_iovs); 552 } 553 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 554 if (task->sources) { 555 for (i = 0; i < g_xor_src_count; i++) { 556 spdk_dma_free(task->sources[i]); 557 } 558 free(task->sources); 559 } 560 } else { 561 spdk_dma_free(task->src); 562 } 563 564 spdk_dma_free(task->dst); 565 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) { 566 spdk_dma_free(task->dst2); 567 } 568 } 569 570 static int 571 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt) 572 { 573 uint32_t i; 574 uint32_t ttl_len = 0; 575 uint8_t *dst = (uint8_t *)_dst; 576 577 for (i = 0; i < iovcnt; i++) { 578 if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) { 579 return -1; 580 } 581 dst += src_src_iovs[i].iov_len; 582 ttl_len += src_src_iovs[i].iov_len; 583 } 584 585 if (ttl_len != iovcnt * g_xfer_size_bytes) { 586 return -1; 587 } 588 589 return 0; 590 } 591 592 static int _worker_stop(void *arg); 593 594 static void 595 accel_done(void *arg1, int status) 596 { 597 struct ap_task *task = arg1; 598 struct worker_thread *worker = task->worker; 599 uint32_t sw_crc32c; 600 601 assert(worker); 602 assert(worker->current_queue_depth > 0); 603 604 if (g_verify && status == 0) { 605 switch (worker->workload) { 606 case SPDK_ACCEL_OPC_COPY_CRC32C: 607 sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed); 608 if (task->crc_dst != sw_crc32c) { 609 SPDK_NOTICELOG("CRC-32C miscompare\n"); 610 worker->xfer_failed++; 611 } 612 if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) { 613 SPDK_NOTICELOG("Data miscompare\n"); 614 worker->xfer_failed++; 615 } 616 break; 617 case SPDK_ACCEL_OPC_CRC32C: 618 sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed); 619 if (task->crc_dst != sw_crc32c) { 620 SPDK_NOTICELOG("CRC-32C miscompare\n"); 621 worker->xfer_failed++; 622 } 623 break; 624 case SPDK_ACCEL_OPC_COPY: 625 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 626 SPDK_NOTICELOG("Data miscompare\n"); 627 worker->xfer_failed++; 628 } 629 break; 630 case SPDK_ACCEL_OPC_DUALCAST: 631 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 632 SPDK_NOTICELOG("Data miscompare, first destination\n"); 633 worker->xfer_failed++; 634 } 635 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 636 SPDK_NOTICELOG("Data miscompare, second destination\n"); 637 worker->xfer_failed++; 638 } 639 break; 640 case SPDK_ACCEL_OPC_FILL: 641 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 642 SPDK_NOTICELOG("Data miscompare\n"); 643 worker->xfer_failed++; 644 } 645 break; 646 case SPDK_ACCEL_OPC_COMPARE: 647 break; 648 case SPDK_ACCEL_OPC_COMPRESS: 649 break; 650 case SPDK_ACCEL_OPC_DECOMPRESS: 651 if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) { 652 SPDK_NOTICELOG("Data miscompare on decompression\n"); 653 worker->xfer_failed++; 654 } 655 break; 656 case SPDK_ACCEL_OPC_XOR: 657 if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count, 658 g_xfer_size_bytes) != 0) { 659 SPDK_ERRLOG("Failed to generate xor for verification\n"); 660 } else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) { 661 SPDK_NOTICELOG("Data miscompare\n"); 662 worker->xfer_failed++; 663 } 664 break; 665 default: 666 assert(false); 667 break; 668 } 669 } 670 671 if (worker->workload == SPDK_ACCEL_OPC_COMPRESS || 672 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 673 /* Advance the task to the next segment */ 674 task->cur_seg = STAILQ_NEXT(task->cur_seg, link); 675 if (task->cur_seg == NULL) { 676 task->cur_seg = STAILQ_FIRST(&g_compress_segs); 677 } 678 } 679 680 if (task->expected_status == -EILSEQ) { 681 assert(status != 0); 682 worker->injected_miscompares++; 683 status = 0; 684 } else if (status) { 685 /* Expected to pass but the accel module reported an error (ex: COMPARE operation). */ 686 worker->xfer_failed++; 687 } 688 689 worker->current_queue_depth--; 690 691 if (!worker->is_draining && status == 0) { 692 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 693 task = _get_task(worker); 694 _submit_single(worker, task); 695 } else { 696 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 697 } 698 } 699 700 static int 701 dump_result(void) 702 { 703 uint64_t total_completed = 0; 704 uint64_t total_failed = 0; 705 uint64_t total_miscompared = 0; 706 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 707 struct worker_thread *worker = g_workers; 708 char tmp[64]; 709 710 printf("\n%-12s %20s %16s %16s %16s\n", 711 "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares"); 712 printf("------------------------------------------------------------------------------------\n"); 713 while (worker != NULL) { 714 715 uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec; 716 uint64_t bw_in_MiBps = worker->stats.num_bytes / 717 (g_time_in_sec * 1024 * 1024); 718 719 total_completed += worker->stats.executed; 720 total_failed += worker->xfer_failed; 721 total_miscompared += worker->injected_miscompares; 722 723 snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread); 724 if (xfer_per_sec) { 725 printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n", 726 tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed, 727 worker->injected_miscompares); 728 } 729 730 worker = worker->next; 731 } 732 733 total_xfer_per_sec = total_completed / g_time_in_sec; 734 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 735 (g_time_in_sec * 1024 * 1024); 736 737 printf("====================================================================================\n"); 738 printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n", 739 "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared); 740 741 return total_failed ? 1 : 0; 742 } 743 744 static inline void 745 _free_task_buffers_in_pool(struct worker_thread *worker) 746 { 747 struct ap_task *task; 748 749 assert(worker); 750 while ((task = TAILQ_FIRST(&worker->tasks_pool))) { 751 TAILQ_REMOVE(&worker->tasks_pool, task, link); 752 _free_task_buffers(task); 753 } 754 } 755 756 static int 757 _check_draining(void *arg) 758 { 759 struct worker_thread *worker = arg; 760 761 assert(worker); 762 763 if (worker->current_queue_depth == 0) { 764 _free_task_buffers_in_pool(worker); 765 spdk_poller_unregister(&worker->is_draining_poller); 766 unregister_worker(worker); 767 } 768 769 return SPDK_POLLER_BUSY; 770 } 771 772 static int 773 _worker_stop(void *arg) 774 { 775 struct worker_thread *worker = arg; 776 777 assert(worker); 778 779 spdk_poller_unregister(&worker->stop_poller); 780 781 /* now let the worker drain and check it's outstanding IO with a poller */ 782 worker->is_draining = true; 783 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 784 785 return SPDK_POLLER_BUSY; 786 } 787 788 static void shutdown_cb(void); 789 790 static void 791 _init_thread(void *arg1) 792 { 793 struct worker_thread *worker; 794 struct ap_task *task; 795 int i, num_tasks = g_allocate_depth; 796 struct display_info *display = arg1; 797 798 worker = calloc(1, sizeof(*worker)); 799 if (worker == NULL) { 800 fprintf(stderr, "Unable to allocate worker\n"); 801 free(display); 802 spdk_thread_exit(spdk_get_thread()); 803 goto no_worker; 804 } 805 806 worker->workload = g_workload_selection; 807 worker->display.core = display->core; 808 worker->display.thread = display->thread; 809 free(display); 810 worker->core = spdk_env_get_current_core(); 811 worker->thread = spdk_get_thread(); 812 pthread_mutex_lock(&g_workers_lock); 813 g_num_workers++; 814 worker->next = g_workers; 815 g_workers = worker; 816 pthread_mutex_unlock(&g_workers_lock); 817 worker->ch = spdk_accel_get_io_channel(); 818 if (worker->ch == NULL) { 819 fprintf(stderr, "Unable to get an accel channel\n"); 820 goto error; 821 } 822 823 TAILQ_INIT(&worker->tasks_pool); 824 825 worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); 826 if (worker->task_base == NULL) { 827 fprintf(stderr, "Could not allocate task base.\n"); 828 goto error; 829 } 830 831 task = worker->task_base; 832 for (i = 0; i < num_tasks; i++) { 833 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 834 task->worker = worker; 835 if (_get_task_data_bufs(task)) { 836 fprintf(stderr, "Unable to get data bufs\n"); 837 goto error; 838 } 839 task++; 840 } 841 842 /* Register a poller that will stop the worker at time elapsed */ 843 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 844 g_time_in_sec * 1000000ULL); 845 846 /* Load up queue depth worth of operations. */ 847 for (i = 0; i < g_queue_depth; i++) { 848 task = _get_task(worker); 849 if (task == NULL) { 850 goto error; 851 } 852 853 _submit_single(worker, task); 854 } 855 return; 856 error: 857 858 _free_task_buffers_in_pool(worker); 859 free(worker->task_base); 860 no_worker: 861 shutdown_cb(); 862 g_rc = -1; 863 } 864 865 static void 866 accel_perf_start(void *arg1) 867 { 868 struct spdk_cpuset tmp_cpumask = {}; 869 char thread_name[32]; 870 uint32_t i; 871 int j; 872 struct spdk_thread *thread; 873 struct display_info *display; 874 875 g_tsc_rate = spdk_get_ticks_hz(); 876 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 877 878 dump_user_config(); 879 880 printf("Running for %d seconds...\n", g_time_in_sec); 881 fflush(stdout); 882 883 /* Create worker threads for each core that was specified. */ 884 SPDK_ENV_FOREACH_CORE(i) { 885 for (j = 0; j < g_threads_per_core; j++) { 886 snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j); 887 spdk_cpuset_zero(&tmp_cpumask); 888 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 889 thread = spdk_thread_create(thread_name, &tmp_cpumask); 890 display = calloc(1, sizeof(*display)); 891 if (display == NULL) { 892 fprintf(stderr, "Unable to allocate memory\n"); 893 spdk_app_stop(-1); 894 return; 895 } 896 display->core = i; 897 display->thread = j; 898 spdk_thread_send_msg(thread, _init_thread, display); 899 } 900 } 901 } 902 903 static void 904 accel_perf_free_compress_segs(void) 905 { 906 struct ap_compress_seg *seg, *tmp; 907 908 STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) { 909 free(seg->uncompressed_iovs); 910 free(seg->compressed_iovs); 911 spdk_dma_free(seg->compressed_data); 912 spdk_dma_free(seg->uncompressed_data); 913 STAILQ_REMOVE_HEAD(&g_compress_segs, link); 914 free(seg); 915 } 916 } 917 918 struct accel_perf_prep_ctx { 919 FILE *file; 920 long remaining; 921 struct spdk_io_channel *ch; 922 struct ap_compress_seg *cur_seg; 923 }; 924 925 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx); 926 927 static void 928 accel_perf_prep_process_seg_cpl(void *ref, int status) 929 { 930 struct accel_perf_prep_ctx *ctx = ref; 931 struct ap_compress_seg *seg; 932 933 if (status != 0) { 934 fprintf(stderr, "error (%d) on initial compress completion\n", status); 935 spdk_dma_free(ctx->cur_seg->compressed_data); 936 spdk_dma_free(ctx->cur_seg->uncompressed_data); 937 free(ctx->cur_seg); 938 spdk_put_io_channel(ctx->ch); 939 fclose(ctx->file); 940 free(ctx); 941 spdk_app_stop(-status); 942 return; 943 } 944 945 seg = ctx->cur_seg; 946 947 if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 948 seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec)); 949 if (seg->compressed_iovs == NULL) { 950 fprintf(stderr, "unable to allocate iovec\n"); 951 spdk_dma_free(seg->compressed_data); 952 spdk_dma_free(seg->uncompressed_data); 953 free(seg); 954 spdk_put_io_channel(ctx->ch); 955 fclose(ctx->file); 956 free(ctx); 957 spdk_app_stop(-ENOMEM); 958 return; 959 } 960 seg->compressed_iovcnt = g_chained_count; 961 962 accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs, 963 seg->compressed_iovcnt); 964 } 965 966 STAILQ_INSERT_TAIL(&g_compress_segs, seg, link); 967 ctx->remaining -= seg->uncompressed_len; 968 969 accel_perf_prep_process_seg(ctx); 970 } 971 972 static void 973 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx) 974 { 975 struct ap_compress_seg *seg; 976 int sz, sz_read, sz_padded; 977 void *ubuf, *cbuf; 978 struct iovec iov[1]; 979 int rc; 980 981 if (ctx->remaining == 0) { 982 spdk_put_io_channel(ctx->ch); 983 fclose(ctx->file); 984 free(ctx); 985 accel_perf_start(NULL); 986 return; 987 } 988 989 sz = spdk_min(ctx->remaining, g_xfer_size_bytes); 990 /* Add 10% pad to the compress buffer for incompressible data. Note that a real app 991 * would likely either deal with the failure of not having a large enough buffer 992 * by submitting another operation with a larger one. Or, like the vbdev module 993 * does, just accept the error and use the data uncompressed marking it as such in 994 * its own metadata so that in the future it doesn't try to decompress uncompressed 995 * data, etc. 996 */ 997 sz_padded = sz * COMP_BUF_PAD_PERCENTAGE; 998 999 ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL); 1000 if (!ubuf) { 1001 fprintf(stderr, "unable to allocate uncompress buffer\n"); 1002 rc = -ENOMEM; 1003 goto error; 1004 } 1005 1006 cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL); 1007 if (!cbuf) { 1008 fprintf(stderr, "unable to allocate compress buffer\n"); 1009 rc = -ENOMEM; 1010 spdk_dma_free(ubuf); 1011 goto error; 1012 } 1013 1014 seg = calloc(1, sizeof(*seg)); 1015 if (!seg) { 1016 fprintf(stderr, "unable to allocate comp/decomp segment\n"); 1017 spdk_dma_free(ubuf); 1018 spdk_dma_free(cbuf); 1019 rc = -ENOMEM; 1020 goto error; 1021 } 1022 1023 sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file); 1024 if (sz_read != sz) { 1025 fprintf(stderr, "unable to read input file\n"); 1026 free(seg); 1027 spdk_dma_free(ubuf); 1028 spdk_dma_free(cbuf); 1029 rc = -errno; 1030 goto error; 1031 } 1032 1033 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 1034 seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec)); 1035 if (seg->uncompressed_iovs == NULL) { 1036 fprintf(stderr, "unable to allocate iovec\n"); 1037 free(seg); 1038 spdk_dma_free(ubuf); 1039 spdk_dma_free(cbuf); 1040 rc = -ENOMEM; 1041 goto error; 1042 } 1043 seg->uncompressed_iovcnt = g_chained_count; 1044 accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt); 1045 } 1046 1047 seg->uncompressed_data = ubuf; 1048 seg->uncompressed_len = sz; 1049 seg->compressed_data = cbuf; 1050 seg->compressed_len = sz; 1051 seg->compressed_len_padded = sz_padded; 1052 1053 ctx->cur_seg = seg; 1054 iov[0].iov_base = seg->uncompressed_data; 1055 iov[0].iov_len = seg->uncompressed_len; 1056 /* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance 1057 * it will fail with -ENOMEM in the event that the destination buffer is not large enough 1058 * to hold the compressed data. This example app simply adds 10% buffer for compressed data 1059 * but real applications may want to consider a more sophisticated method. 1060 */ 1061 rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1, 1062 &seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx); 1063 if (rc < 0) { 1064 fprintf(stderr, "error (%d) on initial compress submission\n", rc); 1065 goto error; 1066 } 1067 1068 return; 1069 1070 error: 1071 spdk_put_io_channel(ctx->ch); 1072 fclose(ctx->file); 1073 free(ctx); 1074 spdk_app_stop(rc); 1075 } 1076 1077 static void 1078 accel_perf_prep(void *arg1) 1079 { 1080 struct accel_perf_prep_ctx *ctx; 1081 const char *module_name = NULL; 1082 int rc = 0; 1083 1084 if (g_module_name) { 1085 rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name); 1086 if (rc != 0 || strcmp(g_module_name, module_name) != 0) { 1087 fprintf(stderr, "Module '%s' was assigned via JSON config or RPC, instead of '%s'\n", 1088 module_name, g_module_name); 1089 fprintf(stderr, "-M option is not compatible with accel_assign_opc RPC\n"); 1090 rc = -EINVAL; 1091 goto error_end; 1092 } 1093 } 1094 1095 if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS && 1096 g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) { 1097 accel_perf_start(arg1); 1098 return; 1099 } 1100 1101 if (g_cd_file_in_name == NULL) { 1102 fprintf(stdout, "A filename is required.\n"); 1103 rc = -EINVAL; 1104 goto error_end; 1105 } 1106 1107 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) { 1108 fprintf(stdout, "\nCompression does not support the verify option, aborting.\n"); 1109 rc = -ENOTSUP; 1110 goto error_end; 1111 } 1112 1113 printf("Preparing input file...\n"); 1114 1115 ctx = calloc(1, sizeof(*ctx)); 1116 if (ctx == NULL) { 1117 rc = -ENOMEM; 1118 goto error_end; 1119 } 1120 1121 ctx->file = fopen(g_cd_file_in_name, "r"); 1122 if (ctx->file == NULL) { 1123 fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name); 1124 rc = -errno; 1125 goto error_ctx; 1126 } 1127 1128 fseek(ctx->file, 0L, SEEK_END); 1129 ctx->remaining = ftell(ctx->file); 1130 fseek(ctx->file, 0L, SEEK_SET); 1131 1132 ctx->ch = spdk_accel_get_io_channel(); 1133 if (ctx->ch == NULL) { 1134 rc = -EAGAIN; 1135 goto error_file; 1136 } 1137 1138 if (g_xfer_size_bytes == 0) { 1139 /* size of 0 means "file at a time" */ 1140 g_xfer_size_bytes = ctx->remaining; 1141 } 1142 1143 accel_perf_prep_process_seg(ctx); 1144 return; 1145 1146 error_file: 1147 fclose(ctx->file); 1148 error_ctx: 1149 free(ctx); 1150 error_end: 1151 spdk_app_stop(rc); 1152 } 1153 1154 static void 1155 worker_shutdown(void *ctx) 1156 { 1157 _worker_stop(ctx); 1158 } 1159 1160 static void 1161 shutdown_cb(void) 1162 { 1163 struct worker_thread *worker; 1164 1165 pthread_mutex_lock(&g_workers_lock); 1166 if (!g_workers) { 1167 spdk_app_stop(1); 1168 goto unlock; 1169 } 1170 1171 worker = g_workers; 1172 while (worker) { 1173 spdk_thread_send_msg(worker->thread, worker_shutdown, worker); 1174 worker = worker->next; 1175 } 1176 unlock: 1177 pthread_mutex_unlock(&g_workers_lock); 1178 } 1179 1180 int 1181 main(int argc, char **argv) 1182 { 1183 struct worker_thread *worker, *tmp; 1184 int rc; 1185 1186 pthread_mutex_init(&g_workers_lock, NULL); 1187 spdk_app_opts_init(&g_opts, sizeof(g_opts)); 1188 g_opts.name = "accel_perf"; 1189 g_opts.reactor_mask = "0x1"; 1190 g_opts.shutdown_cb = shutdown_cb; 1191 1192 rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:M:P:f:T:l:S:x:", NULL, 1193 parse_args, usage); 1194 if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) { 1195 return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1; 1196 } 1197 1198 if (g_workload_selection == SPDK_ACCEL_OPC_LAST) { 1199 fprintf(stderr, "Must provide a workload type\n"); 1200 usage(); 1201 return -1; 1202 } 1203 1204 if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) { 1205 fprintf(stdout, "allocate depth must be at least as big as queue depth\n"); 1206 usage(); 1207 return -1; 1208 } 1209 1210 if (g_allocate_depth == 0) { 1211 g_allocate_depth = g_queue_depth; 1212 } 1213 1214 if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 1215 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) && 1216 g_chained_count == 0) { 1217 usage(); 1218 return -1; 1219 } 1220 1221 if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) { 1222 usage(); 1223 return -1; 1224 } 1225 1226 if (g_module_name && spdk_accel_assign_opc(g_workload_selection, g_module_name)) { 1227 fprintf(stderr, "Was not able to assign '%s' module to the workload\n", g_module_name); 1228 usage(); 1229 return -1; 1230 } 1231 1232 g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL); 1233 if (g_rc) { 1234 SPDK_ERRLOG("ERROR starting application\n"); 1235 } 1236 1237 pthread_mutex_destroy(&g_workers_lock); 1238 1239 worker = g_workers; 1240 while (worker) { 1241 tmp = worker->next; 1242 free(worker); 1243 worker = tmp; 1244 } 1245 accel_perf_free_compress_segs(); 1246 spdk_app_fini(); 1247 return g_rc; 1248 } 1249