1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2020 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/thread.h" 8 #include "spdk/env.h" 9 #include "spdk/event.h" 10 #include "spdk/log.h" 11 #include "spdk/string.h" 12 #include "spdk/accel.h" 13 #include "spdk/crc32.h" 14 #include "spdk/util.h" 15 #include "spdk/xor.h" 16 17 #define DATA_PATTERN 0x5a 18 #define ALIGN_4K 0x1000 19 #define COMP_BUF_PAD_PERCENTAGE 1.1L 20 21 static uint64_t g_tsc_rate; 22 static uint64_t g_tsc_end; 23 static int g_rc; 24 static int g_xfer_size_bytes = 4096; 25 static int g_queue_depth = 32; 26 /* g_allocate_depth indicates how many tasks we allocate per worker. It will 27 * be at least as much as the queue depth. 28 */ 29 static int g_allocate_depth = 0; 30 static int g_threads_per_core = 1; 31 static int g_time_in_sec = 5; 32 static uint32_t g_crc32c_seed = 0; 33 static uint32_t g_chained_count = 1; 34 static int g_fail_percent_goal = 0; 35 static uint8_t g_fill_pattern = 255; 36 static uint32_t g_xor_src_count = 2; 37 static bool g_verify = false; 38 static const char *g_workload_type = NULL; 39 static enum spdk_accel_opcode g_workload_selection; 40 static struct worker_thread *g_workers = NULL; 41 static int g_num_workers = 0; 42 static char *g_cd_file_in_name = NULL; 43 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 44 static struct spdk_app_opts g_opts = {}; 45 46 struct ap_compress_seg { 47 void *uncompressed_data; 48 uint32_t uncompressed_len; 49 struct iovec *uncompressed_iovs; 50 uint32_t uncompressed_iovcnt; 51 52 void *compressed_data; 53 uint32_t compressed_len; 54 uint32_t compressed_len_padded; 55 struct iovec *compressed_iovs; 56 uint32_t compressed_iovcnt; 57 58 STAILQ_ENTRY(ap_compress_seg) link; 59 }; 60 61 static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs); 62 63 struct worker_thread; 64 static void accel_done(void *ref, int status); 65 66 struct display_info { 67 int core; 68 int thread; 69 }; 70 71 struct ap_task { 72 void *src; 73 struct iovec *src_iovs; 74 uint32_t src_iovcnt; 75 void **sources; 76 struct iovec *dst_iovs; 77 uint32_t dst_iovcnt; 78 void *dst; 79 void *dst2; 80 uint32_t crc_dst; 81 uint32_t compressed_sz; 82 struct ap_compress_seg *cur_seg; 83 struct worker_thread *worker; 84 int expected_status; /* used for the compare operation */ 85 TAILQ_ENTRY(ap_task) link; 86 }; 87 88 struct worker_thread { 89 struct spdk_io_channel *ch; 90 struct spdk_accel_opcode_stats stats; 91 uint64_t xfer_failed; 92 uint64_t injected_miscompares; 93 uint64_t current_queue_depth; 94 TAILQ_HEAD(, ap_task) tasks_pool; 95 struct worker_thread *next; 96 unsigned core; 97 struct spdk_thread *thread; 98 bool is_draining; 99 struct spdk_poller *is_draining_poller; 100 struct spdk_poller *stop_poller; 101 void *task_base; 102 struct display_info display; 103 enum spdk_accel_opcode workload; 104 }; 105 106 static void 107 dump_user_config(void) 108 { 109 const char *module_name = NULL; 110 int rc; 111 112 rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name); 113 if (rc) { 114 printf("error getting module name (%d)\n", rc); 115 } 116 117 printf("\nSPDK Configuration:\n"); 118 printf("Core mask: %s\n\n", g_opts.reactor_mask); 119 printf("Accel Perf Configuration:\n"); 120 printf("Workload Type: %s\n", g_workload_type); 121 if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 122 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 123 printf("CRC-32C seed: %u\n", g_crc32c_seed); 124 } else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) { 125 printf("Fill pattern: 0x%x\n", g_fill_pattern); 126 } else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) { 127 printf("Failure inject: %u percent\n", g_fail_percent_goal); 128 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 129 printf("Source buffers: %u\n", g_xor_src_count); 130 } 131 if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 132 printf("Vector size: %u bytes\n", g_xfer_size_bytes); 133 printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count); 134 } else { 135 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 136 } 137 printf("vector count %u\n", g_chained_count); 138 printf("Module: %s\n", module_name); 139 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS || 140 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 141 printf("File Name: %s\n", g_cd_file_in_name); 142 } 143 printf("Queue depth: %u\n", g_queue_depth); 144 printf("Allocate depth: %u\n", g_allocate_depth); 145 printf("# threads/core: %u\n", g_threads_per_core); 146 printf("Run time: %u seconds\n", g_time_in_sec); 147 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 148 } 149 150 static void 151 usage(void) 152 { 153 printf("accel_perf options:\n"); 154 printf("\t[-h help message]\n"); 155 printf("\t[-q queue depth per core]\n"); 156 printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)\n"); 157 printf("\t[-T number of threads per core\n"); 158 printf("\t[-n number of channels]\n"); 159 printf("\t[-o transfer size in bytes (default: 4KiB. For compress/decompress, 0 means the input file size)]\n"); 160 printf("\t[-t time in seconds]\n"); 161 printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor\n"); 162 printf("\t[-l for compress/decompress workloads, name of uncompressed input file\n"); 163 printf("\t[-S for crc32c workload, use this seed value (default 0)\n"); 164 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 165 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 166 printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n"); 167 printf("\t[-y verify result if this switch is on]\n"); 168 printf("\t[-a tasks to allocate per core (default: same value as -q)]\n"); 169 printf("\t\tCan be used to spread operations across a wider range of memory.\n"); 170 } 171 172 static int 173 parse_args(int ch, char *arg) 174 { 175 int argval = 0; 176 177 switch (ch) { 178 case 'a': 179 case 'C': 180 case 'f': 181 case 'T': 182 case 'o': 183 case 'P': 184 case 'q': 185 case 'S': 186 case 't': 187 case 'x': 188 argval = spdk_strtol(optarg, 10); 189 if (argval < 0) { 190 fprintf(stderr, "-%c option must be non-negative.\n", ch); 191 usage(); 192 return 1; 193 } 194 break; 195 default: 196 break; 197 }; 198 199 switch (ch) { 200 case 'a': 201 g_allocate_depth = argval; 202 break; 203 case 'C': 204 g_chained_count = argval; 205 break; 206 case 'l': 207 g_cd_file_in_name = optarg; 208 break; 209 case 'f': 210 g_fill_pattern = (uint8_t)argval; 211 break; 212 case 'T': 213 g_threads_per_core = argval; 214 break; 215 case 'o': 216 g_xfer_size_bytes = argval; 217 break; 218 case 'P': 219 g_fail_percent_goal = argval; 220 break; 221 case 'q': 222 g_queue_depth = argval; 223 break; 224 case 'S': 225 g_crc32c_seed = argval; 226 break; 227 case 't': 228 g_time_in_sec = argval; 229 break; 230 case 'x': 231 g_xor_src_count = argval; 232 break; 233 case 'y': 234 g_verify = true; 235 break; 236 case 'w': 237 g_workload_type = optarg; 238 if (!strcmp(g_workload_type, "copy")) { 239 g_workload_selection = SPDK_ACCEL_OPC_COPY; 240 } else if (!strcmp(g_workload_type, "fill")) { 241 g_workload_selection = SPDK_ACCEL_OPC_FILL; 242 } else if (!strcmp(g_workload_type, "crc32c")) { 243 g_workload_selection = SPDK_ACCEL_OPC_CRC32C; 244 } else if (!strcmp(g_workload_type, "copy_crc32c")) { 245 g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C; 246 } else if (!strcmp(g_workload_type, "compare")) { 247 g_workload_selection = SPDK_ACCEL_OPC_COMPARE; 248 } else if (!strcmp(g_workload_type, "dualcast")) { 249 g_workload_selection = SPDK_ACCEL_OPC_DUALCAST; 250 } else if (!strcmp(g_workload_type, "compress")) { 251 g_workload_selection = SPDK_ACCEL_OPC_COMPRESS; 252 } else if (!strcmp(g_workload_type, "decompress")) { 253 g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS; 254 } else if (!strcmp(g_workload_type, "xor")) { 255 g_workload_selection = SPDK_ACCEL_OPC_XOR; 256 } else { 257 usage(); 258 return 1; 259 } 260 break; 261 default: 262 usage(); 263 return 1; 264 } 265 266 return 0; 267 } 268 269 static int dump_result(void); 270 static void 271 unregister_worker(void *arg1) 272 { 273 struct worker_thread *worker = arg1; 274 275 spdk_accel_get_opcode_stats(worker->ch, worker->workload, 276 &worker->stats, sizeof(worker->stats)); 277 free(worker->task_base); 278 spdk_put_io_channel(worker->ch); 279 spdk_thread_exit(spdk_get_thread()); 280 pthread_mutex_lock(&g_workers_lock); 281 assert(g_num_workers >= 1); 282 if (--g_num_workers == 0) { 283 pthread_mutex_unlock(&g_workers_lock); 284 g_rc = dump_result(); 285 spdk_app_stop(0); 286 } else { 287 pthread_mutex_unlock(&g_workers_lock); 288 } 289 } 290 291 static void 292 accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt) 293 { 294 uint64_t ele_size; 295 uint8_t *data; 296 uint32_t i; 297 298 ele_size = spdk_divide_round_up(sz, iovcnt); 299 300 data = buf; 301 for (i = 0; i < iovcnt; i++) { 302 ele_size = spdk_min(ele_size, sz); 303 assert(ele_size > 0); 304 305 iovs[i].iov_base = data; 306 iovs[i].iov_len = ele_size; 307 308 data += ele_size; 309 sz -= ele_size; 310 } 311 assert(sz == 0); 312 } 313 314 static int 315 _get_task_data_bufs(struct ap_task *task) 316 { 317 uint32_t align = 0; 318 uint32_t i = 0; 319 int dst_buff_len = g_xfer_size_bytes; 320 321 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 322 * we do this for all modules to keep it simple. 323 */ 324 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) { 325 align = ALIGN_4K; 326 } 327 328 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS || 329 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 330 task->cur_seg = STAILQ_FIRST(&g_compress_segs); 331 332 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 333 dst_buff_len = task->cur_seg->compressed_len_padded; 334 } 335 336 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 337 if (task->dst == NULL) { 338 fprintf(stderr, "Unable to alloc dst buffer\n"); 339 return -ENOMEM; 340 } 341 342 task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec)); 343 if (!task->dst_iovs) { 344 fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task); 345 return -ENOMEM; 346 } 347 task->dst_iovcnt = g_chained_count; 348 accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt); 349 350 return 0; 351 } 352 353 if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 354 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 355 assert(g_chained_count > 0); 356 task->src_iovcnt = g_chained_count; 357 task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec)); 358 if (!task->src_iovs) { 359 fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task); 360 return -ENOMEM; 361 } 362 363 if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 364 dst_buff_len = g_xfer_size_bytes * g_chained_count; 365 } 366 367 for (i = 0; i < task->src_iovcnt; i++) { 368 task->src_iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 369 if (task->src_iovs[i].iov_base == NULL) { 370 return -ENOMEM; 371 } 372 memset(task->src_iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes); 373 task->src_iovs[i].iov_len = g_xfer_size_bytes; 374 } 375 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 376 assert(g_xor_src_count > 1); 377 task->sources = calloc(g_xor_src_count, sizeof(*task->sources)); 378 if (!task->sources) { 379 return -ENOMEM; 380 } 381 382 for (i = 0; i < g_xor_src_count; i++) { 383 task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 384 if (!task->sources[i]) { 385 return -ENOMEM; 386 } 387 memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes); 388 } 389 } else { 390 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 391 if (task->src == NULL) { 392 fprintf(stderr, "Unable to alloc src buffer\n"); 393 return -ENOMEM; 394 } 395 396 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 397 if (g_workload_selection == SPDK_ACCEL_OPC_FILL) { 398 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 399 } else { 400 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 401 } 402 } 403 404 if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) { 405 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 406 if (task->dst == NULL) { 407 fprintf(stderr, "Unable to alloc dst buffer\n"); 408 return -ENOMEM; 409 } 410 411 /* For compare we want the buffers to match, otherwise not. */ 412 if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) { 413 memset(task->dst, DATA_PATTERN, dst_buff_len); 414 } else { 415 memset(task->dst, ~DATA_PATTERN, dst_buff_len); 416 } 417 } 418 419 /* For dualcast 2 buffers are needed for the operation. */ 420 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || 421 (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) { 422 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 423 if (task->dst2 == NULL) { 424 fprintf(stderr, "Unable to alloc dst buffer\n"); 425 return -ENOMEM; 426 } 427 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 428 } 429 430 return 0; 431 } 432 433 inline static struct ap_task * 434 _get_task(struct worker_thread *worker) 435 { 436 struct ap_task *task; 437 438 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 439 task = TAILQ_FIRST(&worker->tasks_pool); 440 TAILQ_REMOVE(&worker->tasks_pool, task, link); 441 } else { 442 fprintf(stderr, "Unable to get ap_task\n"); 443 return NULL; 444 } 445 446 return task; 447 } 448 449 /* Submit one operation using the same ap task that just completed. */ 450 static void 451 _submit_single(struct worker_thread *worker, struct ap_task *task) 452 { 453 int random_num; 454 int rc = 0; 455 int flags = 0; 456 457 assert(worker); 458 459 switch (worker->workload) { 460 case SPDK_ACCEL_OPC_COPY: 461 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 462 g_xfer_size_bytes, flags, accel_done, task); 463 break; 464 case SPDK_ACCEL_OPC_FILL: 465 /* For fill use the first byte of the task->dst buffer */ 466 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 467 g_xfer_size_bytes, flags, accel_done, task); 468 break; 469 case SPDK_ACCEL_OPC_CRC32C: 470 rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst, 471 task->src_iovs, task->src_iovcnt, g_crc32c_seed, 472 accel_done, task); 473 break; 474 case SPDK_ACCEL_OPC_COPY_CRC32C: 475 rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt, 476 &task->crc_dst, g_crc32c_seed, flags, accel_done, task); 477 break; 478 case SPDK_ACCEL_OPC_COMPARE: 479 random_num = rand() % 100; 480 if (random_num < g_fail_percent_goal) { 481 task->expected_status = -EILSEQ; 482 *(uint8_t *)task->dst = ~DATA_PATTERN; 483 } else { 484 task->expected_status = 0; 485 *(uint8_t *)task->dst = DATA_PATTERN; 486 } 487 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 488 g_xfer_size_bytes, accel_done, task); 489 break; 490 case SPDK_ACCEL_OPC_DUALCAST: 491 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 492 task->src, g_xfer_size_bytes, flags, accel_done, task); 493 break; 494 case SPDK_ACCEL_OPC_COMPRESS: 495 task->src_iovs = task->cur_seg->uncompressed_iovs; 496 task->src_iovcnt = task->cur_seg->uncompressed_iovcnt; 497 rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded, 498 task->src_iovs, 499 task->src_iovcnt, &task->compressed_sz, flags, accel_done, task); 500 break; 501 case SPDK_ACCEL_OPC_DECOMPRESS: 502 task->src_iovs = task->cur_seg->compressed_iovs; 503 task->src_iovcnt = task->cur_seg->compressed_iovcnt; 504 rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs, 505 task->src_iovcnt, NULL, flags, accel_done, task); 506 break; 507 case SPDK_ACCEL_OPC_XOR: 508 rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count, 509 g_xfer_size_bytes, accel_done, task); 510 break; 511 default: 512 assert(false); 513 break; 514 515 } 516 517 worker->current_queue_depth++; 518 if (rc) { 519 accel_done(task, rc); 520 } 521 } 522 523 static void 524 _free_task_buffers(struct ap_task *task) 525 { 526 uint32_t i; 527 528 if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS || 529 g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 530 free(task->dst_iovs); 531 } else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 532 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 533 if (task->src_iovs) { 534 for (i = 0; i < task->src_iovcnt; i++) { 535 if (task->src_iovs[i].iov_base) { 536 spdk_dma_free(task->src_iovs[i].iov_base); 537 } 538 } 539 free(task->src_iovs); 540 } 541 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 542 if (task->sources) { 543 for (i = 0; i < g_xor_src_count; i++) { 544 spdk_dma_free(task->sources[i]); 545 } 546 free(task->sources); 547 } 548 } else { 549 spdk_dma_free(task->src); 550 } 551 552 spdk_dma_free(task->dst); 553 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) { 554 spdk_dma_free(task->dst2); 555 } 556 } 557 558 static int 559 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt) 560 { 561 uint32_t i; 562 uint32_t ttl_len = 0; 563 uint8_t *dst = (uint8_t *)_dst; 564 565 for (i = 0; i < iovcnt; i++) { 566 if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) { 567 return -1; 568 } 569 dst += src_src_iovs[i].iov_len; 570 ttl_len += src_src_iovs[i].iov_len; 571 } 572 573 if (ttl_len != iovcnt * g_xfer_size_bytes) { 574 return -1; 575 } 576 577 return 0; 578 } 579 580 static int _worker_stop(void *arg); 581 582 static void 583 accel_done(void *arg1, int status) 584 { 585 struct ap_task *task = arg1; 586 struct worker_thread *worker = task->worker; 587 uint32_t sw_crc32c; 588 589 assert(worker); 590 assert(worker->current_queue_depth > 0); 591 592 if (g_verify && status == 0) { 593 switch (worker->workload) { 594 case SPDK_ACCEL_OPC_COPY_CRC32C: 595 sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed); 596 if (task->crc_dst != sw_crc32c) { 597 SPDK_NOTICELOG("CRC-32C miscompare\n"); 598 worker->xfer_failed++; 599 } 600 if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) { 601 SPDK_NOTICELOG("Data miscompare\n"); 602 worker->xfer_failed++; 603 } 604 break; 605 case SPDK_ACCEL_OPC_CRC32C: 606 sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed); 607 if (task->crc_dst != sw_crc32c) { 608 SPDK_NOTICELOG("CRC-32C miscompare\n"); 609 worker->xfer_failed++; 610 } 611 break; 612 case SPDK_ACCEL_OPC_COPY: 613 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 614 SPDK_NOTICELOG("Data miscompare\n"); 615 worker->xfer_failed++; 616 } 617 break; 618 case SPDK_ACCEL_OPC_DUALCAST: 619 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 620 SPDK_NOTICELOG("Data miscompare, first destination\n"); 621 worker->xfer_failed++; 622 } 623 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 624 SPDK_NOTICELOG("Data miscompare, second destination\n"); 625 worker->xfer_failed++; 626 } 627 break; 628 case SPDK_ACCEL_OPC_FILL: 629 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 630 SPDK_NOTICELOG("Data miscompare\n"); 631 worker->xfer_failed++; 632 } 633 break; 634 case SPDK_ACCEL_OPC_COMPARE: 635 break; 636 case SPDK_ACCEL_OPC_COMPRESS: 637 break; 638 case SPDK_ACCEL_OPC_DECOMPRESS: 639 if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) { 640 SPDK_NOTICELOG("Data miscompare on decompression\n"); 641 worker->xfer_failed++; 642 } 643 break; 644 case SPDK_ACCEL_OPC_XOR: 645 if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count, 646 g_xfer_size_bytes) != 0) { 647 SPDK_ERRLOG("Failed to generate xor for verification\n"); 648 } else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) { 649 SPDK_NOTICELOG("Data miscompare\n"); 650 worker->xfer_failed++; 651 } 652 break; 653 default: 654 assert(false); 655 break; 656 } 657 } 658 659 if (worker->workload == SPDK_ACCEL_OPC_COMPRESS || 660 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 661 /* Advance the task to the next segment */ 662 task->cur_seg = STAILQ_NEXT(task->cur_seg, link); 663 if (task->cur_seg == NULL) { 664 task->cur_seg = STAILQ_FIRST(&g_compress_segs); 665 } 666 } 667 668 if (task->expected_status == -EILSEQ) { 669 assert(status != 0); 670 worker->injected_miscompares++; 671 status = 0; 672 } else if (status) { 673 /* Expected to pass but the accel module reported an error (ex: COMPARE operation). */ 674 worker->xfer_failed++; 675 } 676 677 worker->current_queue_depth--; 678 679 if (!worker->is_draining && status == 0) { 680 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 681 task = _get_task(worker); 682 _submit_single(worker, task); 683 } else { 684 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 685 } 686 } 687 688 static int 689 dump_result(void) 690 { 691 uint64_t total_completed = 0; 692 uint64_t total_failed = 0; 693 uint64_t total_miscompared = 0; 694 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 695 struct worker_thread *worker = g_workers; 696 697 printf("\nCore,Thread Transfers Bandwidth Failed Miscompares\n"); 698 printf("------------------------------------------------------------------------\n"); 699 while (worker != NULL) { 700 701 uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec; 702 uint64_t bw_in_MiBps = worker->stats.num_bytes / 703 (g_time_in_sec * 1024 * 1024); 704 705 total_completed += worker->stats.executed; 706 total_failed += worker->xfer_failed; 707 total_miscompared += worker->injected_miscompares; 708 709 if (xfer_per_sec) { 710 printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n", 711 worker->display.core, worker->display.thread, xfer_per_sec, 712 bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares); 713 } 714 715 worker = worker->next; 716 } 717 718 total_xfer_per_sec = total_completed / g_time_in_sec; 719 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 720 (g_time_in_sec * 1024 * 1024); 721 722 printf("=========================================================================\n"); 723 printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n", 724 total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared); 725 726 return total_failed ? 1 : 0; 727 } 728 729 static inline void 730 _free_task_buffers_in_pool(struct worker_thread *worker) 731 { 732 struct ap_task *task; 733 734 assert(worker); 735 while ((task = TAILQ_FIRST(&worker->tasks_pool))) { 736 TAILQ_REMOVE(&worker->tasks_pool, task, link); 737 _free_task_buffers(task); 738 } 739 } 740 741 static int 742 _check_draining(void *arg) 743 { 744 struct worker_thread *worker = arg; 745 746 assert(worker); 747 748 if (worker->current_queue_depth == 0) { 749 _free_task_buffers_in_pool(worker); 750 spdk_poller_unregister(&worker->is_draining_poller); 751 unregister_worker(worker); 752 } 753 754 return SPDK_POLLER_BUSY; 755 } 756 757 static int 758 _worker_stop(void *arg) 759 { 760 struct worker_thread *worker = arg; 761 762 assert(worker); 763 764 spdk_poller_unregister(&worker->stop_poller); 765 766 /* now let the worker drain and check it's outstanding IO with a poller */ 767 worker->is_draining = true; 768 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 769 770 return SPDK_POLLER_BUSY; 771 } 772 773 static void 774 _init_thread(void *arg1) 775 { 776 struct worker_thread *worker; 777 struct ap_task *task; 778 int i, num_tasks = g_allocate_depth; 779 struct display_info *display = arg1; 780 781 worker = calloc(1, sizeof(*worker)); 782 if (worker == NULL) { 783 fprintf(stderr, "Unable to allocate worker\n"); 784 free(display); 785 return; 786 } 787 788 worker->workload = g_workload_selection; 789 worker->display.core = display->core; 790 worker->display.thread = display->thread; 791 free(display); 792 worker->core = spdk_env_get_current_core(); 793 worker->thread = spdk_get_thread(); 794 pthread_mutex_lock(&g_workers_lock); 795 g_num_workers++; 796 worker->next = g_workers; 797 g_workers = worker; 798 pthread_mutex_unlock(&g_workers_lock); 799 worker->ch = spdk_accel_get_io_channel(); 800 if (worker->ch == NULL) { 801 fprintf(stderr, "Unable to get an accel channel\n"); 802 goto error; 803 } 804 805 TAILQ_INIT(&worker->tasks_pool); 806 807 worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); 808 if (worker->task_base == NULL) { 809 fprintf(stderr, "Could not allocate task base.\n"); 810 goto error; 811 } 812 813 task = worker->task_base; 814 for (i = 0; i < num_tasks; i++) { 815 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 816 task->worker = worker; 817 if (_get_task_data_bufs(task)) { 818 fprintf(stderr, "Unable to get data bufs\n"); 819 goto error; 820 } 821 task++; 822 } 823 824 /* Register a poller that will stop the worker at time elapsed */ 825 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 826 g_time_in_sec * 1000000ULL); 827 828 /* Load up queue depth worth of operations. */ 829 for (i = 0; i < g_queue_depth; i++) { 830 task = _get_task(worker); 831 if (task == NULL) { 832 goto error; 833 } 834 835 _submit_single(worker, task); 836 } 837 return; 838 error: 839 840 _free_task_buffers_in_pool(worker); 841 free(worker->task_base); 842 spdk_app_stop(-1); 843 } 844 845 static void 846 accel_perf_start(void *arg1) 847 { 848 struct spdk_cpuset tmp_cpumask = {}; 849 char thread_name[32]; 850 uint32_t i; 851 int j; 852 struct spdk_thread *thread; 853 struct display_info *display; 854 855 g_tsc_rate = spdk_get_ticks_hz(); 856 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 857 858 dump_user_config(); 859 860 printf("Running for %d seconds...\n", g_time_in_sec); 861 fflush(stdout); 862 863 /* Create worker threads for each core that was specified. */ 864 SPDK_ENV_FOREACH_CORE(i) { 865 for (j = 0; j < g_threads_per_core; j++) { 866 snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j); 867 spdk_cpuset_zero(&tmp_cpumask); 868 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 869 thread = spdk_thread_create(thread_name, &tmp_cpumask); 870 display = calloc(1, sizeof(*display)); 871 if (display == NULL) { 872 fprintf(stderr, "Unable to allocate memory\n"); 873 spdk_app_stop(-1); 874 return; 875 } 876 display->core = i; 877 display->thread = j; 878 spdk_thread_send_msg(thread, _init_thread, display); 879 } 880 } 881 } 882 883 static void 884 accel_perf_free_compress_segs(void) 885 { 886 struct ap_compress_seg *seg, *tmp; 887 888 STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) { 889 free(seg->uncompressed_iovs); 890 free(seg->compressed_iovs); 891 spdk_dma_free(seg->compressed_data); 892 spdk_dma_free(seg->uncompressed_data); 893 STAILQ_REMOVE_HEAD(&g_compress_segs, link); 894 free(seg); 895 } 896 } 897 898 struct accel_perf_prep_ctx { 899 FILE *file; 900 long remaining; 901 struct spdk_io_channel *ch; 902 struct ap_compress_seg *cur_seg; 903 }; 904 905 static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx); 906 907 static void 908 accel_perf_prep_process_seg_cpl(void *ref, int status) 909 { 910 struct accel_perf_prep_ctx *ctx = ref; 911 struct ap_compress_seg *seg; 912 913 if (status != 0) { 914 fprintf(stderr, "error (%d) on initial compress completion\n", status); 915 spdk_dma_free(ctx->cur_seg->compressed_data); 916 spdk_dma_free(ctx->cur_seg->uncompressed_data); 917 free(ctx->cur_seg); 918 spdk_put_io_channel(ctx->ch); 919 fclose(ctx->file); 920 free(ctx); 921 spdk_app_stop(-status); 922 return; 923 } 924 925 seg = ctx->cur_seg; 926 927 if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 928 seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec)); 929 if (seg->compressed_iovs == NULL) { 930 fprintf(stderr, "unable to allocate iovec\n"); 931 spdk_dma_free(seg->compressed_data); 932 spdk_dma_free(seg->uncompressed_data); 933 free(seg); 934 spdk_put_io_channel(ctx->ch); 935 fclose(ctx->file); 936 free(ctx); 937 spdk_app_stop(-ENOMEM); 938 return; 939 } 940 seg->compressed_iovcnt = g_chained_count; 941 942 accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs, 943 seg->compressed_iovcnt); 944 } 945 946 STAILQ_INSERT_TAIL(&g_compress_segs, seg, link); 947 ctx->remaining -= seg->uncompressed_len; 948 949 accel_perf_prep_process_seg(ctx); 950 } 951 952 static void 953 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx) 954 { 955 struct ap_compress_seg *seg; 956 int sz, sz_read, sz_padded; 957 void *ubuf, *cbuf; 958 struct iovec iov[1]; 959 int rc; 960 961 if (ctx->remaining == 0) { 962 spdk_put_io_channel(ctx->ch); 963 fclose(ctx->file); 964 free(ctx); 965 accel_perf_start(NULL); 966 return; 967 } 968 969 sz = spdk_min(ctx->remaining, g_xfer_size_bytes); 970 /* Add 10% pad to the compress buffer for incompressible data. Note that a real app 971 * would likely either deal with the failure of not having a large enough buffer 972 * by submitting another operation with a larger one. Or, like the vbdev module 973 * does, just accept the error and use the data uncompressed marking it as such in 974 * its own metadata so that in the future it doesn't try to decompress uncompressed 975 * data, etc. 976 */ 977 sz_padded = sz * COMP_BUF_PAD_PERCENTAGE; 978 979 ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL); 980 if (!ubuf) { 981 fprintf(stderr, "unable to allocate uncompress buffer\n"); 982 rc = -ENOMEM; 983 goto error; 984 } 985 986 cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL); 987 if (!cbuf) { 988 fprintf(stderr, "unable to allocate compress buffer\n"); 989 rc = -ENOMEM; 990 spdk_dma_free(ubuf); 991 goto error; 992 } 993 994 seg = calloc(1, sizeof(*seg)); 995 if (!seg) { 996 fprintf(stderr, "unable to allocate comp/decomp segment\n"); 997 spdk_dma_free(ubuf); 998 spdk_dma_free(cbuf); 999 rc = -ENOMEM; 1000 goto error; 1001 } 1002 1003 sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file); 1004 if (sz_read != sz) { 1005 fprintf(stderr, "unable to read input file\n"); 1006 free(seg); 1007 spdk_dma_free(ubuf); 1008 spdk_dma_free(cbuf); 1009 rc = -errno; 1010 goto error; 1011 } 1012 1013 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 1014 seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec)); 1015 if (seg->uncompressed_iovs == NULL) { 1016 fprintf(stderr, "unable to allocate iovec\n"); 1017 free(seg); 1018 spdk_dma_free(ubuf); 1019 spdk_dma_free(cbuf); 1020 rc = -ENOMEM; 1021 goto error; 1022 } 1023 seg->uncompressed_iovcnt = g_chained_count; 1024 accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt); 1025 } 1026 1027 seg->uncompressed_data = ubuf; 1028 seg->uncompressed_len = sz; 1029 seg->compressed_data = cbuf; 1030 seg->compressed_len = sz; 1031 seg->compressed_len_padded = sz_padded; 1032 1033 ctx->cur_seg = seg; 1034 iov[0].iov_base = seg->uncompressed_data; 1035 iov[0].iov_len = seg->uncompressed_len; 1036 /* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance 1037 * it will fail with -ENOMEM in the event that the destination buffer is not large enough 1038 * to hold the compressed data. This example app simply adds 10% buffer for compressed data 1039 * but real applications may want to consider a more sophisticated method. 1040 */ 1041 rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1, 1042 &seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx); 1043 if (rc < 0) { 1044 fprintf(stderr, "error (%d) on initial compress submission\n", rc); 1045 goto error; 1046 } 1047 1048 return; 1049 1050 error: 1051 spdk_put_io_channel(ctx->ch); 1052 fclose(ctx->file); 1053 free(ctx); 1054 spdk_app_stop(rc); 1055 } 1056 1057 static void 1058 accel_perf_prep(void *arg1) 1059 { 1060 struct accel_perf_prep_ctx *ctx; 1061 int rc = 0; 1062 1063 if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS && 1064 g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) { 1065 accel_perf_start(arg1); 1066 return; 1067 } 1068 1069 if (g_cd_file_in_name == NULL) { 1070 fprintf(stdout, "A filename is required.\n"); 1071 rc = -EINVAL; 1072 goto error_end; 1073 } 1074 1075 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) { 1076 fprintf(stdout, "\nCompression does not support the verify option, aborting.\n"); 1077 rc = -ENOTSUP; 1078 goto error_end; 1079 } 1080 1081 printf("Preparing input file...\n"); 1082 1083 ctx = calloc(1, sizeof(*ctx)); 1084 if (ctx == NULL) { 1085 rc = -ENOMEM; 1086 goto error_end; 1087 } 1088 1089 ctx->file = fopen(g_cd_file_in_name, "r"); 1090 if (ctx->file == NULL) { 1091 fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name); 1092 rc = -errno; 1093 goto error_ctx; 1094 } 1095 1096 fseek(ctx->file, 0L, SEEK_END); 1097 ctx->remaining = ftell(ctx->file); 1098 fseek(ctx->file, 0L, SEEK_SET); 1099 1100 ctx->ch = spdk_accel_get_io_channel(); 1101 if (ctx->ch == NULL) { 1102 rc = -EAGAIN; 1103 goto error_file; 1104 } 1105 1106 if (g_xfer_size_bytes == 0) { 1107 /* size of 0 means "file at a time" */ 1108 g_xfer_size_bytes = ctx->remaining; 1109 } 1110 1111 accel_perf_prep_process_seg(ctx); 1112 return; 1113 1114 error_file: 1115 fclose(ctx->file); 1116 error_ctx: 1117 free(ctx); 1118 error_end: 1119 spdk_app_stop(rc); 1120 } 1121 1122 static void 1123 worker_shutdown(void *ctx) 1124 { 1125 _worker_stop(ctx); 1126 } 1127 1128 static void 1129 shutdown_cb(void) 1130 { 1131 struct worker_thread *worker; 1132 1133 pthread_mutex_lock(&g_workers_lock); 1134 if (!g_workers) { 1135 spdk_app_stop(1); 1136 goto unlock; 1137 } 1138 1139 worker = g_workers; 1140 while (worker) { 1141 spdk_thread_send_msg(worker->thread, worker_shutdown, worker); 1142 worker = worker->next; 1143 } 1144 unlock: 1145 pthread_mutex_unlock(&g_workers_lock); 1146 } 1147 1148 int 1149 main(int argc, char **argv) 1150 { 1151 struct worker_thread *worker, *tmp; 1152 int rc; 1153 1154 pthread_mutex_init(&g_workers_lock, NULL); 1155 spdk_app_opts_init(&g_opts, sizeof(g_opts)); 1156 g_opts.name = "accel_perf"; 1157 g_opts.reactor_mask = "0x1"; 1158 g_opts.shutdown_cb = shutdown_cb; 1159 1160 rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:P:f:T:l:S:x:", NULL, 1161 parse_args, usage); 1162 if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) { 1163 return rc == SPDK_APP_PARSE_ARGS_HELP ? 0 : 1; 1164 } 1165 1166 if ((g_workload_selection != SPDK_ACCEL_OPC_COPY) && 1167 (g_workload_selection != SPDK_ACCEL_OPC_FILL) && 1168 (g_workload_selection != SPDK_ACCEL_OPC_CRC32C) && 1169 (g_workload_selection != SPDK_ACCEL_OPC_COPY_CRC32C) && 1170 (g_workload_selection != SPDK_ACCEL_OPC_COMPARE) && 1171 (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS) && 1172 (g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) && 1173 (g_workload_selection != SPDK_ACCEL_OPC_DUALCAST) && 1174 (g_workload_selection != SPDK_ACCEL_OPC_XOR)) { 1175 usage(); 1176 g_rc = -1; 1177 goto cleanup; 1178 } 1179 1180 if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) { 1181 fprintf(stdout, "allocate depth must be at least as big as queue depth\n"); 1182 usage(); 1183 g_rc = -1; 1184 goto cleanup; 1185 } 1186 1187 if (g_allocate_depth == 0) { 1188 g_allocate_depth = g_queue_depth; 1189 } 1190 1191 if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 1192 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) && 1193 g_chained_count == 0) { 1194 usage(); 1195 g_rc = -1; 1196 goto cleanup; 1197 } 1198 1199 if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) { 1200 usage(); 1201 g_rc = -1; 1202 goto cleanup; 1203 } 1204 1205 g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL); 1206 if (g_rc) { 1207 SPDK_ERRLOG("ERROR starting application\n"); 1208 } 1209 1210 pthread_mutex_destroy(&g_workers_lock); 1211 1212 worker = g_workers; 1213 while (worker) { 1214 tmp = worker->next; 1215 free(worker); 1216 worker = tmp; 1217 } 1218 cleanup: 1219 accel_perf_free_compress_segs(); 1220 spdk_app_fini(); 1221 return g_rc; 1222 } 1223