/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/thread.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/string.h"
#include "spdk/accel_engine.h"
#include "spdk/crc32.h"
#include "spdk/util.h"

#define DATA_PATTERN 0x5a
#define ALIGN_4K 0x1000

static bool g_using_sw_engine = false;
static uint64_t g_tsc_rate;
static uint64_t g_tsc_end;
static int g_rc;
static int g_xfer_size_bytes = 4096;
static int g_queue_depth = 32;
static int g_ops_per_batch = 0;
static int g_threads_per_core = 1;
static int g_time_in_sec = 5;
static uint32_t g_crc32c_seed = 0;
static uint32_t g_crc32c_chained_count = 1;
static int g_fail_percent_goal = 0;
static uint8_t g_fill_pattern = 255;
static bool g_verify = false;
static const char *g_workload_type = NULL;
static enum accel_capability g_workload_selection;
static struct worker_thread *g_workers = NULL;
static int g_num_workers = 0;
static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER;

struct worker_thread;
static void accel_done(void *ref, int status);

struct display_info {
	int core;
	int thread;
};

struct ap_task {
	void *src;
	struct iovec *iovs;
	uint32_t iov_cnt;
	void *dst;
	void *dst2;
	struct worker_thread *worker;
	int status;
	int expected_status; /* used for the compare operation */
	TAILQ_ENTRY(ap_task) link;
};

struct accel_batch {
	int status;
	int cmd_count;
	struct spdk_accel_batch *batch;
	struct worker_thread *worker;
	TAILQ_ENTRY(accel_batch) link;
};

struct worker_thread {
	struct spdk_io_channel *ch;
	uint64_t xfer_completed;
	uint64_t xfer_failed;
	uint64_t injected_miscompares;
	uint64_t current_queue_depth;
	TAILQ_HEAD(, ap_task) tasks_pool;
	struct worker_thread *next;
	unsigned core;
	struct spdk_thread *thread;
	bool is_draining;
	struct spdk_poller *is_draining_poller;
	struct spdk_poller *stop_poller;
	void *task_base;
	struct accel_batch *batch_base;
	struct display_info display;
	TAILQ_HEAD(, accel_batch) in_prep_batches;
	TAILQ_HEAD(, accel_batch) in_use_batches;
	TAILQ_HEAD(, accel_batch) to_submit_batches;
};

static void
dump_user_config(struct spdk_app_opts *opts)
{
	printf("SPDK Configuration:\n");
	printf("Core mask:      %s\n\n", opts->reactor_mask);
	printf("Accel Perf Configuration:\n");
	printf("Workload Type:  %s\n", g_workload_type);
	if (g_workload_selection == ACCEL_CRC32C) {
		printf("CRC-32C seed:   %u\n", g_crc32c_seed);
		printf("vector size:    %u\n", g_crc32c_chained_count);
	} else if (g_workload_selection == ACCEL_FILL) {
		printf("Fill pattern:   0x%x\n", g_fill_pattern);
	} else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) {
		printf("Failure inject: %u percent\n", g_fail_percent_goal);
	}
	printf("Transfer size:  %u bytes\n", g_xfer_size_bytes);
	printf("Queue depth:    %u\n", g_queue_depth);
	printf("# threads/core: %u\n", g_threads_per_core);
	printf("Run time:       %u seconds\n", g_time_in_sec);
	if (g_ops_per_batch > 0) {
		printf("Batching:       %u operations\n", g_ops_per_batch);
	} else {
		printf("Batching:       Disabled\n");
	}
	printf("Verify:         %s\n\n", g_verify ? "Yes" : "No");
}

static void
usage(void)
{
	printf("accel_perf options:\n");
	printf("\t[-h help message]\n");
	printf("\t[-q queue depth per core]\n");
	printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)]\n");
	printf("\t[-T number of threads per core]\n");
	printf("\t[-o transfer size in bytes]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-w workload type must be one of these: copy, fill, crc32c, compare, dualcast]\n");
	printf("\t[-s for crc32c workload, use this seed value (default 0)]\n");
	printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)]\n");
	printf("\t[-f for fill workload, use this BYTE value (default 255)]\n");
	printf("\t[-y verify result if this switch is on]\n");
	printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n");
}

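/*
 * Illustrative invocations (not exhaustive; see usage() above for the full
 * option list and defaults):
 *
 *   ./accel_perf -w copy -o 4096 -q 64 -t 10 -y      # verified 4 KiB copies
 *   ./accel_perf -w crc32c -C 4 -s 0 -q 32 -t 5      # CRC-32C over 4-element io vectors
 *   ./accel_perf -w compare -P 10 -t 5               # ~10% injected miscompares
 *   ./accel_perf -w fill -f 165 -b 8 -q 32           # fills batched 8 at a time
 */
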
static int
parse_args(int ch, char *arg)
{
	switch (ch) {
	case 'b':
		g_ops_per_batch = spdk_strtol(optarg, 10);
		break;
	case 'C':
		g_crc32c_chained_count = spdk_strtol(optarg, 10);
		break;
	case 'f':
		g_fill_pattern = (uint8_t)spdk_strtol(optarg, 10);
		break;
	case 'T':
		g_threads_per_core = spdk_strtol(optarg, 10);
		break;
	case 'o':
		g_xfer_size_bytes = spdk_strtol(optarg, 10);
		break;
	case 'P':
		g_fail_percent_goal = spdk_strtol(optarg, 10);
		break;
	case 'q':
		g_queue_depth = spdk_strtol(optarg, 10);
		break;
	case 's':
		g_crc32c_seed = spdk_strtol(optarg, 10);
		break;
	case 't':
		g_time_in_sec = spdk_strtol(optarg, 10);
		break;
	case 'y':
		g_verify = true;
		break;
	case 'w':
		g_workload_type = optarg;
		if (!strcmp(g_workload_type, "copy")) {
			g_workload_selection = ACCEL_COPY;
		} else if (!strcmp(g_workload_type, "fill")) {
			g_workload_selection = ACCEL_FILL;
		} else if (!strcmp(g_workload_type, "crc32c")) {
			g_workload_selection = ACCEL_CRC32C;
		} else if (!strcmp(g_workload_type, "compare")) {
			g_workload_selection = ACCEL_COMPARE;
		} else if (!strcmp(g_workload_type, "dualcast")) {
			g_workload_selection = ACCEL_DUALCAST;
		}
		break;
	default:
		usage();
		return 1;
	}

	return 0;
}

static int dump_result(void);
static void
unregister_worker(void *arg1)
{
	struct worker_thread *worker = arg1;

	free(worker->task_base);
	free(worker->batch_base);
	spdk_put_io_channel(worker->ch);
	pthread_mutex_lock(&g_workers_lock);
	assert(g_num_workers >= 1);
	if (--g_num_workers == 0) {
		pthread_mutex_unlock(&g_workers_lock);
		g_rc = dump_result();
		spdk_app_stop(0);
	} else {
		pthread_mutex_unlock(&g_workers_lock);
	}
}

static int
_get_task_data_bufs(struct ap_task *task)
{
	uint32_t align = 0;
	uint32_t i = 0;

	/* For dualcast, the DSA HW requires 4K alignment on destination addresses but
	 * we do this for all engines to keep it simple.
	 */
	if (g_workload_selection == ACCEL_DUALCAST) {
		align = ALIGN_4K;
	}

	if (g_workload_selection == ACCEL_CRC32C) {
		assert(g_crc32c_chained_count > 0);
		task->iov_cnt = g_crc32c_chained_count;
		task->iovs = calloc(task->iov_cnt, sizeof(struct iovec));
		if (!task->iovs) {
			fprintf(stderr, "cannot allocate task->iovs for task=%p\n", task);
			return -ENOMEM;
		}

		for (i = 0; i < task->iov_cnt; i++) {
			task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
			if (task->iovs[i].iov_base == NULL) {
				return -ENOMEM;
			}
			memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes);
			task->iovs[i].iov_len = g_xfer_size_bytes;
		}

	} else {
		task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL);
		if (task->src == NULL) {
			fprintf(stderr, "Unable to alloc src buffer\n");
			return -ENOMEM;
		}

		/* For fill, set the entire src buffer to the fill pattern so we
		 * can check the result when verify is enabled.
		 */
		if (g_workload_selection == ACCEL_FILL) {
			memset(task->src, g_fill_pattern, g_xfer_size_bytes);
		} else {
			memset(task->src, DATA_PATTERN, g_xfer_size_bytes);
		}
	}

	task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
	if (task->dst == NULL) {
		fprintf(stderr, "Unable to alloc dst buffer\n");
		return -ENOMEM;
	}

	/* For compare we want the buffers to match, otherwise not. */
	if (g_workload_selection == ACCEL_COMPARE) {
		memset(task->dst, DATA_PATTERN, g_xfer_size_bytes);
	} else {
		memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	if (g_workload_selection == ACCEL_DUALCAST) {
		task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL);
		if (task->dst2 == NULL) {
			fprintf(stderr, "Unable to alloc dst2 buffer\n");
			return -ENOMEM;
		}
		memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes);
	}

	return 0;
}

inline static struct ap_task *
_get_task(struct worker_thread *worker)
{
	struct ap_task *task;

	if (!TAILQ_EMPTY(&worker->tasks_pool)) {
		task = TAILQ_FIRST(&worker->tasks_pool);
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
	} else {
		fprintf(stderr, "Unable to get ap_task\n");
		return NULL;
	}

	task->worker = worker;
	task->worker->current_queue_depth++;
	return task;
}

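/*
 * Steady-state flow for the non-batched path: _init_thread() preloads
 * g_queue_depth operations, and every completion (_accel_done) resubmits
 * the just-completed task through _submit_single() below, keeping the
 * queue depth constant until the stop poller flips is_draining.
 */
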
/* Submit one operation using the same ap_task that just completed. */
static void
_submit_single(struct worker_thread *worker, struct ap_task *task)
{
	int random_num;
	int rc = 0;

	assert(worker);

	switch (g_workload_selection) {
	case ACCEL_COPY:
		rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_FILL:
		/* For fill, use the first byte of the task->src buffer as the pattern. */
		rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src,
					    g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_CRC32C:
		rc = spdk_accel_submit_crc32cv(worker->ch, (uint32_t *)task->dst,
					       task->iovs, task->iov_cnt, g_crc32c_seed,
					       accel_done, task);
		break;
	case ACCEL_COMPARE:
		random_num = rand() % 100;
		if (random_num < g_fail_percent_goal) {
			task->expected_status = -EILSEQ;
			*(uint8_t *)task->dst = ~DATA_PATTERN;
		} else {
			task->expected_status = 0;
			*(uint8_t *)task->dst = DATA_PATTERN;
		}
		rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src,
					       g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_DUALCAST:
		rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2,
						task->src, g_xfer_size_bytes, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	if (rc) {
		accel_done(task, rc);
	}
}

static int
_batch_prep_cmd(struct worker_thread *worker, struct ap_task *task,
		struct accel_batch *worker_batch)
{
	struct spdk_accel_batch *batch = worker_batch->batch;
	int rc = 0;

	worker_batch->cmd_count++;
	assert(worker_batch->cmd_count <= g_ops_per_batch);

	switch (g_workload_selection) {
	case ACCEL_COPY:
		rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst,
						task->src, g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_DUALCAST:
		rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2,
						    task->src, g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_COMPARE:
		rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src,
						   g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_FILL:
		rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst,
						*(uint8_t *)task->src,
						g_xfer_size_bytes, accel_done, task);
		break;
	case ACCEL_CRC32C:
		rc = spdk_accel_batch_prep_crc32cv(worker->ch, batch, (uint32_t *)task->dst,
						   task->iovs, task->iov_cnt, g_crc32c_seed, accel_done, task);
		break;
	default:
		assert(false);
		break;
	}

	return rc;
}

static void
_free_task_buffers(struct ap_task *task)
{
	uint32_t i;

	if (g_workload_selection == ACCEL_CRC32C) {
		if (task->iovs) {
			for (i = 0; i < task->iov_cnt; i++) {
				if (task->iovs[i].iov_base) {
					spdk_dma_free(task->iovs[i].iov_base);
				}
			}
			free(task->iovs);
		}
	} else {
		spdk_dma_free(task->src);
	}

	spdk_dma_free(task->dst);
	if (g_workload_selection == ACCEL_DUALCAST) {
		spdk_dma_free(task->dst2);
	}
}

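/*
 * Batched path overview: completed tasks are prepped into the accel_batch at
 * the head of in_prep_batches (_build_batch). Once a batch holds
 * g_ops_per_batch commands it moves to to_submit_batches; _batch_done()
 * submits from that list, parking the batch on in_use_batches until it
 * completes and is recycled back onto in_prep_batches.
 */
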
static void _batch_done(void *cb_arg);
static void
_build_batch(struct worker_thread *worker, struct ap_task *task)
{
	struct accel_batch *worker_batch = NULL;
	int rc;

	assert(!TAILQ_EMPTY(&worker->in_prep_batches));

	worker_batch = TAILQ_FIRST(&worker->in_prep_batches);

	/* If an accel batch hasn't been created yet do so now. */
	if (worker_batch->batch == NULL) {
		worker_batch->batch = spdk_accel_batch_create(worker->ch);
		if (worker_batch->batch == NULL) {
			fprintf(stderr, "error unable to create new batch\n");
			return;
		}
	}

	/* Prep the command re-using the last completed command's task */
	rc = _batch_prep_cmd(worker, task, worker_batch);
	if (rc) {
		fprintf(stderr, "error prepping command for batch\n");
		goto error;
	}

	/* If this batch is full move it to the to_submit list so it gets
	 * submitted as batches complete.
	 */
	if (worker_batch->cmd_count == g_ops_per_batch) {
		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
		TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link);
	}

	return;
error:
	spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
}

static void batch_done(void *cb_arg, int status);
static void
_drain_batch(struct worker_thread *worker)
{
	struct accel_batch *worker_batch, *tmp;
	int rc;

	/* Submit any batches that were being built up. */
	TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) {
		if (worker_batch->cmd_count == 0) {
			continue;
		}

		/* Add one for the batch command itself. */
		worker->current_queue_depth += worker_batch->cmd_count + 1;

		TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
		TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);
		rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
		if (rc == 0) {
			worker_batch->cmd_count = 0;
		} else {
			fprintf(stderr, "error sending final batch\n");
			worker->current_queue_depth -= worker_batch->cmd_count + 1;
			break;
		}
	}
}

static void
_batch_done(void *cb_arg)
{
	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;
	struct worker_thread *worker = worker_batch->worker;
	int rc;

	assert(!TAILQ_EMPTY(&worker->in_use_batches));

	if (worker_batch->status) {
		SPDK_ERRLOG("error %d\n", worker_batch->status);
	}

	worker->current_queue_depth--;
	TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
	TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
	worker_batch->batch = NULL;
	worker_batch->cmd_count = 0;

	if (!worker->is_draining) {
		worker_batch = TAILQ_FIRST(&worker->to_submit_batches);
		if (worker_batch != NULL) {

			assert(worker_batch->cmd_count == g_ops_per_batch);

			/* Add one for the batch command itself. */
			worker->current_queue_depth += g_ops_per_batch + 1;
			TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link);
			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);

			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
			if (rc) {
				fprintf(stderr, "error submitting batch\n");
				worker->current_queue_depth -= g_ops_per_batch + 1;
				return;
			}
		}
	} else {
		_drain_batch(worker);
	}
}

static void
batch_done(void *cb_arg, int status)
{
	struct accel_batch *worker_batch = (struct accel_batch *)cb_arg;

	assert(worker_batch->worker);

	worker_batch->status = status;

	/* Process the completion on the worker's SPDK thread. */
	spdk_thread_send_msg(worker_batch->worker->thread, _batch_done, worker_batch);
}

static void
_accel_done(void *arg1)
{
	struct ap_task *task = arg1;
	struct worker_thread *worker = task->worker;
	uint32_t sw_crc32c;

	assert(worker);
	assert(worker->current_queue_depth > 0);

	if (g_verify && task->status == 0) {
		switch (g_workload_selection) {
		case ACCEL_CRC32C:
			sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed);
			if (*(uint32_t *)task->dst != sw_crc32c) {
				SPDK_NOTICELOG("CRC-32C miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COPY:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_DUALCAST:
			if (memcmp(task->src, task->dst, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, first destination\n");
				worker->xfer_failed++;
			}
			if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare, second destination\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_FILL:
			if (memcmp(task->dst, task->src, g_xfer_size_bytes)) {
				SPDK_NOTICELOG("Data miscompare\n");
				worker->xfer_failed++;
			}
			break;
		case ACCEL_COMPARE:
			break;
		default:
			assert(false);
			break;
		}
	}

	if (task->expected_status == -EILSEQ) {
		assert(task->status != 0);
		worker->injected_miscompares++;
	} else if (task->status) {
		/* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */
		worker->xfer_failed++;
	}

	worker->xfer_completed++;
	worker->current_queue_depth--;

	if (!worker->is_draining) {
		if (g_ops_per_batch == 0) {
			_submit_single(worker, task);
			worker->current_queue_depth++;
		} else {
			_build_batch(worker, task);
		}
	} else if (g_ops_per_batch > 0) {
		_drain_batch(worker);
	} else {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
	}
}

static int
dump_result(void)
{
	uint64_t total_completed = 0;
	uint64_t total_failed = 0;
	uint64_t total_miscompared = 0;
	uint64_t total_xfer_per_sec, total_bw_in_MiBps;
	struct worker_thread *worker = g_workers;

	printf("\nCore,Thread   Transfers     Bandwidth     Failed     Miscompares\n");
	printf("------------------------------------------------------------------------\n");
	while (worker != NULL) {

		uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec;
		uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) /
				       (g_time_in_sec * 1024 * 1024);

		total_completed += worker->xfer_completed;
		total_failed += worker->xfer_failed;
		total_miscompared += worker->injected_miscompares;

		if (xfer_per_sec) {
			printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n",
			       worker->display.core, worker->display.thread, xfer_per_sec,
			       bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares);
		}

		worker = worker->next;
	}

	total_xfer_per_sec = total_completed / g_time_in_sec;
	total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) /
			    (g_time_in_sec * 1024 * 1024);

	printf("=========================================================================\n");
	printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64 "\n\n",
	       total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared);

	return total_failed ? 1 : 0;
}

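/*
 * Throughput math from dump_result() with illustrative numbers: at the
 * default 4096-byte transfer size, a worker that completed 1,000,000
 * transfers over a 5 second run reports 200,000 xfers/s and
 * (1,000,000 * 4096) / (5 * 1024 * 1024) = 781 MiB/s.
 */
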
static inline void
_free_task_buffers_in_pool(struct worker_thread *worker)
{
	struct ap_task *task;

	assert(worker);
	while ((task = TAILQ_FIRST(&worker->tasks_pool))) {
		TAILQ_REMOVE(&worker->tasks_pool, task, link);
		_free_task_buffers(task);
	}
}

static int
_check_draining(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	if (worker->current_queue_depth == 0) {
		_free_task_buffers_in_pool(worker);
		spdk_poller_unregister(&worker->is_draining_poller);
		unregister_worker(worker);
	}

	return -1;
}

static int
_worker_stop(void *arg)
{
	struct worker_thread *worker = arg;

	assert(worker);

	spdk_poller_unregister(&worker->stop_poller);

	/* Now let the worker drain and check its outstanding I/O with a poller. */
	worker->is_draining = true;
	worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0);

	return 0;
}

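/*
 * Batch sizing example (illustrative): with -q 32 -b 8, _init_thread() below
 * allocates (32 / 8 * 2) + 1 = 9 accel_batch structures: 4 to cover the
 * initial queue-depth load, 4 more to build replacements while those are
 * outstanding, plus 1 extra to build up while the last batch has finished
 * its operations but its batch command itself has not yet completed.
 */
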
static void
_init_thread(void *arg1)
{
	struct worker_thread *worker;
	struct ap_task *task;
	int i, rc, num_batches;
	int max_per_batch;
	int remaining = g_queue_depth;
	int num_tasks = g_queue_depth;
	struct accel_batch *tmp;
	struct accel_batch *worker_batch = NULL;
	struct display_info *display = arg1;
	uint64_t capabilities;

	worker = calloc(1, sizeof(*worker));
	if (worker == NULL) {
		fprintf(stderr, "Unable to allocate worker\n");
		free(display);
		return;
	}

	worker->display.core = display->core;
	worker->display.thread = display->thread;
	free(display);
	worker->core = spdk_env_get_current_core();
	worker->thread = spdk_get_thread();
	pthread_mutex_lock(&g_workers_lock);
	g_num_workers++;
	worker->next = g_workers;
	g_workers = worker;
	pthread_mutex_unlock(&g_workers_lock);
	worker->ch = spdk_accel_engine_get_io_channel();

	if (g_num_workers == 1) {
		capabilities = spdk_accel_get_capabilities(worker->ch);
		if ((capabilities & g_workload_selection) != g_workload_selection) {
			g_using_sw_engine = true;
			SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n");
			SPDK_WARNLOG("The software engine will be used instead.\n\n");
		}
	}

	TAILQ_INIT(&worker->tasks_pool);

	if (g_ops_per_batch > 0) {

		max_per_batch = spdk_accel_batch_get_max(worker->ch);
		assert(max_per_batch > 0);

		if (g_ops_per_batch > max_per_batch) {
			fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch);
			g_ops_per_batch = max_per_batch;
		}

		if (g_ops_per_batch > g_queue_depth) {
			fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth);
			g_ops_per_batch = g_queue_depth;
		}

		TAILQ_INIT(&worker->in_prep_batches);
		TAILQ_INIT(&worker->to_submit_batches);
		TAILQ_INIT(&worker->in_use_batches);

		/* A worker_batch will live on one of 3 lists:
		 * IN_PREP: as individual IOs complete, new ones are built up on a
		 *          worker_batch on this list until it reaches g_ops_per_batch.
		 * TO_SUBMIT: batches that are built up on IO completion are moved
		 *            to this list once they are full. This list is used on
		 *            batch completion to start new batches.
		 * IN_USE: the worker_batch is outstanding and will be moved back to
		 *         the in_prep list when the batch completes.
		 *
		 * So we need enough batches to cover the Q depth load, one to
		 * replace each of those, and one extra to build up while everything
		 * is outstanding and the last batch has completed its IO but not
		 * yet its batch command.
		 */
		num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1;
		worker->batch_base = calloc(num_batches, sizeof(struct accel_batch));
		if (worker->batch_base == NULL) {
			fprintf(stderr, "Could not allocate batch base.\n");
			goto error;
		}
		worker_batch = worker->batch_base;
		for (i = 0; i < num_batches; i++) {
			worker_batch->worker = worker;
			TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link);
			worker_batch++;
		}
	}

	worker->task_base = calloc(num_tasks, sizeof(struct ap_task));
	if (worker->task_base == NULL) {
		fprintf(stderr, "Could not allocate task base.\n");
		goto error;
	}

	task = worker->task_base;
	for (i = 0; i < num_tasks; i++) {
		TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link);
		if (_get_task_data_bufs(task)) {
			fprintf(stderr, "Unable to get data bufs\n");
			goto error;
		}
		task++;
	}

	/* Register a poller that will stop the worker when the run time elapses. */
	worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker,
			      g_time_in_sec * 1000000ULL);

	/* If batching is enabled load up to the full Q depth before
	 * processing any completions, then ping pong between two batches,
	 * one processing and one being built up for when the other completes.
	 */
	if (g_ops_per_batch > 0) {
		do {
			worker_batch = TAILQ_FIRST(&worker->in_prep_batches);
			if (worker_batch == NULL) {
				goto error;
			}

			worker_batch->batch = spdk_accel_batch_create(worker->ch);
			if (worker_batch->batch == NULL) {
				/* Trigger a graceful shutdown via the app's signal handler. */
				raise(SIGINT);
				break;
			}

			for (i = 0; i < g_ops_per_batch; i++) {
				task = _get_task(worker);
				if (task == NULL) {
					goto error;
				}

				rc = _batch_prep_cmd(worker, task, worker_batch);
				if (rc) {
					fprintf(stderr, "error prepping command\n");
					goto error;
				}
			}

			/* Add one for the batch operation itself. */
			task->worker->current_queue_depth++;
			TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link);
			TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link);

			rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch);
			if (rc) {
				fprintf(stderr, "error submitting batch\n");
				goto error;
			}
			assert(remaining >= g_ops_per_batch);
			remaining -= g_ops_per_batch;
		} while (remaining > 0);
	}

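	/* Note: main() enforces that the queue depth is a multiple of the batch
	 * size, so the preload loop above normally drives remaining to exactly 0
	 * and the singles loop below only runs when batching is disabled or the
	 * preload bailed out early.
	 */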
	/* Submit as singles when no batching is enabled or we ran out of batches. */
	for (i = 0; i < remaining; i++) {
		task = _get_task(worker);
		if (task == NULL) {
			goto error;
		}

		_submit_single(worker, task);
	}
	return;
error:
	if (worker_batch && worker_batch->batch) {
		TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) {
			spdk_accel_batch_cancel(worker->ch, worker_batch->batch);
			TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link);
		}
	}

	_free_task_buffers_in_pool(worker);
	free(worker->batch_base);
	free(worker->task_base);
	free(worker);
	spdk_app_stop(-1);
}

static void
accel_done(void *cb_arg, int status)
{
	struct ap_task *task = (struct ap_task *)cb_arg;
	struct worker_thread *worker = task->worker;

	assert(worker);

	task->status = status;
	if (g_using_sw_engine == false) {
		_accel_done(task);
	} else {
		spdk_thread_send_msg(worker->thread, _accel_done, task);
	}
}

static void
accel_perf_start(void *arg1)
{
	struct spdk_cpuset tmp_cpumask = {};
	char thread_name[32];
	uint32_t i;
	int j;
	struct spdk_thread *thread;
	struct display_info *display;

	g_tsc_rate = spdk_get_ticks_hz();
	g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	printf("Running for %d seconds...\n", g_time_in_sec);
	fflush(stdout);

	/* Create worker threads for each core that was specified. */
	SPDK_ENV_FOREACH_CORE(i) {
		for (j = 0; j < g_threads_per_core; j++) {
			snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j);
			spdk_cpuset_zero(&tmp_cpumask);
			spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
			thread = spdk_thread_create(thread_name, &tmp_cpumask);
			if (thread == NULL) {
				fprintf(stderr, "Unable to create thread\n");
				spdk_app_stop(-1);
				return;
			}
			display = calloc(1, sizeof(*display));
			if (display == NULL) {
				fprintf(stderr, "Unable to allocate memory\n");
				spdk_app_stop(-1);
				return;
			}
			display->core = i;
			display->thread = j;
			spdk_thread_send_msg(thread, _init_thread, display);
		}
	}
}

int
main(int argc, char **argv)
{
	struct spdk_app_opts opts = {};
	struct worker_thread *worker, *tmp;

	pthread_mutex_init(&g_workers_lock, NULL);
	spdk_app_opts_init(&opts, sizeof(opts));
	opts.reactor_mask = "0x1";
	if (spdk_app_parse_args(argc, argv, &opts, "C:o:q:t:yw:P:f:b:T:", NULL, parse_args,
				usage) != SPDK_APP_PARSE_ARGS_SUCCESS) {
		g_rc = -1;
		goto cleanup;
	}

	if ((g_workload_selection != ACCEL_COPY) &&
	    (g_workload_selection != ACCEL_FILL) &&
	    (g_workload_selection != ACCEL_CRC32C) &&
	    (g_workload_selection != ACCEL_COMPARE) &&
	    (g_workload_selection != ACCEL_DUALCAST)) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) {
		fprintf(stdout, "queue depth must be a multiple of batch size\n");
		usage();
		g_rc = -1;
		goto cleanup;
	}

	if (g_workload_selection == ACCEL_CRC32C &&
	    g_crc32c_chained_count == 0) {
		usage();
		g_rc = -1;
		goto cleanup;
	}

	dump_user_config(&opts);
	g_rc = spdk_app_start(&opts, accel_perf_start, NULL);
	if (g_rc) {
		SPDK_ERRLOG("ERROR starting application\n");
	}

	pthread_mutex_destroy(&g_workers_lock);

	worker = g_workers;
	while (worker) {
		tmp = worker->next;
		free(worker);
		worker = tmp;
	}
cleanup:
	spdk_app_fini();
	return g_rc;
}