1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (c) Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 #include "spdk/thread.h" 8 #include "spdk/env.h" 9 #include "spdk/event.h" 10 #include "spdk/log.h" 11 #include "spdk/string.h" 12 #include "spdk/accel_engine.h" 13 #include "spdk/crc32.h" 14 #include "spdk/util.h" 15 16 #define DATA_PATTERN 0x5a 17 #define ALIGN_4K 0x1000 18 19 static uint64_t g_tsc_rate; 20 static uint64_t g_tsc_end; 21 static int g_rc; 22 static int g_xfer_size_bytes = 4096; 23 static int g_queue_depth = 32; 24 /* g_allocate_depth indicates how many tasks we allocate per worker. It will 25 * be at least as much as the queue depth. 26 */ 27 static int g_allocate_depth = 0; 28 static int g_threads_per_core = 1; 29 static int g_time_in_sec = 5; 30 static uint32_t g_crc32c_seed = 0; 31 static uint32_t g_crc32c_chained_count = 1; 32 static int g_fail_percent_goal = 0; 33 static uint8_t g_fill_pattern = 255; 34 static bool g_verify = false; 35 static const char *g_workload_type = NULL; 36 static enum accel_opcode g_workload_selection; 37 static struct worker_thread *g_workers = NULL; 38 static int g_num_workers = 0; 39 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 40 41 struct worker_thread; 42 static void accel_done(void *ref, int status); 43 44 struct display_info { 45 int core; 46 int thread; 47 }; 48 49 struct ap_task { 50 void *src; 51 struct iovec *iovs; 52 uint32_t iov_cnt; 53 void *dst; 54 void *dst2; 55 union { 56 uint32_t crc_dst; 57 uint32_t output_size; 58 }; 59 struct worker_thread *worker; 60 int expected_status; /* used for the compare operation */ 61 TAILQ_ENTRY(ap_task) link; 62 }; 63 64 struct worker_thread { 65 struct spdk_io_channel *ch; 66 uint64_t xfer_completed; 67 uint64_t xfer_failed; 68 uint64_t injected_miscompares; 69 uint64_t current_queue_depth; 70 TAILQ_HEAD(, ap_task) tasks_pool; 71 struct worker_thread *next; 72 unsigned core; 73 struct spdk_thread *thread; 74 bool is_draining; 75 struct spdk_poller *is_draining_poller; 76 struct spdk_poller *stop_poller; 77 void *task_base; 78 struct display_info display; 79 enum accel_opcode workload; 80 void *rnd_data; 81 }; 82 83 static void 84 dump_user_config(struct spdk_app_opts *opts) 85 { 86 printf("SPDK Configuration:\n"); 87 printf("Core mask: %s\n\n", opts->reactor_mask); 88 printf("Accel Perf Configuration:\n"); 89 printf("Workload Type: %s\n", g_workload_type); 90 if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) { 91 printf("CRC-32C seed: %u\n", g_crc32c_seed); 92 printf("vector count %u\n", g_crc32c_chained_count); 93 } else if (g_workload_selection == ACCEL_OPC_FILL) { 94 printf("Fill pattern: 0x%x\n", g_fill_pattern); 95 } else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) { 96 printf("Failure inject: %u percent\n", g_fail_percent_goal); 97 } 98 if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) { 99 printf("Vector size: %u bytes\n", g_xfer_size_bytes); 100 printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count); 101 } else { 102 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 103 } 104 printf("Queue depth: %u\n", g_queue_depth); 105 printf("Allocate depth: %u\n", g_allocate_depth); 106 printf("# threads/core: %u\n", g_threads_per_core); 107 printf("Run time: %u seconds\n", g_time_in_sec); 108 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 109 } 110 111 static void 112 usage(void) 113 { 114 printf("accel_perf options:\n"); 115 printf("\t[-h help message]\n"); 116 printf("\t[-q queue depth per core]\n"); 117 printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)\n"); 118 printf("\t[-T number of threads per core\n"); 119 printf("\t[-n number of channels]\n"); 120 printf("\t[-o transfer size in bytes]\n"); 121 printf("\t[-t time in seconds]\n"); 122 printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, dualcast\n"); 123 printf("\t[-s for crc32c workload, use this seed value (default 0)\n"); 124 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 125 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 126 printf("\t[-y verify result if this switch is on]\n"); 127 printf("\t[-a tasks to allocate per core (default: same value as -q)]\n"); 128 printf("\t\tCan be used to spread operations across a wider range of memory.\n"); 129 } 130 131 static int 132 parse_args(int argc, char *argv) 133 { 134 int argval = 0; 135 136 switch (argc) { 137 case 'a': 138 case 'C': 139 case 'f': 140 case 'T': 141 case 'o': 142 case 'P': 143 case 'q': 144 case 's': 145 case 't': 146 argval = spdk_strtol(optarg, 10); 147 if (argval < 0) { 148 fprintf(stderr, "-%c option must be non-negative.\n", argc); 149 usage(); 150 return 1; 151 } 152 break; 153 default: 154 break; 155 }; 156 157 switch (argc) { 158 case 'a': 159 g_allocate_depth = argval; 160 break; 161 case 'C': 162 g_crc32c_chained_count = argval; 163 break; 164 case 'f': 165 g_fill_pattern = (uint8_t)argval; 166 break; 167 case 'T': 168 g_threads_per_core = argval; 169 break; 170 case 'o': 171 g_xfer_size_bytes = argval; 172 break; 173 case 'P': 174 g_fail_percent_goal = argval; 175 break; 176 case 'q': 177 g_queue_depth = argval; 178 break; 179 case 's': 180 g_crc32c_seed = argval; 181 break; 182 case 't': 183 g_time_in_sec = argval; 184 break; 185 case 'y': 186 g_verify = true; 187 break; 188 case 'w': 189 g_workload_type = optarg; 190 if (!strcmp(g_workload_type, "copy")) { 191 g_workload_selection = ACCEL_OPC_COPY; 192 } else if (!strcmp(g_workload_type, "fill")) { 193 g_workload_selection = ACCEL_OPC_FILL; 194 } else if (!strcmp(g_workload_type, "crc32c")) { 195 g_workload_selection = ACCEL_OPC_CRC32C; 196 } else if (!strcmp(g_workload_type, "copy_crc32c")) { 197 g_workload_selection = ACCEL_OPC_COPY_CRC32C; 198 } else if (!strcmp(g_workload_type, "compare")) { 199 g_workload_selection = ACCEL_OPC_COMPARE; 200 } else if (!strcmp(g_workload_type, "dualcast")) { 201 g_workload_selection = ACCEL_OPC_DUALCAST; 202 } else if (!strcmp(g_workload_type, "compress")) { 203 g_workload_selection = ACCEL_OPC_COMPRESS; 204 } 205 break; 206 default: 207 usage(); 208 return 1; 209 } 210 211 return 0; 212 } 213 214 static int dump_result(void); 215 static void 216 unregister_worker(void *arg1) 217 { 218 struct worker_thread *worker = arg1; 219 220 free(worker->task_base); 221 free(worker->rnd_data); 222 spdk_put_io_channel(worker->ch); 223 pthread_mutex_lock(&g_workers_lock); 224 assert(g_num_workers >= 1); 225 if (--g_num_workers == 0) { 226 pthread_mutex_unlock(&g_workers_lock); 227 g_rc = dump_result(); 228 spdk_app_stop(0); 229 } 230 pthread_mutex_unlock(&g_workers_lock); 231 } 232 233 static int 234 _get_task_data_bufs(struct ap_task *task) 235 { 236 uint32_t align = 0; 237 uint32_t i = 0; 238 int dst_buff_len = g_xfer_size_bytes; 239 240 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 241 * we do this for all engines to keep it simple. 242 */ 243 if (g_workload_selection == ACCEL_OPC_DUALCAST) { 244 align = ALIGN_4K; 245 } 246 247 if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) { 248 assert(g_crc32c_chained_count > 0); 249 task->iov_cnt = g_crc32c_chained_count; 250 task->iovs = calloc(task->iov_cnt, sizeof(struct iovec)); 251 if (!task->iovs) { 252 fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task); 253 return -ENOMEM; 254 } 255 256 if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) { 257 dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count; 258 } 259 260 for (i = 0; i < task->iov_cnt; i++) { 261 task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 262 if (task->iovs[i].iov_base == NULL) { 263 return -ENOMEM; 264 } 265 memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes); 266 task->iovs[i].iov_len = g_xfer_size_bytes; 267 } 268 269 } else { 270 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 271 if (task->src == NULL) { 272 fprintf(stderr, "Unable to alloc src buffer\n"); 273 return -ENOMEM; 274 } 275 276 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 277 if (g_workload_selection == ACCEL_OPC_FILL) { 278 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 279 } else if (g_workload_selection == ACCEL_OPC_COMPRESS) { 280 memcpy(task->src, task->worker->rnd_data, g_xfer_size_bytes); 281 } else { 282 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 283 } 284 } 285 286 if (g_workload_selection != ACCEL_OPC_CRC32C) { 287 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 288 if (task->dst == NULL) { 289 fprintf(stderr, "Unable to alloc dst buffer\n"); 290 return -ENOMEM; 291 } 292 293 /* For compare we want the buffers to match, otherwise not. */ 294 if (g_workload_selection == ACCEL_OPC_COMPARE) { 295 memset(task->dst, DATA_PATTERN, dst_buff_len); 296 } else { 297 memset(task->dst, ~DATA_PATTERN, dst_buff_len); 298 } 299 } 300 301 /* For dualcast 2 buffers are needed for the operation. For compress we use the second buffer to 302 * store the original pre-compressed data so we have a copy of it when we go to decompress. 303 */ 304 if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) { 305 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 306 if (task->dst2 == NULL) { 307 fprintf(stderr, "Unable to alloc dst buffer\n"); 308 return -ENOMEM; 309 } 310 if (g_workload_selection == ACCEL_OPC_DUALCAST) { 311 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 312 } else if (g_workload_selection == ACCEL_OPC_COMPRESS) { 313 /* copy the oriignal data to dst2 so we can compare it to 314 * the results of decompression if -y is used. 315 */ 316 assert(task->src); /* for scan-build */ 317 memcpy(task->dst2, task->src, g_xfer_size_bytes); 318 } 319 } 320 321 return 0; 322 } 323 324 inline static struct ap_task * 325 _get_task(struct worker_thread *worker) 326 { 327 struct ap_task *task; 328 329 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 330 task = TAILQ_FIRST(&worker->tasks_pool); 331 TAILQ_REMOVE(&worker->tasks_pool, task, link); 332 } else { 333 fprintf(stderr, "Unable to get ap_task\n"); 334 return NULL; 335 } 336 337 return task; 338 } 339 340 /* Submit one operation using the same ap task that just completed. */ 341 static void 342 _submit_single(struct worker_thread *worker, struct ap_task *task) 343 { 344 int random_num; 345 int rc = 0; 346 int flags = 0; 347 348 assert(worker); 349 350 switch (worker->workload) { 351 case ACCEL_OPC_COPY: 352 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 353 g_xfer_size_bytes, flags, accel_done, task); 354 break; 355 case ACCEL_OPC_FILL: 356 /* For fill use the first byte of the task->dst buffer */ 357 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 358 g_xfer_size_bytes, flags, accel_done, task); 359 break; 360 case ACCEL_OPC_CRC32C: 361 rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst, 362 task->iovs, task->iov_cnt, g_crc32c_seed, 363 accel_done, task); 364 break; 365 case ACCEL_OPC_COPY_CRC32C: 366 rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt, 367 &task->crc_dst, g_crc32c_seed, flags, accel_done, task); 368 break; 369 case ACCEL_OPC_COMPARE: 370 random_num = rand() % 100; 371 if (random_num < g_fail_percent_goal) { 372 task->expected_status = -EILSEQ; 373 *(uint8_t *)task->dst = ~DATA_PATTERN; 374 } else { 375 task->expected_status = 0; 376 *(uint8_t *)task->dst = DATA_PATTERN; 377 } 378 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 379 g_xfer_size_bytes, accel_done, task); 380 break; 381 case ACCEL_OPC_DUALCAST: 382 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 383 task->src, g_xfer_size_bytes, flags, accel_done, task); 384 break; 385 case ACCEL_OPC_COMPRESS: 386 rc = spdk_accel_submit_compress(worker->ch, task->dst, task->src, 387 g_xfer_size_bytes, g_xfer_size_bytes, &task->output_size, 388 flags, accel_done, task); 389 break; 390 default: 391 assert(false); 392 break; 393 394 } 395 396 if (rc) { 397 accel_done(task, rc); 398 } 399 } 400 401 static void 402 _free_task_buffers(struct ap_task *task) 403 { 404 uint32_t i; 405 406 if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) { 407 if (task->iovs) { 408 for (i = 0; i < task->iov_cnt; i++) { 409 if (task->iovs[i].iov_base) { 410 spdk_dma_free(task->iovs[i].iov_base); 411 } 412 } 413 free(task->iovs); 414 } 415 } else { 416 spdk_dma_free(task->src); 417 } 418 419 spdk_dma_free(task->dst); 420 if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) { 421 spdk_dma_free(task->dst2); 422 } 423 } 424 425 static int 426 _vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt) 427 { 428 uint32_t i; 429 uint32_t ttl_len = 0; 430 uint8_t *dst = (uint8_t *)_dst; 431 432 for (i = 0; i < iovcnt; i++) { 433 if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) { 434 return -1; 435 } 436 dst += src_iovs[i].iov_len; 437 ttl_len += src_iovs[i].iov_len; 438 } 439 440 if (ttl_len != iovcnt * g_xfer_size_bytes) { 441 return -1; 442 } 443 444 return 0; 445 } 446 447 static int _worker_stop(void *arg); 448 449 static void 450 accel_done(void *arg1, int status) 451 { 452 struct ap_task *task = arg1; 453 struct worker_thread *worker = task->worker; 454 uint32_t sw_crc32c; 455 int rc; 456 457 assert(worker); 458 assert(worker->current_queue_depth > 0); 459 460 if (!worker->is_draining && status == -EINVAL && worker->workload == ACCEL_OPC_COMPRESS) { 461 printf("Invalid configuration, compress workload needs ISA-L or IAA. Exiting\n"); 462 _worker_stop(worker); 463 } 464 465 if (g_verify && status == 0) { 466 switch (worker->workload) { 467 case ACCEL_OPC_COPY_CRC32C: 468 sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed); 469 if (task->crc_dst != sw_crc32c) { 470 SPDK_NOTICELOG("CRC-32C miscompare\n"); 471 worker->xfer_failed++; 472 } 473 if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) { 474 SPDK_NOTICELOG("Data miscompare\n"); 475 worker->xfer_failed++; 476 } 477 break; 478 case ACCEL_OPC_CRC32C: 479 sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed); 480 if (task->crc_dst != sw_crc32c) { 481 SPDK_NOTICELOG("CRC-32C miscompare\n"); 482 worker->xfer_failed++; 483 } 484 break; 485 case ACCEL_OPC_COPY: 486 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 487 SPDK_NOTICELOG("Data miscompare\n"); 488 worker->xfer_failed++; 489 } 490 break; 491 case ACCEL_OPC_DUALCAST: 492 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 493 SPDK_NOTICELOG("Data miscompare, first destination\n"); 494 worker->xfer_failed++; 495 } 496 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 497 SPDK_NOTICELOG("Data miscompare, second destination\n"); 498 worker->xfer_failed++; 499 } 500 break; 501 case ACCEL_OPC_FILL: 502 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 503 SPDK_NOTICELOG("Data miscompare\n"); 504 worker->xfer_failed++; 505 } 506 break; 507 case ACCEL_OPC_COMPARE: 508 break; 509 case ACCEL_OPC_COMPRESS: 510 /* We've completed the compression phase, now need to uncompress the compressed data 511 * and compare that to the original buffer to see if it matches. So we flip flor 512 * src and destination then compare task->src to task->dst which is where we saved 513 * the orgiinal data. 514 */ 515 if (!worker->is_draining) { 516 worker->workload = ACCEL_OPC_DECOMPRESS; 517 worker->xfer_completed++; 518 memset(task->src, 0, g_xfer_size_bytes); 519 rc = spdk_accel_submit_decompress(worker->ch, task->src, task->dst, 520 g_xfer_size_bytes, g_xfer_size_bytes, 0, accel_done, task); 521 if (rc) { 522 SPDK_NOTICELOG("Unable to submit decomrpess for verficiation, tc = %d\n", rc); 523 } 524 return; 525 } 526 break; 527 case ACCEL_OPC_DECOMPRESS: 528 worker->workload = ACCEL_OPC_COMPRESS; 529 if (memcmp(task->dst2, task->src, g_xfer_size_bytes)) { 530 SPDK_NOTICELOG("Data miscompare after decompression\n"); 531 worker->xfer_failed++; 532 } 533 break; 534 default: 535 assert(false); 536 break; 537 } 538 } 539 540 if (task->expected_status == -EILSEQ) { 541 assert(status != 0); 542 worker->injected_miscompares++; 543 status = 0; 544 } else if (status) { 545 /* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */ 546 worker->xfer_failed++; 547 } 548 549 worker->xfer_completed++; 550 worker->current_queue_depth--; 551 552 if (!worker->is_draining && status == 0) { 553 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 554 task = _get_task(worker); 555 _submit_single(worker, task); 556 worker->current_queue_depth++; 557 } else { 558 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 559 } 560 } 561 562 static int 563 dump_result(void) 564 { 565 uint64_t total_completed = 0; 566 uint64_t total_failed = 0; 567 uint64_t total_miscompared = 0; 568 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 569 struct worker_thread *worker = g_workers; 570 571 printf("\nCore,Thread Transfers Bandwidth Failed Miscompares\n"); 572 printf("------------------------------------------------------------------------\n"); 573 while (worker != NULL) { 574 575 uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec; 576 uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) / 577 (g_time_in_sec * 1024 * 1024); 578 579 total_completed += worker->xfer_completed; 580 total_failed += worker->xfer_failed; 581 total_miscompared += worker->injected_miscompares; 582 583 if (xfer_per_sec) { 584 printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n", 585 worker->display.core, worker->display.thread, xfer_per_sec, 586 bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares); 587 } 588 589 worker = worker->next; 590 } 591 592 total_xfer_per_sec = total_completed / g_time_in_sec; 593 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 594 (g_time_in_sec * 1024 * 1024); 595 596 printf("=========================================================================\n"); 597 printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n", 598 total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared); 599 600 return total_failed ? 1 : 0; 601 } 602 603 static inline void 604 _free_task_buffers_in_pool(struct worker_thread *worker) 605 { 606 struct ap_task *task; 607 608 assert(worker); 609 while ((task = TAILQ_FIRST(&worker->tasks_pool))) { 610 TAILQ_REMOVE(&worker->tasks_pool, task, link); 611 _free_task_buffers(task); 612 } 613 } 614 615 static int 616 _check_draining(void *arg) 617 { 618 struct worker_thread *worker = arg; 619 620 assert(worker); 621 622 if (worker->current_queue_depth == 0) { 623 _free_task_buffers_in_pool(worker); 624 spdk_poller_unregister(&worker->is_draining_poller); 625 unregister_worker(worker); 626 } 627 628 return SPDK_POLLER_BUSY; 629 } 630 631 static int 632 _worker_stop(void *arg) 633 { 634 struct worker_thread *worker = arg; 635 636 assert(worker); 637 638 spdk_poller_unregister(&worker->stop_poller); 639 640 /* now let the worker drain and check it's outstanding IO with a poller */ 641 worker->is_draining = true; 642 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 643 644 return SPDK_POLLER_BUSY; 645 } 646 647 static void 648 _init_thread(void *arg1) 649 { 650 struct worker_thread *worker; 651 struct ap_task *task; 652 int i, num_tasks = g_allocate_depth; 653 struct display_info *display = arg1; 654 uint8_t *offset; 655 uint64_t j; 656 657 worker = calloc(1, sizeof(*worker)); 658 if (worker == NULL) { 659 fprintf(stderr, "Unable to allocate worker\n"); 660 free(display); 661 return; 662 } 663 664 worker->workload = g_workload_selection; 665 worker->display.core = display->core; 666 worker->display.thread = display->thread; 667 free(display); 668 worker->core = spdk_env_get_current_core(); 669 worker->thread = spdk_get_thread(); 670 pthread_mutex_lock(&g_workers_lock); 671 g_num_workers++; 672 worker->next = g_workers; 673 g_workers = worker; 674 pthread_mutex_unlock(&g_workers_lock); 675 worker->ch = spdk_accel_engine_get_io_channel(); 676 if (worker->ch == NULL) { 677 fprintf(stderr, "Unable to get an accel channel\n"); 678 goto error; 679 } 680 681 TAILQ_INIT(&worker->tasks_pool); 682 683 worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); 684 if (worker->task_base == NULL) { 685 fprintf(stderr, "Could not allocate task base.\n"); 686 goto error; 687 } 688 689 if (g_workload_selection == ACCEL_OPC_COMPRESS) { 690 worker->rnd_data = calloc(1, g_xfer_size_bytes); 691 if (worker->rnd_data == NULL) { 692 printf("unable to allcoate rnd_data buffer\n"); 693 goto error; 694 } 695 /* only fill half the data buffer with rnd data to make it more 696 * compressible. 697 */ 698 offset = worker->rnd_data; 699 for (j = 0; j < g_xfer_size_bytes / sizeof(uint8_t) / 2; j++) { 700 *offset = rand() % 256; 701 offset++; 702 } 703 } 704 705 task = worker->task_base; 706 for (i = 0; i < num_tasks; i++) { 707 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 708 task->worker = worker; 709 if (_get_task_data_bufs(task)) { 710 fprintf(stderr, "Unable to get data bufs\n"); 711 goto error; 712 } 713 task++; 714 } 715 716 /* Register a poller that will stop the worker at time elapsed */ 717 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 718 g_time_in_sec * 1000000ULL); 719 720 /* Load up queue depth worth of operations. */ 721 for (i = 0; i < g_queue_depth; i++) { 722 task = _get_task(worker); 723 worker->current_queue_depth++; 724 if (task == NULL) { 725 goto error; 726 } 727 728 _submit_single(worker, task); 729 } 730 return; 731 error: 732 733 free(worker->rnd_data); 734 _free_task_buffers_in_pool(worker); 735 free(worker->task_base); 736 spdk_app_stop(-1); 737 } 738 739 static void 740 accel_perf_start(void *arg1) 741 { 742 struct spdk_cpuset tmp_cpumask = {}; 743 char thread_name[32]; 744 uint32_t i; 745 int j; 746 struct spdk_thread *thread; 747 struct display_info *display; 748 749 g_tsc_rate = spdk_get_ticks_hz(); 750 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 751 752 printf("Running for %d seconds...\n", g_time_in_sec); 753 fflush(stdout); 754 755 /* Create worker threads for each core that was specified. */ 756 SPDK_ENV_FOREACH_CORE(i) { 757 for (j = 0; j < g_threads_per_core; j++) { 758 snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j); 759 spdk_cpuset_zero(&tmp_cpumask); 760 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 761 thread = spdk_thread_create(thread_name, &tmp_cpumask); 762 display = calloc(1, sizeof(*display)); 763 if (display == NULL) { 764 fprintf(stderr, "Unable to allocate memory\n"); 765 spdk_app_stop(-1); 766 return; 767 } 768 display->core = i; 769 display->thread = j; 770 spdk_thread_send_msg(thread, _init_thread, display); 771 } 772 } 773 } 774 775 int 776 main(int argc, char **argv) 777 { 778 struct spdk_app_opts opts = {}; 779 struct worker_thread *worker, *tmp; 780 781 pthread_mutex_init(&g_workers_lock, NULL); 782 spdk_app_opts_init(&opts, sizeof(opts)); 783 opts.reactor_mask = "0x1"; 784 if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:t:yw:P:f:T:", NULL, parse_args, 785 usage) != SPDK_APP_PARSE_ARGS_SUCCESS) { 786 g_rc = -1; 787 goto cleanup; 788 } 789 790 if ((g_workload_selection != ACCEL_OPC_COPY) && 791 (g_workload_selection != ACCEL_OPC_FILL) && 792 (g_workload_selection != ACCEL_OPC_CRC32C) && 793 (g_workload_selection != ACCEL_OPC_COPY_CRC32C) && 794 (g_workload_selection != ACCEL_OPC_COMPARE) && 795 (g_workload_selection != ACCEL_OPC_DUALCAST) && 796 (g_workload_selection != ACCEL_OPC_COMPRESS)) { 797 usage(); 798 g_rc = -1; 799 goto cleanup; 800 } 801 802 if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) { 803 fprintf(stdout, "allocate depth must be at least as big as queue depth\n"); 804 usage(); 805 g_rc = -1; 806 goto cleanup; 807 } 808 809 if (g_allocate_depth == 0) { 810 g_allocate_depth = g_queue_depth; 811 } 812 813 if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) && 814 g_crc32c_chained_count == 0) { 815 usage(); 816 g_rc = -1; 817 goto cleanup; 818 } 819 820 dump_user_config(&opts); 821 g_rc = spdk_app_start(&opts, accel_perf_start, NULL); 822 if (g_rc) { 823 SPDK_ERRLOG("ERROR starting application\n"); 824 } 825 826 pthread_mutex_destroy(&g_workers_lock); 827 828 worker = g_workers; 829 while (worker) { 830 tmp = worker->next; 831 free(worker); 832 worker = tmp; 833 } 834 cleanup: 835 spdk_app_fini(); 836 return g_rc; 837 } 838