/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2020 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel.h"
#include "spdk/crc32.h"
#include "spdk/util.h"
#include "spdk/xor.h"
#include "spdk/dif.h"

/* Byte value written into source buffers; destinations are primed with
 * ~DATA_PATTERN so verification can tell whether an operation really ran.
 */
#define DATA_PATTERN 0x5a
/* 4 KiB alignment used for dualcast destinations (DSA HW requirement). */
#define ALIGN_4K 0x1000
/* Pad factor for compressed buffers, to absorb incompressible input. */
#define COMP_BUF_PAD_PERCENTAGE 1.1L

static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;				/* process exit code; non-zero on failure */
static int g_xfer_size_bytes = 4096;		/* -o: per-vector transfer size */
static int g_block_size_bytes = 512;		/* DIF block size */
static int g_md_size_bytes = 8;			/* DIF metadata bytes per block */
static int g_queue_depth = 32;			/* -q: in-flight ops per worker */
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;		/* -T */
static int g_time_in_sec = 5;			/* -t: run duration */
static uint32_t g_crc32c_seed = 0;		/* -S */
static uint32_t g_chained_count = 1;		/* -C: iovec elements per op */
static int g_fail_percent_goal = 0;		/* -P: injected compare-miss rate */
static uint8_t g_fill_pattern = 255;		/* -f */
static uint32_t g_xor_src_count = 2;		/* -x: XOR source buffer count */
static bool g_verify = false;			/* -y: software-verify each result */
static const char *g_workload_type = NULL;	/* -w string as given on cmdline */
static enum spdk_accel_opcode g_workload_selection = SPDK_ACCEL_OPC_LAST;
static const char *g_module_name = NULL;	/* -M */
static struct worker_thread *g_workers = NULL;	/* singly-linked list of workers */
static int g_num_workers = 0;			/* guarded by g_workers_lock */
static char *g_cd_file_in_name = NULL;		/* -l: compress/decompress input file */
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
static struct spdk_app_opts g_opts = {};

/* One segment of the -l input file, kept in both uncompressed and compressed
 * form. Compress workloads consume the uncompressed side; decompress
 * workloads the compressed side.
 */
struct ap_compress_seg {
	void *uncompressed_data;
	uint32_t uncompressed_len;
	struct iovec *uncompressed_iovs;
	uint32_t uncompressed_iovcnt;

	void *compressed_data;
	uint32_t compressed_len;
	uint32_t compressed_len_padded;	/* compressed_len plus padding */
	struct iovec *compressed_iovs;
	uint32_t compressed_iovcnt;

	STAILQ_ENTRY(ap_compress_seg) link;
};

static STAILQ_HEAD(, ap_compress_seg) g_compress_segs = STAILQ_HEAD_INITIALIZER(g_compress_segs);

struct worker_thread;
static void accel_done(void *ref, int status);

/* Core/thread pair, used only for labeling results. */
struct display_info {
	int core;
	int thread;
};

/* Per-operation context; tasks are recycled through worker->tasks_pool. */
struct ap_task {
	void *src;
	struct iovec *src_iovs;
	uint32_t src_iovcnt;
	void **sources;			/* XOR source buffers */
	struct iovec *dst_iovs;
	uint32_t dst_iovcnt;
	void *dst;
	void *dst2;			/* second dualcast dst / XOR verify scratch */
	uint32_t *crc_dst;		/* CRC-32C result location */
	uint32_t compressed_sz;
	struct ap_compress_seg *cur_seg;
	struct worker_thread *worker;
	int expected_status; /* used for the compare operation */
	uint32_t num_blocks; /* used for the DIF related operations */
	struct spdk_dif_ctx dif_ctx;
	struct spdk_dif_error dif_err;
	TAILQ_ENTRY(ap_task) link;
};

/* Per-SPDK-thread state; one instance per (core, thread) pair. */
struct worker_thread {
	struct spdk_io_channel *ch;
	struct spdk_accel_opcode_stats stats;	/* snapshot taken at teardown */
	uint64_t xfer_failed;
	uint64_t injected_miscompares;
	uint64_t current_queue_depth;
	TAILQ_HEAD(, ap_task) tasks_pool;	/* idle (not in-flight) tasks */
	struct worker_thread *next;
	unsigned core;
	struct spdk_thread *thread;
	bool is_draining;			/* stop submitting; wait for outstanding IO */
	struct spdk_poller *is_draining_poller;
	struct spdk_poller *stop_poller;
	void *task_base;			/* backing allocation for all tasks */
	struct display_info display;
	enum spdk_accel_opcode workload;	/* copy of g_workload_selection */
};

/* Print the effective configuration before the run starts. */
static void
dump_user_config(void)
{
	const char *module_name = NULL;
	int rc;

	rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name);
	if (rc) {
		printf("error getting module name (%d)\n", rc);
	}

	printf("\nSPDK Configuration:\n");
	printf("Core mask: %s\n\n", g_opts.reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type: %s\n", g_workload_type);
	if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C ||
	    g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed: %u\n", g_crc32c_seed);
	} else if (g_workload_selection == SPDK_ACCEL_OPC_FILL) {
		printf("Fill pattern: 0x%x\n", g_fill_pattern);
	}
else if ((g_workload_selection == SPDK_ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) { 134 printf("Failure inject: %u percent\n", g_fail_percent_goal); 135 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 136 printf("Source buffers: %u\n", g_xor_src_count); 137 } 138 if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C || 139 g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY || 140 g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE || 141 g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) { 142 printf("Vector size: %u bytes\n", g_xfer_size_bytes); 143 printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_chained_count); 144 } else { 145 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 146 } 147 if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE || 148 g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) { 149 printf("Block size: %u bytes\n", g_block_size_bytes); 150 printf("Metadata size: %u bytes\n", g_md_size_bytes); 151 } 152 printf("Vector count %u\n", g_chained_count); 153 printf("Module: %s\n", module_name); 154 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS || 155 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 156 printf("File Name: %s\n", g_cd_file_in_name); 157 } 158 printf("Queue depth: %u\n", g_queue_depth); 159 printf("Allocate depth: %u\n", g_allocate_depth); 160 printf("# threads/core: %u\n", g_threads_per_core); 161 printf("Run time: %u seconds\n", g_time_in_sec); 162 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 163 } 164 165 static void 166 usage(void) 167 { 168 printf("accel_perf options:\n"); 169 printf("\t[-h help message]\n"); 170 printf("\t[-q queue depth per core]\n"); 171 printf("\t[-C for supported workloads, use this value to configure the io vector size to test (default 1)\n"); 172 printf("\t[-T number of threads per core\n"); 173 printf("\t[-o transfer size in bytes (default: 4KiB. 
For compress/decompress, 0 means the input file size)]\n"); 174 printf("\t[-t time in seconds]\n"); 175 printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, decompress, dualcast, xor,\n"); 176 printf("\t[ dif_verify, , dif_generate, dif_generate_copy\n"); 177 printf("\t[-M assign module to the operation, not compatible with accel_assign_opc RPC\n"); 178 printf("\t[-l for compress/decompress workloads, name of uncompressed input file\n"); 179 printf("\t[-S for crc32c workload, use this seed value (default 0)\n"); 180 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 181 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 182 printf("\t[-x for xor workload, use this number of source buffers (default, minimum: 2)]\n"); 183 printf("\t[-y verify result if this switch is on]\n"); 184 printf("\t[-a tasks to allocate per core (default: same value as -q)]\n"); 185 printf("\t\tCan be used to spread operations across a wider range of memory.\n"); 186 } 187 188 static int 189 parse_args(int ch, char *arg) 190 { 191 int argval = 0; 192 193 switch (ch) { 194 case 'a': 195 case 'C': 196 case 'f': 197 case 'T': 198 case 'o': 199 case 'P': 200 case 'q': 201 case 'S': 202 case 't': 203 case 'x': 204 argval = spdk_strtol(optarg, 10); 205 if (argval < 0) { 206 fprintf(stderr, "-%c option must be non-negative.\n", ch); 207 usage(); 208 return 1; 209 } 210 break; 211 default: 212 break; 213 }; 214 215 switch (ch) { 216 case 'a': 217 g_allocate_depth = argval; 218 break; 219 case 'C': 220 g_chained_count = argval; 221 break; 222 case 'l': 223 g_cd_file_in_name = optarg; 224 break; 225 case 'f': 226 g_fill_pattern = (uint8_t)argval; 227 break; 228 case 'T': 229 g_threads_per_core = argval; 230 break; 231 case 'o': 232 g_xfer_size_bytes = argval; 233 break; 234 case 'P': 235 g_fail_percent_goal = argval; 236 break; 237 case 'q': 238 g_queue_depth = argval; 239 
break; 240 case 'S': 241 g_crc32c_seed = argval; 242 break; 243 case 't': 244 g_time_in_sec = argval; 245 break; 246 case 'x': 247 g_xor_src_count = argval; 248 break; 249 case 'y': 250 g_verify = true; 251 break; 252 case 'w': 253 g_workload_type = optarg; 254 if (!strcmp(g_workload_type, "copy")) { 255 g_workload_selection = SPDK_ACCEL_OPC_COPY; 256 } else if (!strcmp(g_workload_type, "fill")) { 257 g_workload_selection = SPDK_ACCEL_OPC_FILL; 258 } else if (!strcmp(g_workload_type, "crc32c")) { 259 g_workload_selection = SPDK_ACCEL_OPC_CRC32C; 260 } else if (!strcmp(g_workload_type, "copy_crc32c")) { 261 g_workload_selection = SPDK_ACCEL_OPC_COPY_CRC32C; 262 } else if (!strcmp(g_workload_type, "compare")) { 263 g_workload_selection = SPDK_ACCEL_OPC_COMPARE; 264 } else if (!strcmp(g_workload_type, "dualcast")) { 265 g_workload_selection = SPDK_ACCEL_OPC_DUALCAST; 266 } else if (!strcmp(g_workload_type, "compress")) { 267 g_workload_selection = SPDK_ACCEL_OPC_COMPRESS; 268 } else if (!strcmp(g_workload_type, "decompress")) { 269 g_workload_selection = SPDK_ACCEL_OPC_DECOMPRESS; 270 } else if (!strcmp(g_workload_type, "xor")) { 271 g_workload_selection = SPDK_ACCEL_OPC_XOR; 272 } else if (!strcmp(g_workload_type, "dif_verify")) { 273 g_workload_selection = SPDK_ACCEL_OPC_DIF_VERIFY; 274 } else if (!strcmp(g_workload_type, "dif_generate")) { 275 g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE; 276 } else if (!strcmp(g_workload_type, "dif_generate_copy")) { 277 g_workload_selection = SPDK_ACCEL_OPC_DIF_GENERATE_COPY; 278 } else { 279 fprintf(stderr, "Unsupported workload type: %s\n", optarg); 280 usage(); 281 return 1; 282 } 283 break; 284 case 'M': 285 g_module_name = optarg; 286 break; 287 288 default: 289 usage(); 290 return 1; 291 } 292 293 return 0; 294 } 295 296 static int dump_result(void); 297 static void 298 unregister_worker(void *arg1) 299 { 300 struct worker_thread *worker = arg1; 301 302 if (worker->ch) { 303 spdk_accel_get_opcode_stats(worker->ch, 
worker->workload, 304 &worker->stats, sizeof(worker->stats)); 305 spdk_put_io_channel(worker->ch); 306 worker->ch = NULL; 307 } 308 free(worker->task_base); 309 spdk_thread_exit(spdk_get_thread()); 310 pthread_mutex_lock(&g_workers_lock); 311 assert(g_num_workers >= 1); 312 if (--g_num_workers == 0) { 313 pthread_mutex_unlock(&g_workers_lock); 314 /* Only dump results on successful runs */ 315 if (g_rc == 0) { 316 g_rc = dump_result(); 317 } 318 spdk_app_stop(g_rc); 319 } else { 320 pthread_mutex_unlock(&g_workers_lock); 321 } 322 } 323 324 static void 325 accel_perf_construct_iovs(void *buf, uint64_t sz, struct iovec *iovs, uint32_t iovcnt) 326 { 327 uint64_t ele_size; 328 uint8_t *data; 329 uint32_t i; 330 331 ele_size = spdk_divide_round_up(sz, iovcnt); 332 333 data = buf; 334 for (i = 0; i < iovcnt; i++) { 335 ele_size = spdk_min(ele_size, sz); 336 assert(ele_size > 0); 337 338 iovs[i].iov_base = data; 339 iovs[i].iov_len = ele_size; 340 341 data += ele_size; 342 sz -= ele_size; 343 } 344 assert(sz == 0); 345 } 346 347 static int 348 _get_task_data_bufs(struct ap_task *task) 349 { 350 uint32_t align = 0; 351 uint32_t i = 0; 352 int src_buff_len = g_xfer_size_bytes; 353 int dst_buff_len = g_xfer_size_bytes; 354 struct spdk_dif_ctx_init_ext_opts dif_opts; 355 uint32_t num_blocks, transfer_size_with_md; 356 int rc; 357 358 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 359 * we do this for all modules to keep it simple. 
360 */ 361 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST) { 362 align = ALIGN_4K; 363 } 364 365 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS || 366 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 367 task->cur_seg = STAILQ_FIRST(&g_compress_segs); 368 369 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 370 dst_buff_len = task->cur_seg->compressed_len_padded; 371 } 372 373 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 374 if (task->dst == NULL) { 375 fprintf(stderr, "Unable to alloc dst buffer\n"); 376 return -ENOMEM; 377 } 378 379 task->dst_iovs = calloc(g_chained_count, sizeof(struct iovec)); 380 if (!task->dst_iovs) { 381 fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task); 382 return -ENOMEM; 383 } 384 task->dst_iovcnt = g_chained_count; 385 accel_perf_construct_iovs(task->dst, dst_buff_len, task->dst_iovs, task->dst_iovcnt); 386 387 return 0; 388 } 389 390 if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) { 391 task->dst_iovcnt = g_chained_count; 392 task->dst_iovs = calloc(task->dst_iovcnt, sizeof(struct iovec)); 393 if (!task->dst_iovs) { 394 fprintf(stderr, "cannot allocate task->dst_iovs for task=%p\n", task); 395 return -ENOMEM; 396 } 397 398 num_blocks = g_xfer_size_bytes / g_block_size_bytes; 399 /* Add bytes for each block for metadata */ 400 transfer_size_with_md = g_xfer_size_bytes + (num_blocks * g_md_size_bytes); 401 task->num_blocks = num_blocks; 402 403 for (i = 0; i < task->dst_iovcnt; i++) { 404 task->dst_iovs[i].iov_base = spdk_dma_zmalloc(transfer_size_with_md, 0, NULL); 405 if (task->dst_iovs[i].iov_base == NULL) { 406 return -ENOMEM; 407 } 408 task->dst_iovs[i].iov_len = transfer_size_with_md; 409 } 410 411 dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format); 412 dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16; 413 414 rc = spdk_dif_ctx_init(&task->dif_ctx, 415 g_block_size_bytes + g_md_size_bytes, 416 g_md_size_bytes, true, true, 417 SPDK_DIF_TYPE1, 418 
SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK, 419 0x123, 0xFFFF, 0x234, 0, 0, &dif_opts); 420 if (rc != 0) { 421 fprintf(stderr, "Initialization of DIF context failed\n"); 422 return rc; 423 } 424 } 425 426 if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 427 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 428 task->crc_dst = spdk_dma_zmalloc(sizeof(*task->crc_dst), 0, NULL); 429 } 430 431 if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 432 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C || 433 g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY || 434 g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE || 435 g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) { 436 assert(g_chained_count > 0); 437 task->src_iovcnt = g_chained_count; 438 task->src_iovs = calloc(task->src_iovcnt, sizeof(struct iovec)); 439 if (!task->src_iovs) { 440 fprintf(stderr, "cannot allocated task->src_iovs fot task=%p\n", task); 441 return -ENOMEM; 442 } 443 444 if (g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C) { 445 dst_buff_len = g_xfer_size_bytes * g_chained_count; 446 } 447 448 if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE || 449 g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) { 450 src_buff_len += (g_xfer_size_bytes / g_block_size_bytes) * g_md_size_bytes; 451 } 452 453 for (i = 0; i < task->src_iovcnt; i++) { 454 task->src_iovs[i].iov_base = spdk_dma_zmalloc(src_buff_len, 0, NULL); 455 if (task->src_iovs[i].iov_base == NULL) { 456 return -ENOMEM; 457 } 458 memset(task->src_iovs[i].iov_base, DATA_PATTERN, src_buff_len); 459 task->src_iovs[i].iov_len = src_buff_len; 460 } 461 } else if (g_workload_selection == SPDK_ACCEL_OPC_XOR) { 462 assert(g_xor_src_count > 1); 463 task->sources = calloc(g_xor_src_count, sizeof(*task->sources)); 464 if (!task->sources) { 465 return -ENOMEM; 466 } 467 468 for (i = 0; i < g_xor_src_count; i++) { 469 task->sources[i] = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 470 if 
(!task->sources[i]) { 471 return -ENOMEM; 472 } 473 memset(task->sources[i], DATA_PATTERN, g_xfer_size_bytes); 474 } 475 } else { 476 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 477 if (task->src == NULL) { 478 fprintf(stderr, "Unable to alloc src buffer\n"); 479 return -ENOMEM; 480 } 481 482 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 483 if (g_workload_selection == SPDK_ACCEL_OPC_FILL) { 484 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 485 } else { 486 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 487 } 488 } 489 490 if (g_workload_selection != SPDK_ACCEL_OPC_CRC32C && 491 g_workload_selection != SPDK_ACCEL_OPC_DIF_VERIFY && 492 g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE && 493 g_workload_selection != SPDK_ACCEL_OPC_DIF_GENERATE_COPY) { 494 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 495 if (task->dst == NULL) { 496 fprintf(stderr, "Unable to alloc dst buffer\n"); 497 return -ENOMEM; 498 } 499 500 /* For compare we want the buffers to match, otherwise not. */ 501 if (g_workload_selection == SPDK_ACCEL_OPC_COMPARE) { 502 memset(task->dst, DATA_PATTERN, dst_buff_len); 503 } else { 504 memset(task->dst, ~DATA_PATTERN, dst_buff_len); 505 } 506 } 507 508 /* For dualcast 2 buffers are needed for the operation. 
*/ 509 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || 510 (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_verify)) { 511 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 512 if (task->dst2 == NULL) { 513 fprintf(stderr, "Unable to alloc dst buffer\n"); 514 return -ENOMEM; 515 } 516 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 517 } 518 519 if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE || 520 g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) { 521 dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format); 522 dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16; 523 524 task->num_blocks = (g_xfer_size_bytes * g_chained_count) / g_block_size_bytes; 525 526 rc = spdk_dif_ctx_init(&task->dif_ctx, 527 g_block_size_bytes + g_md_size_bytes, 528 g_md_size_bytes, true, true, 529 SPDK_DIF_TYPE1, 530 SPDK_DIF_FLAGS_GUARD_CHECK | SPDK_DIF_FLAGS_APPTAG_CHECK | SPDK_DIF_FLAGS_REFTAG_CHECK, 531 16, 0xFFFF, 10, 0, 0, &dif_opts); 532 if (rc != 0) { 533 fprintf(stderr, "Initialization of DIF context failed, error (%d)\n", rc); 534 return rc; 535 } 536 537 if (g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY) { 538 rc = spdk_dif_generate(task->src_iovs, task->src_iovcnt, task->num_blocks, &task->dif_ctx); 539 if (rc != 0) { 540 fprintf(stderr, "Generation of DIF failed, error (%d)\n", rc); 541 return rc; 542 } 543 } 544 } 545 546 return 0; 547 } 548 549 inline static struct ap_task * 550 _get_task(struct worker_thread *worker) 551 { 552 struct ap_task *task; 553 554 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 555 task = TAILQ_FIRST(&worker->tasks_pool); 556 TAILQ_REMOVE(&worker->tasks_pool, task, link); 557 } else { 558 fprintf(stderr, "Unable to get ap_task\n"); 559 return NULL; 560 } 561 562 return task; 563 } 564 565 /* Submit one operation using the same ap task that just completed. 
*/ 566 static void 567 _submit_single(struct worker_thread *worker, struct ap_task *task) 568 { 569 int random_num; 570 int rc = 0; 571 int flags = 0; 572 573 assert(worker); 574 575 switch (worker->workload) { 576 case SPDK_ACCEL_OPC_COPY: 577 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 578 g_xfer_size_bytes, flags, accel_done, task); 579 break; 580 case SPDK_ACCEL_OPC_FILL: 581 /* For fill use the first byte of the task->dst buffer */ 582 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 583 g_xfer_size_bytes, flags, accel_done, task); 584 break; 585 case SPDK_ACCEL_OPC_CRC32C: 586 rc = spdk_accel_submit_crc32cv(worker->ch, task->crc_dst, 587 task->src_iovs, task->src_iovcnt, g_crc32c_seed, 588 accel_done, task); 589 break; 590 case SPDK_ACCEL_OPC_COPY_CRC32C: 591 rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->src_iovs, task->src_iovcnt, 592 task->crc_dst, g_crc32c_seed, flags, accel_done, task); 593 break; 594 case SPDK_ACCEL_OPC_COMPARE: 595 random_num = rand() % 100; 596 if (random_num < g_fail_percent_goal) { 597 task->expected_status = -EILSEQ; 598 *(uint8_t *)task->dst = ~DATA_PATTERN; 599 } else { 600 task->expected_status = 0; 601 *(uint8_t *)task->dst = DATA_PATTERN; 602 } 603 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 604 g_xfer_size_bytes, accel_done, task); 605 break; 606 case SPDK_ACCEL_OPC_DUALCAST: 607 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 608 task->src, g_xfer_size_bytes, flags, accel_done, task); 609 break; 610 case SPDK_ACCEL_OPC_COMPRESS: 611 task->src_iovs = task->cur_seg->uncompressed_iovs; 612 task->src_iovcnt = task->cur_seg->uncompressed_iovcnt; 613 rc = spdk_accel_submit_compress(worker->ch, task->dst, task->cur_seg->compressed_len_padded, 614 task->src_iovs, 615 task->src_iovcnt, &task->compressed_sz, flags, accel_done, task); 616 break; 617 case SPDK_ACCEL_OPC_DECOMPRESS: 618 task->src_iovs = task->cur_seg->compressed_iovs; 619 
task->src_iovcnt = task->cur_seg->compressed_iovcnt; 620 rc = spdk_accel_submit_decompress(worker->ch, task->dst_iovs, task->dst_iovcnt, task->src_iovs, 621 task->src_iovcnt, NULL, flags, accel_done, task); 622 break; 623 case SPDK_ACCEL_OPC_XOR: 624 rc = spdk_accel_submit_xor(worker->ch, task->dst, task->sources, g_xor_src_count, 625 g_xfer_size_bytes, accel_done, task); 626 break; 627 case SPDK_ACCEL_OPC_DIF_VERIFY: 628 rc = spdk_accel_submit_dif_verify(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks, 629 &task->dif_ctx, &task->dif_err, accel_done, task); 630 break; 631 case SPDK_ACCEL_OPC_DIF_GENERATE: 632 rc = spdk_accel_submit_dif_generate(worker->ch, task->src_iovs, task->src_iovcnt, task->num_blocks, 633 &task->dif_ctx, accel_done, task); 634 break; 635 case SPDK_ACCEL_OPC_DIF_GENERATE_COPY: 636 rc = spdk_accel_submit_dif_generate_copy(worker->ch, task->dst_iovs, task->dst_iovcnt, 637 task->src_iovs, task->src_iovcnt, 638 task->num_blocks, &task->dif_ctx, accel_done, task); 639 break; 640 default: 641 assert(false); 642 break; 643 644 } 645 646 worker->current_queue_depth++; 647 if (rc) { 648 accel_done(task, rc); 649 } 650 } 651 652 static void 653 _free_task_buffers(struct ap_task *task) 654 { 655 uint32_t i; 656 657 if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS || 658 g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 659 free(task->dst_iovs); 660 } else if (g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 661 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C || 662 g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY || 663 g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE || 664 g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) { 665 if (task->crc_dst) { 666 spdk_dma_free(task->crc_dst); 667 } 668 if (task->src_iovs) { 669 for (i = 0; i < task->src_iovcnt; i++) { 670 if (task->src_iovs[i].iov_base) { 671 spdk_dma_free(task->src_iovs[i].iov_base); 672 } 673 } 674 free(task->src_iovs); 675 } 676 } else if 
(g_workload_selection == SPDK_ACCEL_OPC_XOR) { 677 if (task->sources) { 678 for (i = 0; i < g_xor_src_count; i++) { 679 spdk_dma_free(task->sources[i]); 680 } 681 free(task->sources); 682 } 683 } else { 684 spdk_dma_free(task->src); 685 } 686 687 spdk_dma_free(task->dst); 688 if (g_workload_selection == SPDK_ACCEL_OPC_DUALCAST || g_workload_selection == SPDK_ACCEL_OPC_XOR) { 689 spdk_dma_free(task->dst2); 690 } 691 692 if (g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE_COPY) { 693 if (task->dst_iovs) { 694 for (i = 0; i < task->dst_iovcnt; i++) { 695 if (task->dst_iovs[i].iov_base) { 696 spdk_dma_free(task->dst_iovs[i].iov_base); 697 } 698 } 699 free(task->dst_iovs); 700 } 701 } 702 } 703 704 static int 705 _vector_memcmp(void *_dst, struct iovec *src_src_iovs, uint32_t iovcnt) 706 { 707 uint32_t i; 708 uint32_t ttl_len = 0; 709 uint8_t *dst = (uint8_t *)_dst; 710 711 for (i = 0; i < iovcnt; i++) { 712 if (memcmp(dst, src_src_iovs[i].iov_base, src_src_iovs[i].iov_len)) { 713 return -1; 714 } 715 dst += src_src_iovs[i].iov_len; 716 ttl_len += src_src_iovs[i].iov_len; 717 } 718 719 if (ttl_len != iovcnt * g_xfer_size_bytes) { 720 return -1; 721 } 722 723 return 0; 724 } 725 726 static int _worker_stop(void *arg); 727 728 static void 729 accel_done(void *arg1, int status) 730 { 731 struct ap_task *task = arg1; 732 struct worker_thread *worker = task->worker; 733 uint32_t sw_crc32c; 734 struct spdk_dif_error err_blk; 735 736 assert(worker); 737 assert(worker->current_queue_depth > 0); 738 739 if (g_verify && status == 0) { 740 switch (worker->workload) { 741 case SPDK_ACCEL_OPC_COPY_CRC32C: 742 sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed); 743 if (*task->crc_dst != sw_crc32c) { 744 SPDK_NOTICELOG("CRC-32C miscompare\n"); 745 worker->xfer_failed++; 746 } 747 if (_vector_memcmp(task->dst, task->src_iovs, task->src_iovcnt)) { 748 SPDK_NOTICELOG("Data miscompare\n"); 749 worker->xfer_failed++; 750 } 751 break; 752 case 
SPDK_ACCEL_OPC_CRC32C: 753 sw_crc32c = spdk_crc32c_iov_update(task->src_iovs, task->src_iovcnt, ~g_crc32c_seed); 754 if (*task->crc_dst != sw_crc32c) { 755 SPDK_NOTICELOG("CRC-32C miscompare\n"); 756 worker->xfer_failed++; 757 } 758 break; 759 case SPDK_ACCEL_OPC_COPY: 760 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 761 SPDK_NOTICELOG("Data miscompare\n"); 762 worker->xfer_failed++; 763 } 764 break; 765 case SPDK_ACCEL_OPC_DUALCAST: 766 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 767 SPDK_NOTICELOG("Data miscompare, first destination\n"); 768 worker->xfer_failed++; 769 } 770 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 771 SPDK_NOTICELOG("Data miscompare, second destination\n"); 772 worker->xfer_failed++; 773 } 774 break; 775 case SPDK_ACCEL_OPC_FILL: 776 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 777 SPDK_NOTICELOG("Data miscompare\n"); 778 worker->xfer_failed++; 779 } 780 break; 781 case SPDK_ACCEL_OPC_COMPARE: 782 break; 783 case SPDK_ACCEL_OPC_COMPRESS: 784 break; 785 case SPDK_ACCEL_OPC_DECOMPRESS: 786 if (memcmp(task->dst, task->cur_seg->uncompressed_data, task->cur_seg->uncompressed_len)) { 787 SPDK_NOTICELOG("Data miscompare on decompression\n"); 788 worker->xfer_failed++; 789 } 790 break; 791 case SPDK_ACCEL_OPC_XOR: 792 if (spdk_xor_gen(task->dst2, task->sources, g_xor_src_count, 793 g_xfer_size_bytes) != 0) { 794 SPDK_ERRLOG("Failed to generate xor for verification\n"); 795 } else if (memcmp(task->dst, task->dst2, g_xfer_size_bytes)) { 796 SPDK_NOTICELOG("Data miscompare\n"); 797 worker->xfer_failed++; 798 } 799 break; 800 case SPDK_ACCEL_OPC_DIF_VERIFY: 801 break; 802 case SPDK_ACCEL_OPC_DIF_GENERATE: 803 if (spdk_dif_verify(task->src_iovs, task->src_iovcnt, task->num_blocks, 804 &task->dif_ctx, &err_blk) != 0) { 805 SPDK_NOTICELOG("Data miscompare, " 806 "err_type %u, expected %lu, actual %lu, err_offset %u\n", 807 err_blk.err_type, err_blk.expected, 808 err_blk.actual, err_blk.err_offset); 809 
worker->xfer_failed++; 810 } 811 break; 812 case SPDK_ACCEL_OPC_DIF_GENERATE_COPY: 813 if (spdk_dif_verify(task->dst_iovs, task->dst_iovcnt, task->num_blocks, 814 &task->dif_ctx, &err_blk) != 0) { 815 SPDK_NOTICELOG("Data miscompare, " 816 "err_type %u, expected %lu, actual %lu, err_offset %u\n", 817 err_blk.err_type, err_blk.expected, 818 err_blk.actual, err_blk.err_offset); 819 worker->xfer_failed++; 820 } 821 break; 822 default: 823 assert(false); 824 break; 825 } 826 } 827 828 if (worker->workload == SPDK_ACCEL_OPC_COMPRESS || 829 g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) { 830 /* Advance the task to the next segment */ 831 task->cur_seg = STAILQ_NEXT(task->cur_seg, link); 832 if (task->cur_seg == NULL) { 833 task->cur_seg = STAILQ_FIRST(&g_compress_segs); 834 } 835 } 836 837 if (task->expected_status == -EILSEQ) { 838 assert(status != 0); 839 worker->injected_miscompares++; 840 status = 0; 841 } else if (status) { 842 /* Expected to pass but the accel module reported an error (ex: COMPARE operation). 
		 */
		worker->xfer_failed++;
	}

	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		/* Recycle: return this task to the pool and immediately submit
		 * another one to keep the queue depth constant. */
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

/*
 * Print per-worker and aggregate results. Returns 1 if any transfer failed
 * (becomes the process exit code), otherwise 0.
 */
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;
	char tmp[64];

	printf("\n%-12s %20s %16s %16s %16s\n",
	       "Core,Thread", "Transfers", "Bandwidth", "Failed", "Miscompares");
	printf("------------------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->stats.executed / g_time_in_sec;
		uint64_t bw_in_MiBps = worker->stats.num_bytes /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->stats.executed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		snprintf(tmp, sizeof(tmp), "%u,%u", worker->display.core, worker->display.thread);
		/* Workers with no completed transfers are omitted from the table. */
		if (xfer_per_sec) {
			printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
			       tmp, xfer_per_sec, bw_in_MiBps, worker->xfer_failed,
			       worker->injected_miscompares);
		}

		worker = worker->next;
	}

	/* NOTE(review): aggregate bandwidth assumes g_xfer_size_bytes per op,
	 * which undercounts chained/compress workloads — confirm intent. */
	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("====================================================================================\n");
	printf("%-12s %18" PRIu64 "/s %10" PRIu64 " MiB/s %16"PRIu64 " %16" PRIu64 "\n",
	       "Total", total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

/* Free the buffers of every idle task in the worker's pool. */
static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

/*
 * Poller: once all in-flight operations have completed, free task buffers
 * and unregister this worker.
 */
static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

/*
 * One-shot poller fired when the run time elapses: stop submitting and start
 * polling for the queue to drain.
 */
static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* now let the worker drain and check it's outstanding IO with a poller */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}

static void shutdown_cb(void);

/*
 * Per-thread startup (runs on the new SPDK thread): allocate the worker, its
 * accel channel and task pool, arm the stop timer, then prime the queue with
 * g_queue_depth submissions.
 */
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		spdk_thread_exit(spdk_get_thread());
		goto no_worker;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	/* NOTE(review): the accel channel is not released on this path; the
	 * worker stays on g_workers for the shutdown path to find. */
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
no_worker:
	shutdown_cb();
	g_rc = -1;
}

/*
 * App entry point after segment prep: create one SPDK thread per
 * (core, thread-per-core) pair, each pinned to its core, and kick off
 * _init_thread on each.
 */
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	dump_user_config();

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

/* Release the global list of pre-built compress/decompress segments. */
static void
accel_perf_free_compress_segs(void)
{
	struct ap_compress_seg *seg, *tmp;

	STAILQ_FOREACH_SAFE(seg, &g_compress_segs, link, tmp) {
		free(seg->uncompressed_iovs);
		free(seg->compressed_iovs);
		spdk_dma_free(seg->compressed_data);
		spdk_dma_free(seg->uncompressed_data);
		STAILQ_REMOVE_HEAD(&g_compress_segs, link);
		free(seg);
	}
}

/* State carried across the async, segment-by-segment prep of the -l file. */
struct accel_perf_prep_ctx {
	FILE *file;			/* input file being consumed */
	long remaining;			/* bytes of the file left to process */
	struct spdk_io_channel *ch;
	struct ap_compress_seg *cur_seg;	/* segment currently being compressed */
};

static void accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx);

/*
 * Completion callback for the initial compress of one input segment. On
 * success, optionally builds the compressed-side iovecs (decompress workload),
 * queues the segment, and moves on to the next one. On any failure, tears
 * down the prep context and stops the app.
 */
static void
accel_perf_prep_process_seg_cpl(void *ref, int status)
{
	struct accel_perf_prep_ctx *ctx = ref;
	struct ap_compress_seg *seg;

	if (status != 0) {
		fprintf(stderr, "error (%d) on initial compress completion\n", status);
		spdk_dma_free(ctx->cur_seg->compressed_data);
		spdk_dma_free(ctx->cur_seg->uncompressed_data);
		free(ctx->cur_seg);
		spdk_put_io_channel(ctx->ch);
		fclose(ctx->file);
		free(ctx);
		spdk_app_stop(-status);
		return;
	}

	seg = ctx->cur_seg;

	if (g_workload_selection == SPDK_ACCEL_OPC_DECOMPRESS) {
		seg->compressed_iovs = calloc(g_chained_count, sizeof(struct iovec));
		if (seg->compressed_iovs == NULL) {
fprintf(stderr, "unable to allocate iovec\n"); 1108 spdk_dma_free(seg->compressed_data); 1109 spdk_dma_free(seg->uncompressed_data); 1110 free(seg); 1111 spdk_put_io_channel(ctx->ch); 1112 fclose(ctx->file); 1113 free(ctx); 1114 spdk_app_stop(-ENOMEM); 1115 return; 1116 } 1117 seg->compressed_iovcnt = g_chained_count; 1118 1119 accel_perf_construct_iovs(seg->compressed_data, seg->compressed_len, seg->compressed_iovs, 1120 seg->compressed_iovcnt); 1121 } 1122 1123 STAILQ_INSERT_TAIL(&g_compress_segs, seg, link); 1124 ctx->remaining -= seg->uncompressed_len; 1125 1126 accel_perf_prep_process_seg(ctx); 1127 } 1128 1129 static void 1130 accel_perf_prep_process_seg(struct accel_perf_prep_ctx *ctx) 1131 { 1132 struct ap_compress_seg *seg; 1133 int sz, sz_read, sz_padded; 1134 void *ubuf, *cbuf; 1135 struct iovec iov[1]; 1136 int rc; 1137 1138 if (ctx->remaining == 0) { 1139 spdk_put_io_channel(ctx->ch); 1140 fclose(ctx->file); 1141 free(ctx); 1142 accel_perf_start(NULL); 1143 return; 1144 } 1145 1146 sz = spdk_min(ctx->remaining, g_xfer_size_bytes); 1147 /* Add 10% pad to the compress buffer for incompressible data. Note that a real app 1148 * would likely either deal with the failure of not having a large enough buffer 1149 * by submitting another operation with a larger one. Or, like the vbdev module 1150 * does, just accept the error and use the data uncompressed marking it as such in 1151 * its own metadata so that in the future it doesn't try to decompress uncompressed 1152 * data, etc. 
1153 */ 1154 sz_padded = sz * COMP_BUF_PAD_PERCENTAGE; 1155 1156 ubuf = spdk_dma_zmalloc(sz, ALIGN_4K, NULL); 1157 if (!ubuf) { 1158 fprintf(stderr, "unable to allocate uncompress buffer\n"); 1159 rc = -ENOMEM; 1160 goto error; 1161 } 1162 1163 cbuf = spdk_dma_malloc(sz_padded, ALIGN_4K, NULL); 1164 if (!cbuf) { 1165 fprintf(stderr, "unable to allocate compress buffer\n"); 1166 rc = -ENOMEM; 1167 spdk_dma_free(ubuf); 1168 goto error; 1169 } 1170 1171 seg = calloc(1, sizeof(*seg)); 1172 if (!seg) { 1173 fprintf(stderr, "unable to allocate comp/decomp segment\n"); 1174 spdk_dma_free(ubuf); 1175 spdk_dma_free(cbuf); 1176 rc = -ENOMEM; 1177 goto error; 1178 } 1179 1180 sz_read = fread(ubuf, sizeof(uint8_t), sz, ctx->file); 1181 if (sz_read != sz) { 1182 fprintf(stderr, "unable to read input file\n"); 1183 free(seg); 1184 spdk_dma_free(ubuf); 1185 spdk_dma_free(cbuf); 1186 rc = -errno; 1187 goto error; 1188 } 1189 1190 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS) { 1191 seg->uncompressed_iovs = calloc(g_chained_count, sizeof(struct iovec)); 1192 if (seg->uncompressed_iovs == NULL) { 1193 fprintf(stderr, "unable to allocate iovec\n"); 1194 free(seg); 1195 spdk_dma_free(ubuf); 1196 spdk_dma_free(cbuf); 1197 rc = -ENOMEM; 1198 goto error; 1199 } 1200 seg->uncompressed_iovcnt = g_chained_count; 1201 accel_perf_construct_iovs(ubuf, sz, seg->uncompressed_iovs, seg->uncompressed_iovcnt); 1202 } 1203 1204 seg->uncompressed_data = ubuf; 1205 seg->uncompressed_len = sz; 1206 seg->compressed_data = cbuf; 1207 seg->compressed_len = sz; 1208 seg->compressed_len_padded = sz_padded; 1209 1210 ctx->cur_seg = seg; 1211 iov[0].iov_base = seg->uncompressed_data; 1212 iov[0].iov_len = seg->uncompressed_len; 1213 /* Note that anytime a call is made to spdk_accel_submit_compress() there's a chance 1214 * it will fail with -ENOMEM in the event that the destination buffer is not large enough 1215 * to hold the compressed data. 
This example app simply adds 10% buffer for compressed data 1216 * but real applications may want to consider a more sophisticated method. 1217 */ 1218 rc = spdk_accel_submit_compress(ctx->ch, seg->compressed_data, seg->compressed_len_padded, iov, 1, 1219 &seg->compressed_len, 0, accel_perf_prep_process_seg_cpl, ctx); 1220 if (rc < 0) { 1221 fprintf(stderr, "error (%d) on initial compress submission\n", rc); 1222 goto error; 1223 } 1224 1225 return; 1226 1227 error: 1228 spdk_put_io_channel(ctx->ch); 1229 fclose(ctx->file); 1230 free(ctx); 1231 spdk_app_stop(rc); 1232 } 1233 1234 static void 1235 accel_perf_prep(void *arg1) 1236 { 1237 struct accel_perf_prep_ctx *ctx; 1238 const char *module_name = NULL; 1239 int rc = 0; 1240 1241 if (g_module_name) { 1242 rc = spdk_accel_get_opc_module_name(g_workload_selection, &module_name); 1243 if (rc != 0 || strcmp(g_module_name, module_name) != 0) { 1244 fprintf(stderr, "Module '%s' was assigned via JSON config or RPC, instead of '%s'\n", 1245 module_name, g_module_name); 1246 fprintf(stderr, "-M option is not compatible with accel_assign_opc RPC\n"); 1247 rc = -EINVAL; 1248 goto error_end; 1249 } 1250 } 1251 1252 if (g_workload_selection != SPDK_ACCEL_OPC_COMPRESS && 1253 g_workload_selection != SPDK_ACCEL_OPC_DECOMPRESS) { 1254 accel_perf_start(arg1); 1255 return; 1256 } 1257 1258 if (g_cd_file_in_name == NULL) { 1259 fprintf(stdout, "A filename is required.\n"); 1260 rc = -EINVAL; 1261 goto error_end; 1262 } 1263 1264 if (g_workload_selection == SPDK_ACCEL_OPC_COMPRESS && g_verify) { 1265 fprintf(stdout, "\nCompression does not support the verify option, aborting.\n"); 1266 rc = -ENOTSUP; 1267 goto error_end; 1268 } 1269 1270 printf("Preparing input file...\n"); 1271 1272 ctx = calloc(1, sizeof(*ctx)); 1273 if (ctx == NULL) { 1274 rc = -ENOMEM; 1275 goto error_end; 1276 } 1277 1278 ctx->file = fopen(g_cd_file_in_name, "r"); 1279 if (ctx->file == NULL) { 1280 fprintf(stderr, "Could not open file %s.\n", g_cd_file_in_name); 
1281 rc = -errno; 1282 goto error_ctx; 1283 } 1284 1285 fseek(ctx->file, 0L, SEEK_END); 1286 ctx->remaining = ftell(ctx->file); 1287 fseek(ctx->file, 0L, SEEK_SET); 1288 1289 ctx->ch = spdk_accel_get_io_channel(); 1290 if (ctx->ch == NULL) { 1291 rc = -EAGAIN; 1292 goto error_file; 1293 } 1294 1295 if (g_xfer_size_bytes == 0) { 1296 /* size of 0 means "file at a time" */ 1297 g_xfer_size_bytes = ctx->remaining; 1298 } 1299 1300 accel_perf_prep_process_seg(ctx); 1301 return; 1302 1303 error_file: 1304 fclose(ctx->file); 1305 error_ctx: 1306 free(ctx); 1307 error_end: 1308 spdk_app_stop(rc); 1309 } 1310 1311 static void 1312 worker_shutdown(void *ctx) 1313 { 1314 _worker_stop(ctx); 1315 } 1316 1317 static void 1318 shutdown_cb(void) 1319 { 1320 struct worker_thread *worker; 1321 1322 pthread_mutex_lock(&g_workers_lock); 1323 if (!g_workers) { 1324 spdk_app_stop(1); 1325 goto unlock; 1326 } 1327 1328 worker = g_workers; 1329 while (worker) { 1330 spdk_thread_send_msg(worker->thread, worker_shutdown, worker); 1331 worker = worker->next; 1332 } 1333 unlock: 1334 pthread_mutex_unlock(&g_workers_lock); 1335 } 1336 1337 int 1338 main(int argc, char **argv) 1339 { 1340 struct worker_thread *worker, *tmp; 1341 int rc; 1342 1343 pthread_mutex_init(&g_workers_lock, NULL); 1344 spdk_app_opts_init(&g_opts, sizeof(g_opts)); 1345 g_opts.name = "accel_perf"; 1346 g_opts.reactor_mask = "0x1"; 1347 g_opts.shutdown_cb = shutdown_cb; 1348 1349 rc = spdk_app_parse_args(argc, argv, &g_opts, "a:C:o:q:t:yw:M:P:f:T:l:S:x:", NULL, 1350 parse_args, usage); 1351 if (rc != SPDK_APP_PARSE_ARGS_SUCCESS) { 1352 return rc == SPDK_APP_PARSE_ARGS_HELP ? 
0 : 1; 1353 } 1354 1355 if (g_workload_selection == SPDK_ACCEL_OPC_LAST) { 1356 fprintf(stderr, "Must provide a workload type\n"); 1357 usage(); 1358 return -1; 1359 } 1360 1361 if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) { 1362 fprintf(stdout, "allocate depth must be at least as big as queue depth\n"); 1363 usage(); 1364 return -1; 1365 } 1366 1367 if (g_allocate_depth == 0) { 1368 g_allocate_depth = g_queue_depth; 1369 } 1370 1371 if ((g_workload_selection == SPDK_ACCEL_OPC_CRC32C || 1372 g_workload_selection == SPDK_ACCEL_OPC_COPY_CRC32C || 1373 g_workload_selection == SPDK_ACCEL_OPC_DIF_VERIFY || 1374 g_workload_selection == SPDK_ACCEL_OPC_DIF_GENERATE) && 1375 g_chained_count == 0) { 1376 usage(); 1377 return -1; 1378 } 1379 1380 if (g_workload_selection == SPDK_ACCEL_OPC_XOR && g_xor_src_count < 2) { 1381 usage(); 1382 return -1; 1383 } 1384 1385 if (g_module_name && spdk_accel_assign_opc(g_workload_selection, g_module_name)) { 1386 fprintf(stderr, "Was not able to assign '%s' module to the workload\n", g_module_name); 1387 usage(); 1388 return -1; 1389 } 1390 1391 g_rc = spdk_app_start(&g_opts, accel_perf_prep, NULL); 1392 if (g_rc) { 1393 SPDK_ERRLOG("ERROR starting application\n"); 1394 } 1395 1396 pthread_mutex_destroy(&g_workers_lock); 1397 1398 worker = g_workers; 1399 while (worker) { 1400 tmp = worker->next; 1401 free(worker); 1402 worker = tmp; 1403 } 1404 accel_perf_free_compress_segs(); 1405 spdk_app_fini(); 1406 return g_rc; 1407 } 1408