/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel_engine.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
/* g_allocate_depth indicates how many tasks we allocate per worker. It will
 * be at least as much as the queue depth.
 */
static int g_allocate_depth = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_crc32c_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_opcode g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;
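
/* A worked example of the two depths above (illustrative numbers, not
 * defaults): with -q 32 and -a 128, each worker keeps 32 operations in
 * flight at any moment but cycles through 128 pre-allocated tasks, so the
 * same I/O load touches a wider range of memory. With -a 0 (the default)
 * the allocation depth simply follows -q.
 */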
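
/* An illustrative invocation (the binary name and the values here are
 * assumptions for the sake of example, not project-mandated defaults):
 *
 *   ./accel_perf -w crc32c -C 4 -o 4096 -q 64 -t 10 -y
 *
 * runs a chained (4-vector) CRC-32C workload with 4 KiB buffers at queue
 * depth 64 for 10 seconds, software-verifying each result.
 */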

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void *src;
	struct iovec *iovs;
	uint32_t iov_cnt;
	void *dst;
	void *dst2;
	union {
		uint32_t crc_dst;
		uint32_t output_size;
	};
	struct worker_thread *worker;
	int expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task) link;
};

struct worker_thread {
	struct spdk_io_channel *ch;
	uint64_t xfer_completed;
	uint64_t xfer_failed;
	uint64_t injected_miscompares;
	uint64_t current_queue_depth;
	TAILQ_HEAD(, ap_task) tasks_pool;
	struct worker_thread *next;
	unsigned core;
	struct spdk_thread *thread;
	bool is_draining;
	struct spdk_poller *is_draining_poller;
	struct spdk_poller *stop_poller;
	void *task_base;
	struct display_info display;
	enum accel_opcode workload;
	void *rnd_data;
};

static void
dump_user_config(struct spdk_app_opts *opts)
{
	printf("SPDK Configuration:\n");
	printf("Core mask:      %s\n\n", opts->reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
		printf("Vector count:   %u\n", g_crc32c_chained_count);
	} else if (g_workload_selection == ACCEL_OPC_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_OPC_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		printf("Vector size:    %u bytes\n", g_xfer_size_bytes);
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count);
	} else {
		printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	}
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("Allocate depth: %u\n", g_allocate_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per worker thread]\n");
	printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, compress, dualcast]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-a tasks to allocate per worker (default: same value as -q)]\n");
	printf("\t\tCan be used to spread operations across a wider range of memory.\n");
}

static int
parse_args(int ch, char *arg)
{
	int argval = 0;

	switch (ch) {
	case 'a':
	case 'C':
	case 'f':
	case 'T':
	case 'o':
	case 'P':
	case 'q':
	case 's':
	case 't':
		argval = spdk_strtol(arg, 10);
		if (argval < 0) {
			fprintf(stderr, "-%c option must be non-negative.\n", ch);
			usage();
			return 1;
		}
		break;
	default:
		break;
	}

	switch (ch) {
	case 'a':
		g_allocate_depth = argval;
		break;
	case 'C':
		g_crc32c_chained_count = argval;
		break;
	case 'f':
		g_fill_pattern = (uint8_t)argval;
		break;
	case 'T':
		g_threads_per_core = argval;
		break;
	case 'o':
		g_xfer_size_bytes = argval;
		break;
	case 'P':
		g_fail_percent_goal = argval;
		break;
	case 'q':
		g_queue_depth = argval;
		break;
	case 's':
		g_crc32c_seed = argval;
		break;
	case 't':
		g_time_in_sec = argval;
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = arg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_OPC_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_OPC_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_OPC_CRC32C;
		} else if (!strcmp(g_workload_type, "copy_crc32c")) {
			g_workload_selection = ACCEL_OPC_COPY_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_OPC_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_OPC_DUALCAST;
		} else if (!strcmp(g_workload_type, "compress")) {
			g_workload_selection = ACCEL_OPC_COMPRESS;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);

/* Shutdown accounting: each worker calls this once its queue has fully
 * drained. The last worker out (g_num_workers reaching zero under
 * g_workers_lock) prints the results and stops the app.
 */
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	free(worker->rnd_data);
	spdk_put_io_channel(worker->ch);
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;
	int dst_buff_len = g_xfer_size_bytes;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all engines to keep it simple.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		assert(g_crc32c_chained_count > 0);
		task->iov_cnt = g_crc32c_chained_count;
		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
		if (!task->iovs) {
			fprintf(stderr, "cannot allocate task->iovs for task=%p\n", task);
			return -ENOMEM;
		}

		if (g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
			dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count;
		}

		for (i = 0; i < task->iov_cnt; i++) {
			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->iovs[i].iov_len = g_xfer_size_bytes;
		}
	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer so we can check it if verify is enabled. */
		if (g_workload_selection == ACCEL_OPC_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			memcpy(task->src, task->worker->rnd_data, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	if (g_workload_selection != ACCEL_OPC_CRC32C) {
		task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL);
		if (task->dst == NULL) {
			fprintf(stderr, "Unable to alloc dst buffer\n");
			return -ENOMEM;
		}

		/* For compare we want the buffers to match, otherwise not. */
		if (g_workload_selection == ACCEL_OPC_COMPARE) {
			memset(task->dst, DATA_PATTERN, dst_buff_len);
		} else {
			memset(task->dst, ~DATA_PATTERN, dst_buff_len);
		}
	}

	/* For dualcast 2 buffers are needed for the operation. For compress we use the second buffer to
	 * store the original pre-compressed data so we have a copy of it when we go to decompress.
	 */
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst2 buffer\n");
			return -ENOMEM;
		}
		if (g_workload_selection == ACCEL_OPC_DUALCAST) {
			memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
		} else if (g_workload_selection == ACCEL_OPC_COMPRESS) {
			/* Copy the original data to dst2 so we can compare it to
			 * the results of decompression if -y is used.
			 */
			assert(task->src); /* for scan-build */
			memcpy(task->dst2, task->src, g_xfer_size_bytes);
		}
	}

	return 0;
}
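
/* A note on the patterns above: src buffers are filled with DATA_PATTERN
 * (0x5a) and dst buffers with its complement, so a verify pass can tell a
 * buffer the engine actually wrote from one it never touched. COMPARE is
 * the exception: both sides start equal so the operation succeeds unless a
 * miscompare is deliberately injected (see _submit_single below).
 */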

static inline struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	return task;
}

/* Submit one operation using the same ap task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;
	int flags = 0;

	assert(worker);

	switch (worker->workload) {
	case ACCEL_OPC_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_FILL:
		/* For fill use the first byte of the task->src buffer as the fill value. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst,
					       task->iovs, task->iov_cnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_OPC_COPY_CRC32C:
		rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt,
						    &task->crc_dst, g_crc32c_seed, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_OPC_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, flags, accel_done, task);
		break;
	case ACCEL_OPC_COMPRESS:
		rc = spdk_accel_submit_compress(worker->ch, task->dst, task->src,
						g_xfer_size_bytes, g_xfer_size_bytes, &task->output_size,
						flags, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	if (rc) {
		accel_done(task, rc);
	}
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) {
		if (task->iovs) {
			for (i = 0; i < task->iov_cnt; i++) {
				if (task->iovs[i].iov_base) {
					spdk_dma_free(task->iovs[i].iov_base);
				}
			}
			free(task->iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_OPC_DUALCAST || g_workload_selection == ACCEL_OPC_COMPRESS) {
		spdk_dma_free(task->dst2);
	}
}

static int
_vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt)
{
	uint32_t i;
	uint32_t ttl_len = 0;
	uint8_t *dst = (uint8_t *)_dst;

	for (i = 0; i < iovcnt; i++) {
		if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) {
			return -1;
		}
		dst += src_iovs[i].iov_len;
		ttl_len += src_iovs[i].iov_len;
	}

	if (ttl_len != iovcnt * g_xfer_size_bytes) {
		return -1;
	}

	return 0;
}

static int _worker_stop(void *arg);
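
/* Completion path. On success with -y the result is re-checked in software
 * before the task is recycled. Note the ~g_crc32c_seed below: this assumes
 * (as the accel framework's software path does) that spdk_crc32c_iov_update()
 * takes the running CRC in inverted form, so the seed must be complemented
 * to reproduce what the engine computed.
 */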
static void
accel_done(void *arg1, int status)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;
	int rc;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (!worker->is_draining && status == -EINVAL && worker->workload == ACCEL_OPC_COMPRESS) {
		printf("Invalid configuration, compress workload needs ISA-L or IAA. Exiting\n");
		_worker_stop(worker);
	}

	if (g_verify && status == 0) {
		switch (worker->workload) {
		case ACCEL_OPC_COPY_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (task->crc_dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_OPC_COMPARE:
			break;
		case ACCEL_OPC_COMPRESS:
			/* We've completed the compression phase, now we need to uncompress the
			 * compressed data and compare that to the original buffer to see if it
			 * matches. So we flip src and destination, then compare task->src to
			 * task->dst2, which is where we saved the original data.
			 */
			if (!worker->is_draining) {
				worker->workload = ACCEL_OPC_DECOMPRESS;
				worker->xfer_completed++;
				memset(task->src, 0, g_xfer_size_bytes);
				rc = spdk_accel_submit_decompress(worker->ch, task->src, task->dst,
								  g_xfer_size_bytes, g_xfer_size_bytes, 0, accel_done, task);
				if (rc) {
					SPDK_NOTICELOG("Unable to submit decompress for verification, rc = %d\n", rc);
				}
				return;
			}
			break;
		case ACCEL_OPC_DECOMPRESS:
			worker->workload = ACCEL_OPC_COMPRESS;
			if (memcmp(task->dst2, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare after decompression\n");
				worker->xfer_failed++;
			}
			break;
		default:
			assert(false);
			break;
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(status != 0);
		worker->injected_miscompares++;
		status = 0;
	} else if (status) {
		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining && status == 0) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task = _get_task(worker);
		_submit_single(worker, task);
		worker->current_queue_depth++;
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}
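
/* The throughput math below is simple but worth one worked example
 * (illustrative numbers): a worker that completed 10,000,000 transfers of
 * 4096 bytes over a 5 second run reports 2,000,000 xfers/sec and
 * (10,000,000 * 4096) / (5 * 1024 * 1024) = 7812 MiB/s.
 */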
static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return SPDK_POLLER_BUSY;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return SPDK_POLLER_BUSY;
}
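
/* Per-thread setup. Each worker allocates its task pool, fills rnd_data for
 * compress runs, primes the queue, and arms a one-shot stop poller. One
 * detail worth noting: rand() is never seeded anywhere in this file, so the
 * "random" compress payload and the compare-failure pattern repeat
 * identically from run to run, which keeps results comparable.
 */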
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, num_tasks = g_allocate_depth;
	struct display_info *display = arg1;
	uint8_t *offset;
	uint64_t j;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->workload = g_workload_selection;
	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_engine_get_io_channel();
	if (worker->ch == NULL) {
		fprintf(stderr, "Unable to get an accel channel\n");
		goto error;
	}

	TAILQ_INIT(&worker->tasks_pool);

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	if (g_workload_selection == ACCEL_OPC_COMPRESS) {
		worker->rnd_data = calloc(1, g_xfer_size_bytes);
		if (worker->rnd_data == NULL) {
			printf("unable to allocate rnd_data buffer\n");
			goto error;
		}
		/* Only fill half the data buffer with rnd data to make it more
		 * compressible.
		 */
		offset = worker->rnd_data;
		for (j = 0; j < g_xfer_size_bytes / 2; j++) {
			*offset = rand() % 256;
			offset++;
		}
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		task->worker = worker;
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker at time elapsed. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* Load up queue depth worth of operations. */
	for (i = 0; i < g_queue_depth; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}
		worker->current_queue_depth++;

		_submit_single(worker, task);
	}
	return;
error:
	free(worker->rnd_data);
	_free_task_buffers_in_pool(worker);
	free(worker->task_base);
	spdk_app_stop(-1);
}
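
/* Thread fan-out below: one SPDK thread is created per (core, thread) pair
 * and pinned to its core via the cpuset. For example (illustrative values),
 * a reactor mask of 0x3 with -T 2 yields four workers: ap_worker_0_0,
 * ap_worker_0_1, ap_worker_1_0 and ap_worker_1_1.
 */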
static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

int
main(int argc, char **argv)
{
	struct spdk_app_opts opts = {};
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&opts, sizeof(opts));
	opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:s:t:yw:P:f:T:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_type == NULL ||
	    ((g_workload_selection != ACCEL_OPC_COPY) &&
	     (g_workload_selection != ACCEL_OPC_FILL) &&
	     (g_workload_selection != ACCEL_OPC_CRC32C) &&
	     (g_workload_selection != ACCEL_OPC_COPY_CRC32C) &&
	     (g_workload_selection != ACCEL_OPC_COMPARE) &&
	     (g_workload_selection != ACCEL_OPC_DUALCAST) &&
	     (g_workload_selection != ACCEL_OPC_COMPRESS))) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) {
		fprintf(stdout, "allocate depth must be at least as big as queue depth\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_allocate_depth == 0) {
		g_allocate_depth = g_queue_depth;
	}

	if ((g_workload_selection == ACCEL_OPC_CRC32C || g_workload_selection == ACCEL_OPC_COPY_CRC32C) &&
	    g_crc32c_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	dump_user_config(&opts);
	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	spdk_app_fini();
	return g_rc;
}
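
/* Build/run note (an assumption about a typical SPDK tree, not taken from
 * this file): this example normally builds with the SPDK tree and lands at
 * build/examples/accel_perf. At minimum pass -w <workload>, since a missing
 * workload type fails the validation in main() above.
 */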