1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/thread.h" 36 #include "spdk/env.h" 37 #include "spdk/event.h" 38 #include "spdk/log.h" 39 #include "spdk/string.h" 40 #include "spdk/accel_engine.h" 41 #include "spdk/crc32.h" 42 #include "spdk/util.h" 43 44 #define DATA_PATTERN 0x5a 45 #define ALIGN_4K 0x1000 46 47 static uint64_t g_tsc_rate; 48 static uint64_t g_tsc_end; 49 static int g_rc; 50 static int g_xfer_size_bytes = 4096; 51 static int g_queue_depth = 32; 52 static int g_ops_per_batch = 0; 53 static int g_threads_per_core = 1; 54 static int g_time_in_sec = 5; 55 static uint32_t g_crc32c_seed = 0; 56 static uint32_t g_crc32c_chained_count = 1; 57 static int g_fail_percent_goal = 0; 58 static uint8_t g_fill_pattern = 255; 59 static bool g_verify = false; 60 static const char *g_workload_type = NULL; 61 static enum accel_capability g_workload_selection; 62 static struct worker_thread *g_workers = NULL; 63 static int g_num_workers = 0; 64 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 65 uint64_t g_capabilites; 66 67 struct worker_thread; 68 static void accel_done(void *ref, int status); 69 70 struct display_info { 71 int core; 72 int thread; 73 }; 74 75 struct ap_task { 76 void *src; 77 struct iovec *iovs; 78 uint32_t iov_cnt; 79 void *dst; 80 void *dst2; 81 struct worker_thread *worker; 82 int status; 83 int expected_status; /* used for the compare operation */ 84 TAILQ_ENTRY(ap_task) link; 85 }; 86 87 struct accel_batch { 88 int status; 89 int cmd_count; 90 struct spdk_accel_batch *batch; 91 struct worker_thread *worker; 92 TAILQ_ENTRY(accel_batch) link; 93 }; 94 95 struct worker_thread { 96 struct spdk_io_channel *ch; 97 uint64_t xfer_completed; 98 uint64_t xfer_failed; 99 uint64_t injected_miscompares; 100 uint64_t current_queue_depth; 101 TAILQ_HEAD(, ap_task) tasks_pool; 102 struct worker_thread *next; 103 unsigned core; 104 struct spdk_thread *thread; 105 bool is_draining; 106 struct spdk_poller *is_draining_poller; 107 struct spdk_poller *stop_poller; 108 void *task_base; 109 struct accel_batch *batch_base; 110 struct display_info display; 111 TAILQ_HEAD(, accel_batch) in_prep_batches; 112 TAILQ_HEAD(, accel_batch) in_use_batches; 113 TAILQ_HEAD(, accel_batch) to_submit_batches; 114 }; 115 116 static void 117 dump_user_config(struct spdk_app_opts *opts) 118 { 119 printf("SPDK Configuration:\n"); 120 printf("Core mask: %s\n\n", opts->reactor_mask); 121 printf("Accel Perf Configuration:\n"); 122 printf("Workload Type: %s\n", g_workload_type); 123 if (g_workload_selection == ACCEL_CRC32C) { 124 printf("CRC-32C seed: %u\n", g_crc32c_seed); 125 printf("vector size: %u\n", g_crc32c_chained_count); 126 } else if (g_workload_selection == ACCEL_FILL) { 127 printf("Fill pattern: 0x%x\n", g_fill_pattern); 128 } else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) { 129 printf("Failure inject: %u percent\n", g_fail_percent_goal); 130 } 131 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 132 printf("Queue depth: %u\n", g_queue_depth); 133 printf("# threads/core: %u\n", g_threads_per_core); 134 printf("Run time: %u seconds\n", g_time_in_sec); 135 if (g_ops_per_batch > 0) { 136 printf("Batching: %u operations\n", g_ops_per_batch); 137 } else { 138 printf("Batching: Disabled\n"); 139 } 140 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 141 } 142 143 static void 144 usage(void) 145 { 146 printf("accel_perf options:\n"); 147 printf("\t[-h help message]\n"); 148 printf("\t[-q queue depth per core]\n"); 149 printf("\t[-C for crc32c workload, use this value to configre the io vector size to test (default 1)\n"); 150 printf("\t[-T number of threads per core\n"); 151 printf("\t[-n number of channels]\n"); 152 printf("\t[-o transfer size in bytes]\n"); 153 printf("\t[-t time in seconds]\n"); 154 printf("\t[-w workload type must be one of these: copy, fill, crc32c, compare, dualcast\n"); 155 printf("\t[-s for crc32c workload, use this seed value (default 0)\n"); 156 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 157 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 158 printf("\t[-y verify result if this switch is on]\n"); 159 printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n"); 160 } 161 162 static int 163 parse_args(int argc, char *argv) 164 { 165 switch (argc) { 166 case 'b': 167 g_ops_per_batch = spdk_strtol(optarg, 10); 168 break; 169 case 'C': 170 g_crc32c_chained_count = spdk_strtol(optarg, 10); 171 break; 172 case 'f': 173 g_fill_pattern = (uint8_t)spdk_strtol(optarg, 10); 174 break; 175 case 'T': 176 g_threads_per_core = spdk_strtol(optarg, 10); 177 break; 178 case 'o': 179 g_xfer_size_bytes = spdk_strtol(optarg, 10); 180 break; 181 case 'P': 182 g_fail_percent_goal = spdk_strtol(optarg, 10); 183 break; 184 case 'q': 185 g_queue_depth = spdk_strtol(optarg, 10); 186 break; 187 case 's': 188 g_crc32c_seed = spdk_strtol(optarg, 10); 189 break; 190 case 't': 191 g_time_in_sec = spdk_strtol(optarg, 10); 192 break; 193 case 'y': 194 g_verify = true; 195 break; 196 case 'w': 197 g_workload_type = optarg; 198 if (!strcmp(g_workload_type, "copy")) { 199 g_workload_selection = ACCEL_COPY; 200 } else if (!strcmp(g_workload_type, "fill")) { 201 g_workload_selection = ACCEL_FILL; 202 } else if (!strcmp(g_workload_type, "crc32c")) { 203 g_workload_selection = ACCEL_CRC32C; 204 } else if (!strcmp(g_workload_type, "compare")) { 205 g_workload_selection = ACCEL_COMPARE; 206 } else if (!strcmp(g_workload_type, "dualcast")) { 207 g_workload_selection = ACCEL_DUALCAST; 208 } 209 break; 210 default: 211 usage(); 212 return 1; 213 } 214 215 return 0; 216 } 217 218 static int dump_result(void); 219 static void 220 unregister_worker(void *arg1) 221 { 222 struct worker_thread *worker = arg1; 223 224 free(worker->task_base); 225 free(worker->batch_base); 226 spdk_put_io_channel(worker->ch); 227 pthread_mutex_lock(&g_workers_lock); 228 assert(g_num_workers >= 1); 229 if (--g_num_workers == 0) { 230 pthread_mutex_unlock(&g_workers_lock); 231 g_rc = dump_result(); 232 spdk_app_stop(0); 233 } 234 pthread_mutex_unlock(&g_workers_lock); 235 } 236 237 static int 238 _get_task_data_bufs(struct ap_task *task) 239 { 240 uint32_t align = 0; 241 uint32_t i = 0; 242 243 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 244 * we do this for all engines to keep it simple. 245 */ 246 if (g_workload_selection == ACCEL_DUALCAST) { 247 align = ALIGN_4K; 248 } 249 250 if (g_workload_selection == ACCEL_CRC32C) { 251 assert(g_crc32c_chained_count > 0); 252 task->iov_cnt = g_crc32c_chained_count; 253 task->iovs = calloc(task->iov_cnt, sizeof(struct iovec)); 254 if (!task->iovs) { 255 fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task); 256 return -ENOMEM; 257 } 258 259 for (i = 0; i < task->iov_cnt; i++) { 260 task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 261 if (task->iovs[i].iov_base == NULL) { 262 return -ENOMEM; 263 } 264 memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes); 265 task->iovs[i].iov_len = g_xfer_size_bytes; 266 } 267 268 } else { 269 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 270 if (task->src == NULL) { 271 fprintf(stderr, "Unable to alloc src buffer\n"); 272 return -ENOMEM; 273 } 274 275 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 276 if (g_workload_selection == ACCEL_FILL) { 277 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 278 } else { 279 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 280 } 281 } 282 283 task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 284 if (task->dst == NULL) { 285 fprintf(stderr, "Unable to alloc dst buffer\n"); 286 return -ENOMEM; 287 } 288 289 /* For compare we want the buffers to match, otherwise not. */ 290 if (g_workload_selection == ACCEL_COMPARE) { 291 memset(task->dst, DATA_PATTERN, g_xfer_size_bytes); 292 } else { 293 memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes); 294 } 295 296 if (g_workload_selection == ACCEL_DUALCAST) { 297 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 298 if (task->dst2 == NULL) { 299 fprintf(stderr, "Unable to alloc dst buffer\n"); 300 return -ENOMEM; 301 } 302 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 303 } 304 305 return 0; 306 } 307 308 inline static struct ap_task * 309 _get_task(struct worker_thread *worker) 310 { 311 struct ap_task *task; 312 313 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 314 task = TAILQ_FIRST(&worker->tasks_pool); 315 TAILQ_REMOVE(&worker->tasks_pool, task, link); 316 } else { 317 fprintf(stderr, "Unable to get ap_task\n"); 318 return NULL; 319 } 320 321 task->worker = worker; 322 task->worker->current_queue_depth++; 323 return task; 324 } 325 326 /* Submit one operation using the same ap task that just completed. */ 327 static void 328 _submit_single(struct worker_thread *worker, struct ap_task *task) 329 { 330 int random_num; 331 int rc = 0; 332 333 assert(worker); 334 335 switch (g_workload_selection) { 336 case ACCEL_COPY: 337 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 338 g_xfer_size_bytes, accel_done, task); 339 break; 340 case ACCEL_FILL: 341 /* For fill use the first byte of the task->dst buffer */ 342 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 343 g_xfer_size_bytes, accel_done, task); 344 break; 345 case ACCEL_CRC32C: 346 rc = spdk_accel_submit_crc32cv(worker->ch, (uint32_t *)task->dst, 347 task->iovs, task->iov_cnt, g_crc32c_seed, 348 accel_done, task); 349 break; 350 case ACCEL_COMPARE: 351 random_num = rand() % 100; 352 if (random_num < g_fail_percent_goal) { 353 task->expected_status = -EILSEQ; 354 *(uint8_t *)task->dst = ~DATA_PATTERN; 355 } else { 356 task->expected_status = 0; 357 *(uint8_t *)task->dst = DATA_PATTERN; 358 } 359 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 360 g_xfer_size_bytes, accel_done, task); 361 break; 362 case ACCEL_DUALCAST: 363 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 364 task->src, g_xfer_size_bytes, accel_done, task); 365 break; 366 default: 367 assert(false); 368 break; 369 370 } 371 372 if (rc) { 373 accel_done(task, rc); 374 } 375 } 376 377 static int 378 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, 379 struct accel_batch *worker_batch) 380 { 381 struct spdk_accel_batch *batch = worker_batch->batch; 382 int rc = 0; 383 384 worker_batch->cmd_count++; 385 assert(worker_batch->cmd_count <= g_ops_per_batch); 386 387 switch (g_workload_selection) { 388 case ACCEL_COPY: 389 rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst, 390 task->src, g_xfer_size_bytes, accel_done, task); 391 break; 392 case ACCEL_DUALCAST: 393 rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2, 394 task->src, g_xfer_size_bytes, accel_done, task); 395 break; 396 case ACCEL_COMPARE: 397 rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src, 398 g_xfer_size_bytes, accel_done, task); 399 break; 400 case ACCEL_FILL: 401 rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst, 402 *(uint8_t *)task->src, 403 g_xfer_size_bytes, accel_done, task); 404 break; 405 case ACCEL_CRC32C: 406 rc = spdk_accel_batch_prep_crc32cv(worker->ch, batch, (uint32_t *)task->dst, 407 task->iovs, task->iov_cnt, g_crc32c_seed, accel_done, task); 408 break; 409 default: 410 assert(false); 411 break; 412 } 413 414 return rc; 415 } 416 417 static void 418 _free_task_buffers(struct ap_task *task) 419 { 420 uint32_t i; 421 422 if (g_workload_selection == ACCEL_CRC32C) { 423 if (task->iovs) { 424 for (i = 0; i < task->iov_cnt; i++) { 425 if (task->iovs[i].iov_base) { 426 spdk_dma_free(task->iovs[i].iov_base); 427 } 428 } 429 free(task->iovs); 430 } 431 } else { 432 spdk_dma_free(task->src); 433 } 434 435 spdk_dma_free(task->dst); 436 if (g_workload_selection == ACCEL_DUALCAST) { 437 spdk_dma_free(task->dst2); 438 } 439 } 440 441 static void _batch_done(void *cb_arg); 442 static void 443 _build_batch(struct worker_thread *worker, struct ap_task *task) 444 { 445 struct accel_batch *worker_batch = NULL; 446 int rc; 447 448 assert(!TAILQ_EMPTY(&worker->in_prep_batches)); 449 450 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 451 452 /* If an accel batch hasn't been created yet do so now. */ 453 if (worker_batch->batch == NULL) { 454 worker_batch->batch = spdk_accel_batch_create(worker->ch); 455 if (worker_batch->batch == NULL) { 456 fprintf(stderr, "error unable to create new batch\n"); 457 return; 458 } 459 } 460 461 /* Prep the command re-using the last completed command's task */ 462 rc = _batch_prep_cmd(worker, task, worker_batch); 463 if (rc) { 464 fprintf(stderr, "error preping command for batch\n"); 465 goto error; 466 } 467 468 /* If this batch is full move it to the to_submit list so it gets 469 * submitted as batches complete. 470 */ 471 if (worker_batch->cmd_count == g_ops_per_batch) { 472 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 473 TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link); 474 } 475 476 return; 477 error: 478 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 479 480 } 481 482 static void batch_done(void *cb_arg, int status); 483 static void 484 _drain_batch(struct worker_thread *worker) 485 { 486 struct accel_batch *worker_batch, *tmp; 487 int rc; 488 489 /* submit any batches that were being built up. */ 490 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) { 491 if (worker_batch->cmd_count == 0) { 492 continue; 493 } 494 worker->current_queue_depth += worker_batch->cmd_count + 1; 495 496 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 497 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 498 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 499 if (rc == 0) { 500 worker_batch->cmd_count = 0; 501 } else { 502 fprintf(stderr, "error sending final batch\n"); 503 worker->current_queue_depth -= worker_batch->cmd_count + 1; 504 break; 505 } 506 } 507 } 508 509 static void 510 _batch_done(void *cb_arg) 511 { 512 struct accel_batch *worker_batch = (struct accel_batch *)cb_arg; 513 struct worker_thread *worker = worker_batch->worker; 514 int rc; 515 516 assert(TAILQ_EMPTY(&worker->in_use_batches) == 0); 517 518 if (worker_batch->status) { 519 SPDK_ERRLOG("error %d\n", worker_batch->status); 520 } 521 522 worker->current_queue_depth--; 523 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 524 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 525 worker_batch->batch = NULL; 526 worker_batch->cmd_count = 0; 527 528 if (!worker->is_draining) { 529 worker_batch = TAILQ_FIRST(&worker->to_submit_batches); 530 if (worker_batch != NULL) { 531 532 assert(worker_batch->cmd_count == g_ops_per_batch); 533 534 /* Add one for the batch command itself. */ 535 worker->current_queue_depth += g_ops_per_batch + 1; 536 TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link); 537 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 538 539 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 540 if (rc) { 541 fprintf(stderr, "error ending batch\n"); 542 worker->current_queue_depth -= g_ops_per_batch + 1; 543 return; 544 } 545 } 546 } else { 547 _drain_batch(worker); 548 } 549 } 550 551 static void 552 batch_done(void *cb_arg, int status) 553 { 554 struct accel_batch *worker_batch = (struct accel_batch *)cb_arg; 555 556 assert(worker_batch->worker); 557 558 worker_batch->status = status; 559 spdk_thread_send_msg(worker_batch->worker->thread, _batch_done, worker_batch); 560 } 561 562 static void 563 _accel_done(void *arg1) 564 { 565 struct ap_task *task = arg1; 566 struct worker_thread *worker = task->worker; 567 uint32_t sw_crc32c; 568 569 assert(worker); 570 assert(worker->current_queue_depth > 0); 571 572 if (g_verify && task->status == 0) { 573 switch (g_workload_selection) { 574 case ACCEL_CRC32C: 575 sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed); 576 if (*(uint32_t *)task->dst != sw_crc32c) { 577 SPDK_NOTICELOG("CRC-32C miscompare\n"); 578 worker->xfer_failed++; 579 } 580 break; 581 case ACCEL_COPY: 582 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 583 SPDK_NOTICELOG("Data miscompare\n"); 584 worker->xfer_failed++; 585 } 586 break; 587 case ACCEL_DUALCAST: 588 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 589 SPDK_NOTICELOG("Data miscompare, first destination\n"); 590 worker->xfer_failed++; 591 } 592 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 593 SPDK_NOTICELOG("Data miscompare, second destination\n"); 594 worker->xfer_failed++; 595 } 596 break; 597 case ACCEL_FILL: 598 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 599 SPDK_NOTICELOG("Data miscompare\n"); 600 worker->xfer_failed++; 601 } 602 break; 603 case ACCEL_COMPARE: 604 break; 605 default: 606 assert(false); 607 break; 608 } 609 } 610 611 if (task->expected_status == -EILSEQ) { 612 assert(task->status != 0); 613 worker->injected_miscompares++; 614 } else if (task->status) { 615 /* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */ 616 worker->xfer_failed++; 617 } 618 619 worker->xfer_completed++; 620 worker->current_queue_depth--; 621 622 if (!worker->is_draining) { 623 if (g_ops_per_batch == 0) { 624 _submit_single(worker, task); 625 worker->current_queue_depth++; 626 } else { 627 _build_batch(worker, task); 628 } 629 } else if (g_ops_per_batch > 0) { 630 _drain_batch(worker); 631 } else { 632 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 633 } 634 } 635 636 static int 637 dump_result(void) 638 { 639 uint64_t total_completed = 0; 640 uint64_t total_failed = 0; 641 uint64_t total_miscompared = 0; 642 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 643 struct worker_thread *worker = g_workers; 644 645 printf("\nCore,Thread Transfers Bandwidth Failed Miscompares\n"); 646 printf("------------------------------------------------------------------------\n"); 647 while (worker != NULL) { 648 649 uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec; 650 uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) / 651 (g_time_in_sec * 1024 * 1024); 652 653 total_completed += worker->xfer_completed; 654 total_failed += worker->xfer_failed; 655 total_miscompared += worker->injected_miscompares; 656 657 if (xfer_per_sec) { 658 printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n", 659 worker->display.core, worker->display.thread, xfer_per_sec, 660 bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares); 661 } 662 663 worker = worker->next; 664 } 665 666 total_xfer_per_sec = total_completed / g_time_in_sec; 667 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 668 (g_time_in_sec * 1024 * 1024); 669 670 printf("=========================================================================\n"); 671 printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n", 672 total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared); 673 674 return total_failed ? 1 : 0; 675 } 676 677 static inline void 678 _free_task_buffers_in_pool(struct worker_thread *worker) 679 { 680 struct ap_task *task; 681 682 assert(worker); 683 while ((task = TAILQ_FIRST(&worker->tasks_pool))) { 684 TAILQ_REMOVE(&worker->tasks_pool, task, link); 685 _free_task_buffers(task); 686 } 687 } 688 689 static int 690 _check_draining(void *arg) 691 { 692 struct worker_thread *worker = arg; 693 694 assert(worker); 695 696 if (worker->current_queue_depth == 0) { 697 _free_task_buffers_in_pool(worker); 698 spdk_poller_unregister(&worker->is_draining_poller); 699 unregister_worker(worker); 700 } 701 702 return -1; 703 } 704 705 static int 706 _worker_stop(void *arg) 707 { 708 struct worker_thread *worker = arg; 709 710 assert(worker); 711 712 spdk_poller_unregister(&worker->stop_poller); 713 714 /* now let the worker drain and check it's outstanding IO with a poller */ 715 worker->is_draining = true; 716 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 717 718 return 0; 719 } 720 721 static void 722 _init_thread(void *arg1) 723 { 724 struct worker_thread *worker; 725 struct ap_task *task; 726 int i, rc, num_batches; 727 int max_per_batch; 728 int remaining = g_queue_depth; 729 int num_tasks = g_queue_depth; 730 struct accel_batch *tmp; 731 struct accel_batch *worker_batch = NULL; 732 struct display_info *display = arg1; 733 734 worker = calloc(1, sizeof(*worker)); 735 if (worker == NULL) { 736 fprintf(stderr, "Unable to allocate worker\n"); 737 free(display); 738 return; 739 } 740 741 worker->display.core = display->core; 742 worker->display.thread = display->thread; 743 free(display); 744 worker->core = spdk_env_get_current_core(); 745 worker->thread = spdk_get_thread(); 746 pthread_mutex_lock(&g_workers_lock); 747 g_num_workers++; 748 worker->next = g_workers; 749 g_workers = worker; 750 pthread_mutex_unlock(&g_workers_lock); 751 worker->ch = spdk_accel_engine_get_io_channel(); 752 753 TAILQ_INIT(&worker->tasks_pool); 754 755 if (g_ops_per_batch > 0) { 756 757 max_per_batch = spdk_accel_batch_get_max(worker->ch); 758 assert(max_per_batch > 0); 759 760 if (g_ops_per_batch > max_per_batch) { 761 fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch); 762 g_ops_per_batch = max_per_batch; 763 } 764 765 if (g_ops_per_batch > g_queue_depth) { 766 fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth); 767 g_ops_per_batch = g_queue_depth; 768 } 769 770 TAILQ_INIT(&worker->in_prep_batches); 771 TAILQ_INIT(&worker->to_submit_batches); 772 TAILQ_INIT(&worker->in_use_batches); 773 774 /* A worker_batch will live on one of 3 lists: 775 * IN_PREP: as individual IOs complete new ones are built on on a 776 * worker_batch on this list until it reaches g_ops_per_batch. 777 * TO_SUBMIT: as batches are built up on IO completion they are moved 778 * to this list once they are full. This list is used in 779 * batch completion to start new batches. 780 * IN_USE: the worker_batch is outstanding and will be moved to in prep 781 * list when the batch is completed. 782 * 783 * So we need enough to cover Q depth loading and then one to replace 784 * each one of those and for when everything is outstanding there needs 785 * to be one extra batch to build up while the last batch is completing 786 * IO but before it's completed the batch command. 787 */ 788 num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1; 789 worker->batch_base = calloc(num_batches, sizeof(struct accel_batch)); 790 worker_batch = worker->batch_base; 791 for (i = 0; i < num_batches; i++) { 792 worker_batch->worker = worker; 793 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 794 worker_batch++; 795 } 796 } 797 798 worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); 799 if (worker->task_base == NULL) { 800 fprintf(stderr, "Could not allocate task base.\n"); 801 goto error; 802 } 803 804 task = worker->task_base; 805 for (i = 0; i < num_tasks; i++) { 806 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 807 if (_get_task_data_bufs(task)) { 808 fprintf(stderr, "Unable to get data bufs\n"); 809 goto error; 810 } 811 task++; 812 } 813 814 /* Register a poller that will stop the worker at time elapsed */ 815 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 816 g_time_in_sec * 1000000ULL); 817 818 /* If batching is enabled load up to the full Q depth before 819 * processing any completions, then ping pong between two batches, 820 * one processing and one being built up for when the other completes. 821 */ 822 if (g_ops_per_batch > 0) { 823 do { 824 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 825 if (worker_batch == NULL) { 826 goto error; 827 } 828 829 worker_batch->batch = spdk_accel_batch_create(worker->ch); 830 if (worker_batch->batch == NULL) { 831 raise(SIGINT); 832 break; 833 } 834 835 for (i = 0; i < g_ops_per_batch; i++) { 836 task = _get_task(worker); 837 if (task == NULL) { 838 goto error; 839 } 840 841 rc = _batch_prep_cmd(worker, task, worker_batch); 842 if (rc) { 843 fprintf(stderr, "error preping command\n"); 844 goto error; 845 } 846 } 847 848 /* for the batch operation itself. */ 849 task->worker->current_queue_depth++; 850 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 851 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 852 853 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 854 if (rc) { 855 fprintf(stderr, "error ending batch\n"); 856 goto error; 857 } 858 assert(remaining >= g_ops_per_batch); 859 remaining -= g_ops_per_batch; 860 } while (remaining > 0); 861 } 862 863 /* Submit as singles when no batching is enabled or we ran out of batches. */ 864 for (i = 0; i < remaining; i++) { 865 task = _get_task(worker); 866 if (task == NULL) { 867 goto error; 868 } 869 870 _submit_single(worker, task); 871 } 872 return; 873 error: 874 if (worker_batch && worker_batch->batch) { 875 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) { 876 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 877 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 878 } 879 } 880 881 _free_task_buffers_in_pool(worker); 882 free(worker->batch_base); 883 free(worker->task_base); 884 free(worker); 885 spdk_app_stop(-1); 886 } 887 888 static void 889 accel_done(void *cb_arg, int status) 890 { 891 struct ap_task *task = (struct ap_task *)cb_arg; 892 struct worker_thread *worker = task->worker; 893 894 assert(worker); 895 896 task->status = status; 897 spdk_thread_send_msg(worker->thread, _accel_done, task); 898 } 899 900 static void 901 accel_perf_start(void *arg1) 902 { 903 struct spdk_io_channel *accel_ch; 904 struct spdk_cpuset tmp_cpumask = {}; 905 char thread_name[32]; 906 uint32_t i; 907 int j; 908 struct spdk_thread *thread; 909 struct display_info *display; 910 911 accel_ch = spdk_accel_engine_get_io_channel(); 912 g_capabilites = spdk_accel_get_capabilities(accel_ch); 913 spdk_put_io_channel(accel_ch); 914 915 if ((g_capabilites & g_workload_selection) != g_workload_selection) { 916 SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n"); 917 SPDK_WARNLOG("The software engine will be used instead.\n\n"); 918 } 919 920 g_tsc_rate = spdk_get_ticks_hz(); 921 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 922 923 printf("Running for %d seconds...\n", g_time_in_sec); 924 fflush(stdout); 925 926 /* Create worker threads for each core that was specified. */ 927 SPDK_ENV_FOREACH_CORE(i) { 928 for (j = 0; j < g_threads_per_core; j++) { 929 snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j); 930 spdk_cpuset_zero(&tmp_cpumask); 931 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 932 thread = spdk_thread_create(thread_name, &tmp_cpumask); 933 display = calloc(1, sizeof(*display)); 934 if (display == NULL) { 935 fprintf(stderr, "Unable to allocate memory\n"); 936 spdk_app_stop(-1); 937 return; 938 } 939 display->core = i; 940 display->thread = j; 941 spdk_thread_send_msg(thread, _init_thread, display); 942 } 943 } 944 } 945 946 int 947 main(int argc, char **argv) 948 { 949 struct spdk_app_opts opts = {}; 950 struct worker_thread *worker, *tmp; 951 952 pthread_mutex_init(&g_workers_lock, NULL); 953 spdk_app_opts_init(&opts, sizeof(opts)); 954 opts.reactor_mask = "0x1"; 955 if (spdk_app_parse_args(argc, argv, &opts, "C:o:q:t:yw:P:f:b:T:", NULL, parse_args, 956 usage) != SPDK_APP_PARSE_ARGS_SUCCESS) { 957 g_rc = -1; 958 goto cleanup; 959 } 960 961 if ((g_workload_selection != ACCEL_COPY) && 962 (g_workload_selection != ACCEL_FILL) && 963 (g_workload_selection != ACCEL_CRC32C) && 964 (g_workload_selection != ACCEL_COMPARE) && 965 (g_workload_selection != ACCEL_DUALCAST)) { 966 usage(); 967 g_rc = -1; 968 goto cleanup; 969 } 970 971 if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) { 972 fprintf(stdout, "batch size must be a multiple of queue depth\n"); 973 usage(); 974 g_rc = -1; 975 goto cleanup; 976 } 977 978 if (g_workload_selection == ACCEL_CRC32C && 979 g_crc32c_chained_count == 0) { 980 usage(); 981 g_rc = -1; 982 goto cleanup; 983 } 984 985 dump_user_config(&opts); 986 g_rc = spdk_app_start(&opts, accel_perf_start, NULL); 987 if (g_rc) { 988 SPDK_ERRLOG("ERROR starting application\n"); 989 } 990 991 pthread_mutex_destroy(&g_workers_lock); 992 993 worker = g_workers; 994 while (worker) { 995 tmp = worker->next; 996 free(worker); 997 worker = tmp; 998 } 999 cleanup: 1000 spdk_app_fini(); 1001 return g_rc; 1002 } 1003