/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2020 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"
#include "spdk/xor.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000
#define COMP_BUF_PAD_PERCENTAGE 1.1L

static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static uint32_t g_xor_src_count = 2;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum spdk_accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static char *g_cd_file_in_name = NULL;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

struct ap_compress_seg {
	void *uncompressed_data;
	uint32_t uncompressed_len;
	struct iovec *uncompressed_iovs;
	uint32_t uncompressed_iovcnt;

	void *compressed_data;
	uint32_t compressed_len;
	uint32_t compressed_len_padded;
	struct iovec *compressed_iovs;
	uint32_t compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg) link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void *src;
	struct iovec *src_iovs;
	uint32_t src_iovcnt;
	void **sources;
	struct iovec *dst_iovs;
	uint32_t dst_iovcnt;
	void *dst;
	void *dst2;
	uint32_t crc_dst;
	uint32_t compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread *worker;
	int expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task) link;
};

struct worker_thread {
	struct spdk_io_channel *ch;
	struct spdk_accel_opcode_stats stats;
	uint64_t xfer_failed;
	uint64_t injected_miscompares;
	uint64_t current_queue_depth;
	TAILQ_HEAD(, ap_task) tasks_pool;
	struct worker_thread *next;
	unsigned core;
	struct spdk_thread *thread;
	bool is_draining;
	struct spdk_poller *is_draining_poller;
	struct spdk_poller *stop_poller;
	void *task_base;
	struct display_info display;
	enum spdk_accel_opcode workload;
};
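/* Print the user-selected test configuration, including which accel module
 * ended up registered for the chosen opcode.
 */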
static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask: %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type: %s\n", g_workload_type);
	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed: %u\n", g_crc32c_seed);
	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
		printf("Fill pattern: 0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
		printf("Source buffers: %u\n", g_xor_src_count);
	}
	if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size: %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count);
	} else {
		printf("Transfer size: %u bytes\n", g_xfer_size_bytes);
	}
	printf("Vector count: %u\n", g_chained_count);
	printf("Module: %s\n", module_name);
	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		printf("File Name: %s\n", g_cd_file_in_name);
	}
	printf("Queue depth: %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time: %u seconds\n", g_time_in_sec);
	printf("Verify: %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-n number of channels]\n");
	printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor]\n");
	printf("\t[-l for compress/decompress workloads, name of uncompressed input file]\n");
	printf("\t[-S for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per core (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 'S':
	case 't':
	case 'x':
		argval = spdk_strtol(optarg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_chained_count = argval;
		break;
	case 'l':
		g_cd_file_in_name = optarg;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 'S':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'x':
		g_xor_src_count = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = optarg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = SPDK_ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = SPDK_ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = SPDK_ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = SPDK_ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = SPDK_ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = SPDK_ACCEL_OPC_COMPRESS;
		} else if (!strcmp(g_workload_type, "decompress")) {
			g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS;
		} else if (!strcmp(g_workload_type, "xor")) {
			g_workload_selection = SPDK_ACCEL_OPC_XOR;
		} else {
			usage();
			return 1;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
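/* Runs on each worker's SPDK thread at shutdown: snapshot the opcode stats,
 * release the I/O channel, and stop the app once the last worker is gone.
 */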
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	spdk_accel_get_opcode_stats(worker->ch, worker->workload,
				    &worker->stats, sizeof(worker->stats));
	free(worker->task_base);
	spdk_put_io_channel(worker->ch);
	spdk_thread_exit(spdk_get_thread());
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

static void
accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt)
{
	uint64_t ele_size;
	uint8_t *data;
	uint32_t i;

	ele_size = spdk_divide_round_up(sz, iovcnt);

	data = buf;
	for (i = 0; i < iovcnt; i++) {
		ele_size = spdk_min(ele_size, sz);
		assert(ele_size > 0);

		iovs[i].iov_base = data;
		iovs[i].iov_len = ele_size;

		data += ele_size;
		sz -= ele_size;
	}
	assert(sz == 0);
}
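/* Allocate and pattern the buffers a task needs for the selected workload:
 * iovec chains for crc32c/copy_crc32c and compress/decompress, multiple
 * source buffers for xor, and a second destination for dualcast (or for
 * verifying xor results).
 */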
static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all modules to keep it simple.
	 */
	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS ||
	    g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		task->cur_seg = STAILQ_FIRST(&g_compress_segs);

		if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
			dst_buff_len = task->cur_seg->compressed_len_padded;
		}

		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (!task->dst_iovs) {
			fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task);
			return -ENOMEM;
		}
		task->dst_iovcnt = g_chained_count;
		accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt);

		return 0;
	}

	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
		assert(g_chained_count > 0);
		task->src_iovcnt = g_chained_count;
		task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec));
		if (!task->src_iovs) {
			fprintf(stderr, "cannot allocate task->src_iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_chained_count;
		}

		for (i = 0; i < task->src_iovcnt; i++) {
			task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->src_iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->src_iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
		assert(g_xor_src_count > 1);
		task->sources = calloc(g_xor_src_count, sizeof(*task->sources));
		if (!task->sources) {
			return -ENOMEM;
		}

		for (i = 0; i < g_xor_src_count; i++) {
			task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (!task->sources[i]) {
				return -ENOMEM;
			}
			memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes);
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer so we can check it when verify is enabled. */
		if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* For dualcast, two destination buffers are needed for the operation. */
	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST ||
	    (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}
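/* Pop a pre-allocated task from the worker's pool; the pool was sized to
 * g_allocate_depth tasks, so this only fails on a logic error.
 */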
inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case SPDK_ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_FILL:
		/* For fill, use the first byte of the task->src buffer as the fill value. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->src_iovs, task->src_iovcnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_COMPRESS:
		task->src_iovs = task->cur_seg->uncompressed_iovs;
		task->src_iovcnt = task->cur_seg->uncompressed_iovcnt;
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded,
						task->src_iovs, task->src_iovcnt, &task->compressed_sz,
						flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_DECOMPRESS:
		task->src_iovs = task->cur_seg->compressed_iovs;
		task->src_iovcnt = task->cur_seg->compressed_iovcnt;
		rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs,
						  task->src_iovcnt, NULL, flags, accel_done, task);
		break;
	case SPDK_ACCEL_OPC_XOR:
		rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count,
					   g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	worker->current_queue_depth++;
	if (rc) {
		accel_done(task, rc);
	}
}
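/* Release whatever buffers _get_task_data_bufs() allocated for this task;
 * which members are in use depends on the selected workload.
 */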
static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS ||
	    g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
		free(task->dst_iovs);
	} else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
		   g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
		if (task->src_iovs) {
			for (i = 0; i < task->src_iovcnt; i++) {
				if (task->src_iovs[i].iov_base) {
					spdk_dma_free(task->src_iovs[i].iov_base);
				}
			}
			free(task->src_iovs);
		}
	} else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) {
		if (task->sources) {
			for (i = 0; i < g_xor_src_count; i++) {
				spdk_dma_free(task->sources[i]);
			}
			free(task->sources);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);

static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case SPDK_ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case SPDK_ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case SPDK_ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case SPDK_ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case SPDK_ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case SPDK_ACCEL_OPC_COMPARE:
			break;
		case SPDK_ACCEL_OPC_COMPRESS:
			break;
		case SPDK_ACCEL_OPC_DECOMPRESS:
			if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) {
				SPDK_NOTICELOG("Data miscompare on decompression\n");
				worker->xfer_failed++;
			}
			break;
		case SPDK_ACCEL_OPC_XOR:
			if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count,
					 g_xfer_size_bytes) != 0) {
				SPDK_ERRLOG("Failed to generate xor for verification\n");
			} else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (worker->workload == SPDK_ACCEL_OPC_COMPRESS ||
	    worker->workload == SPDK_ACCEL_OPC_DECOMPRESS) {
		/* Advance the task to the next segment */
		task->cur_seg = STAILQ_NEXT(task->cur_seg, link);
		if (task->cur_seg == NULL) {
			task->cur_seg = STAILQ_FIRST(&g_compress_segs);
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel module reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}
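/* Aggregate and print per-worker and total throughput; returns non-zero if
 * any transfer failed so main() can report an error exit code.
 */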
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;
	char tmp[64];

	printf("\n%-12s %20s %16s %16s %16s\n",
	       "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares");
	printf("------------------------------------------------------------------------------------\n");
	while (worker != NULL) {
		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
		uint64_t bw_in_MiBps = worker->stats.num_bytes /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->stats.executed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread);
		if (xfer_per_sec) {
			printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16" PRIu64 " %16" PRIu64 "\n",
			       tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed,
			       worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("====================================================================================\n");
	printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16" PRIu64 " %16" PRIu64 "\n",
	       "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}
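/* Drain path: once a worker stops submitting new operations, a poller waits
 * for its outstanding operations to complete before tearing it down.
 */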
static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker once the run time has elapsed. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}
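/* Called once the framework (and any compress/decompress prep) is ready:
 * spawn g_threads_per_core SPDK threads on every core in the mask.
 */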
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

struct accel_perf_prep_ctx {
	FILE *file;
	long remaining;
	struct spdk_io_channel *ch;
	struct ap_compress_seg *cur_seg;
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			spdk_dma_free(seg->compressed_data);
			spdk_dma_free(seg->uncompressed_data);
			free(seg);
			spdk_put_io_channel(ctx->ch);
			fclose(ctx->file);
			free(ctx);
			spdk_app_stop(-ENOMEM);
			return;
		}
		seg->compressed_iovcnt = g_chained_count;

		accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs,
					  seg->compressed_iovcnt);
	}

	STAILQ_INSERT_TAIL(&g_compress_segs, seg, link);
	ctx->remaining -= seg->uncompressed_len;

	accel_perf_prep_process_seg(ctx);
}
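/* Read the next chunk of the input file and compress it through the accel
 * framework. The completion callback queues the finished segment and calls
 * back into this function until the file is consumed, then starts the test.
 */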
static void
accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx)
{
	struct ap_compress_seg *seg;
	int sz, sz_read, sz_padded;
	void *ubuf, *cbuf;
	struct iovec iov[1];
	int rc;

	if (ctx->remaining == 0) {
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		accel_perf_start(NULL);
		return;
	}

	sz = spdk_min(ctx->remaining, g_xfer_size_bytes);
	/* Add 10% pad to the compress buffer for incompressible data. Note that a real app
	 * would likely either deal with a too-small destination buffer by submitting another
	 * operation with a larger one, or, like the vbdev module does, accept the error and
	 * store the data uncompressed, marking it as such in its own metadata so that in the
	 * future it doesn't try to decompress uncompressed data, etc.
	 */
	sz_padded = sz * COMP_BUF_PAD_PERCENTAGE;

	ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL);
	if (!ubuf) {
		fprintf(stderr, "unable to allocate uncompress buffer\n");
		rc = -ENOMEM;
		goto error;
	}

	cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL);
	if (!cbuf) {
		fprintf(stderr, "unable to allocate compress buffer\n");
		rc = -ENOMEM;
		spdk_dma_free(ubuf);
		goto error;
	}

	seg = calloc(1, sizeof(*seg));
	if (!seg) {
		fprintf(stderr, "unable to allocate comp/decomp segment\n");
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -ENOMEM;
		goto error;
	}

	sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file);
	if (sz_read != sz) {
		fprintf(stderr, "unable to read input file\n");
		free(seg);
		spdk_dma_free(ubuf);
		spdk_dma_free(cbuf);
		rc = -errno;
		goto error;
	}

	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) {
		seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->uncompressed_iovs == NULL) {
			fprintf(stderr, "unable to allocate iovec\n");
			free(seg);
			spdk_dma_free(ubuf);
			spdk_dma_free(cbuf);
			rc = -ENOMEM;
			goto error;
		}
		seg->uncompressed_iovcnt = g_chained_count;
		accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt);
	}

	seg->uncompressed_data = ubuf;
	seg->uncompressed_len = sz;
	seg->compressed_data = cbuf;
	seg->compressed_len = sz;
	seg->compressed_len_padded = sz_padded;

	ctx->cur_seg = seg;
	iov[0].iov_base = seg->uncompressed_data;
	iov[0].iov_len = seg->uncompressed_len;
	/* Note that any time a call is made to spdk_accel_submit_compress() there's a chance
	 * it will fail with -ENOMEM in the event that the destination buffer is not large enough
	 * to hold the compressed data. This example app simply adds a 10% pad for compressed
	 * data, but real applications may want to consider a more sophisticated method.
	 */
	rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1,
					&seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx);
	if (rc < 0) {
		fprintf(stderr, "error (%d) on initial compress submission\n", rc);
		goto error;
	}

	return;

error:
	spdk_put_io_channel(ctx->ch);
	fclose(ctx->file);
	free(ctx);
	spdk_app_stop(rc);
}
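/* Entry point from spdk_app_start(): compress/decompress workloads first
 * build the segment list from the input file; all other workloads start
 * immediately.
 */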
static void
accel_perf_prep(void *arg1)
{
	struct accel_perf_prep_ctx *ctx;
	int rc = 0;

	if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS &&
	    g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) {
		accel_perf_start(arg1);
		return;
	}

	if (g_cd_file_in_name == NULL) {
		fprintf(stdout, "A filename is required.\n");
		rc = -EINVAL;
		goto error_end;
	}

	if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) {
		fprintf(stdout, "\nCompression does not support the verify option, aborting.\n");
		rc = -ENOTSUP;
		goto error_end;
	}

	printf("Preparing input file...\n");

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto error_end;
	}

	ctx->file = fopen(g_cd_file_in_name, "r");
	if (ctx->file == NULL) {
		fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name);
		rc = -errno;
		goto error_ctx;
	}

	fseek(ctx->file, 0L, SEEK_END);
	ctx->remaining = ftell(ctx->file);
	fseek(ctx->file, 0L, SEEK_SET);

	ctx->ch = spdk_accel_get_io_channel();
	if (ctx->ch == NULL) {
		rc = -EAGAIN;
		goto error_file;
	}

	if (g_xfer_size_bytes == 0) {
		/* A transfer size of 0 means "the whole file at a time". */
		g_xfer_size_bytes = ctx->remaining;
	}

	accel_perf_prep_process_seg(ctx);
	return;

error_file:
	fclose(ctx->file);
error_ctx:
	free(ctx);
error_end:
	spdk_app_stop(rc);
}

static void
worker_shutdown(void *ctx)
{
	_worker_stop(ctx);
}

static void
shutdown_cb(void)
{
	struct worker_thread *worker;

	pthread_mutex_lock(&g_workers_lock);
	if (!g_workers) {
		spdk_app_stop(1);
		goto unlock;
	}

	worker = g_workers;
	while (worker) {
		spdk_thread_send_msg(worker->thread, worker_shutdown, worker);
		worker = worker->next;
	}
unlock:
	pthread_mutex_unlock(&g_workers_lock);
}
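/* Parse and validate arguments, run the app, then free the worker list and
 * any compress segments on the way out.
 */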
int
main(int argc, char **argv)
{
	struct worker_thread *worker, *tmp;
	int rc;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&g_opts, sizeof(g_opts));
	g_opts.name = "accel_perf";
	g_opts.reactor_mask = "0x1";
	g_opts.shutdown_cb = shutdown_cb;

	rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:S:x:", NULL,
				 parse_args, usage);
	if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) {
		return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1;
	}

	if ((g_workload_selection != SPDK_ACCEL_OPC_COPY) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_FILL) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_COPY_CRC32C) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_COMPARE) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_DUALCAST) &&
	    (g_workload_selection != SPDK_ACCEL_OPC_XOR)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
	     g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) &&
	    g_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	accel_perf_free_compress_segs();
	spdk_app_fini();
	return g_rc;
}