1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/thread.h" 36 #include "spdk/env.h" 37 #include "spdk/event.h" 38 #include "spdk/log.h" 39 #include "spdk/string.h" 40 #include "spdk/accel_engine.h" 41 #include "spdk/crc32.h" 42 #include "spdk/util.h" 43 44 #define DATA_PATTERN 0x5a 45 #define ALIGN_4K 0x1000 46 47 static uint64_t g_tsc_rate; 48 static uint64_t g_tsc_end; 49 static int g_rc; 50 static int g_xfer_size_bytes = 4096; 51 static int g_queue_depth = 32; 52 static int g_ops_per_batch = 0; 53 static int g_threads_per_core = 1; 54 static int g_time_in_sec = 5; 55 static uint32_t g_crc32c_seed = 0; 56 static uint32_t g_crc32c_chained_count = 1; 57 static int g_fail_percent_goal = 0; 58 static uint8_t g_fill_pattern = 255; 59 static bool g_verify = false; 60 static const char *g_workload_type = NULL; 61 static enum accel_capability g_workload_selection; 62 static struct worker_thread *g_workers = NULL; 63 static int g_num_workers = 0; 64 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 65 uint64_t g_capabilites; 66 67 struct worker_thread; 68 static void accel_done(void *ref, int status); 69 70 struct display_info { 71 int core; 72 int thread; 73 }; 74 75 struct ap_task { 76 void *src; 77 struct iovec *iovs; 78 uint32_t iov_cnt; 79 void *dst; 80 void *dst2; 81 struct worker_thread *worker; 82 int status; 83 int expected_status; /* used for the compare operation */ 84 TAILQ_ENTRY(ap_task) link; 85 }; 86 87 struct accel_batch { 88 int status; 89 int cmd_count; 90 struct spdk_accel_batch *batch; 91 struct worker_thread *worker; 92 TAILQ_ENTRY(accel_batch) link; 93 }; 94 95 struct worker_thread { 96 struct spdk_io_channel *ch; 97 uint64_t xfer_completed; 98 uint64_t xfer_failed; 99 uint64_t injected_miscompares; 100 uint64_t current_queue_depth; 101 TAILQ_HEAD(, ap_task) tasks_pool; 102 struct worker_thread *next; 103 unsigned core; 104 struct spdk_thread *thread; 105 bool is_draining; 106 struct spdk_poller *is_draining_poller; 107 struct spdk_poller *stop_poller; 108 void *task_base; 109 struct accel_batch *batch_base; 110 struct display_info display; 111 TAILQ_HEAD(, accel_batch) in_prep_batches; 112 TAILQ_HEAD(, accel_batch) in_use_batches; 113 TAILQ_HEAD(, accel_batch) to_submit_batches; 114 }; 115 116 static void 117 dump_user_config(struct spdk_app_opts *opts) 118 { 119 printf("SPDK Configuration:\n"); 120 printf("Core mask: %s\n\n", opts->reactor_mask); 121 printf("Accel Perf Configuration:\n"); 122 printf("Workload Type: %s\n", g_workload_type); 123 if (g_workload_selection == ACCEL_CRC32C) { 124 printf("CRC-32C seed: %u\n", g_crc32c_seed); 125 printf("vector size: %u\n", g_crc32c_chained_count); 126 } else if (g_workload_selection == ACCEL_FILL) { 127 printf("Fill pattern: 0x%x\n", g_fill_pattern); 128 } else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) { 129 printf("Failure inject: %u percent\n", g_fail_percent_goal); 130 } 131 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 132 printf("Queue depth: %u\n", g_queue_depth); 133 printf("# threads/core: %u\n", g_threads_per_core); 134 printf("Run time: %u seconds\n", g_time_in_sec); 135 if (g_ops_per_batch > 0) { 136 printf("Batching: %u operations\n", g_ops_per_batch); 137 } else { 138 printf("Batching: Disabled\n"); 139 } 140 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 141 } 142 143 static void 144 usage(void) 145 { 146 printf("accel_perf options:\n"); 147 printf("\t[-h help message]\n"); 148 printf("\t[-q queue depth per core]\n"); 149 printf("\t[-C for crc32c workload, use this value to configre the io vector size to test (default 1)\n"); 150 printf("\t[-T number of threads per core\n"); 151 printf("\t[-n number of channels]\n"); 152 printf("\t[-o transfer size in bytes]\n"); 153 printf("\t[-t time in seconds]\n"); 154 printf("\t[-w workload type must be one of these: copy, fill, crc32c, compare, dualcast\n"); 155 printf("\t[-s for crc32c workload, use this seed value (default 0)\n"); 156 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 157 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 158 printf("\t[-y verify result if this switch is on]\n"); 159 printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n"); 160 } 161 162 static int 163 parse_args(int argc, char *argv) 164 { 165 switch (argc) { 166 case 'b': 167 g_ops_per_batch = spdk_strtol(optarg, 10); 168 break; 169 case 'C': 170 g_crc32c_chained_count = spdk_strtol(optarg, 10); 171 break; 172 case 'f': 173 g_fill_pattern = (uint8_t)spdk_strtol(optarg, 10); 174 break; 175 case 'T': 176 g_threads_per_core = spdk_strtol(optarg, 10); 177 break; 178 case 'o': 179 g_xfer_size_bytes = spdk_strtol(optarg, 10); 180 break; 181 case 'P': 182 g_fail_percent_goal = spdk_strtol(optarg, 10); 183 break; 184 case 'q': 185 g_queue_depth = spdk_strtol(optarg, 10); 186 break; 187 case 's': 188 g_crc32c_seed = spdk_strtol(optarg, 10); 189 break; 190 case 't': 191 g_time_in_sec = spdk_strtol(optarg, 10); 192 break; 193 case 'y': 194 g_verify = true; 195 break; 196 case 'w': 197 g_workload_type = optarg; 198 if (!strcmp(g_workload_type, "copy")) { 199 g_workload_selection = ACCEL_COPY; 200 } else if (!strcmp(g_workload_type, "fill")) { 201 g_workload_selection = ACCEL_FILL; 202 } else if (!strcmp(g_workload_type, "crc32c")) { 203 g_workload_selection = ACCEL_CRC32C; 204 } else if (!strcmp(g_workload_type, "compare")) { 205 g_workload_selection = ACCEL_COMPARE; 206 } else if (!strcmp(g_workload_type, "dualcast")) { 207 g_workload_selection = ACCEL_DUALCAST; 208 } 209 break; 210 default: 211 usage(); 212 return 1; 213 } 214 215 return 0; 216 } 217 218 static int dump_result(void); 219 static void 220 unregister_worker(void *arg1) 221 { 222 struct worker_thread *worker = arg1; 223 224 free(worker->task_base); 225 free(worker->batch_base); 226 spdk_put_io_channel(worker->ch); 227 pthread_mutex_lock(&g_workers_lock); 228 assert(g_num_workers >= 1); 229 if (--g_num_workers == 0) { 230 pthread_mutex_unlock(&g_workers_lock); 231 g_rc = dump_result(); 232 spdk_app_stop(0); 233 } 234 pthread_mutex_unlock(&g_workers_lock); 235 } 236 237 static int 238 _get_task_data_bufs(struct ap_task *task) 239 { 240 uint32_t align = 0; 241 uint32_t i = 0; 242 243 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 244 * we do this for all engines to keep it simple. 245 */ 246 if (g_workload_selection == ACCEL_DUALCAST) { 247 align = ALIGN_4K; 248 } 249 250 if (g_workload_selection == ACCEL_CRC32C) { 251 assert(g_crc32c_chained_count > 0); 252 task->iov_cnt = g_crc32c_chained_count; 253 task->iovs = calloc(task->iov_cnt, sizeof(struct iovec)); 254 if (!task->iovs) { 255 fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task); 256 return -ENOMEM; 257 } 258 259 for (i = 0; i < task->iov_cnt; i++) { 260 task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 261 if (task->iovs[i].iov_base == NULL) { 262 return -ENOMEM; 263 } 264 memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes); 265 task->iovs[i].iov_len = g_xfer_size_bytes; 266 } 267 268 } else { 269 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 270 if (task->src == NULL) { 271 fprintf(stderr, "Unable to alloc src buffer\n"); 272 return -ENOMEM; 273 } 274 275 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 276 if (g_workload_selection == ACCEL_FILL) { 277 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 278 } else { 279 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 280 } 281 } 282 283 task->dst = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 284 if (task->dst == NULL) { 285 fprintf(stderr, "Unable to alloc dst buffer\n"); 286 return -ENOMEM; 287 } 288 289 /* For compare we want the buffers to match, otherwise not. */ 290 if (g_workload_selection == ACCEL_COMPARE) { 291 memset(task->dst, DATA_PATTERN, g_xfer_size_bytes); 292 } else { 293 memset(task->dst, ~DATA_PATTERN, g_xfer_size_bytes); 294 } 295 296 if (g_workload_selection == ACCEL_DUALCAST) { 297 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 298 if (task->dst2 == NULL) { 299 fprintf(stderr, "Unable to alloc dst buffer\n"); 300 return -ENOMEM; 301 } 302 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 303 } 304 305 return 0; 306 } 307 308 inline static struct ap_task * 309 _get_task(struct worker_thread *worker) 310 { 311 struct ap_task *task; 312 313 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 314 task = TAILQ_FIRST(&worker->tasks_pool); 315 TAILQ_REMOVE(&worker->tasks_pool, task, link); 316 } else { 317 fprintf(stderr, "Unable to get ap_task\n"); 318 return NULL; 319 } 320 321 task->worker = worker; 322 task->worker->current_queue_depth++; 323 return task; 324 } 325 326 /* Submit one operation using the same ap task that just completed. */ 327 static void 328 _submit_single(struct worker_thread *worker, struct ap_task *task) 329 { 330 int random_num; 331 int rc = 0; 332 333 assert(worker); 334 335 switch (g_workload_selection) { 336 case ACCEL_COPY: 337 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 338 g_xfer_size_bytes, accel_done, task); 339 break; 340 case ACCEL_FILL: 341 /* For fill use the first byte of the task->dst buffer */ 342 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 343 g_xfer_size_bytes, accel_done, task); 344 break; 345 case ACCEL_CRC32C: 346 rc = spdk_accel_submit_crc32cv(worker->ch, (uint32_t *)task->dst, 347 task->iovs, task->iov_cnt, g_crc32c_seed, 348 accel_done, task); 349 break; 350 case ACCEL_COMPARE: 351 random_num = rand() % 100; 352 if (random_num < g_fail_percent_goal) { 353 task->expected_status = -EILSEQ; 354 *(uint8_t *)task->dst = ~DATA_PATTERN; 355 } else { 356 task->expected_status = 0; 357 *(uint8_t *)task->dst = DATA_PATTERN; 358 } 359 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 360 g_xfer_size_bytes, accel_done, task); 361 break; 362 case ACCEL_DUALCAST: 363 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 364 task->src, g_xfer_size_bytes, accel_done, task); 365 break; 366 default: 367 assert(false); 368 break; 369 370 } 371 372 if (rc) { 373 accel_done(task, rc); 374 } 375 } 376 377 static int 378 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, 379 struct accel_batch *worker_batch) 380 { 381 struct spdk_accel_batch *batch = worker_batch->batch; 382 int rc = 0; 383 384 worker_batch->cmd_count++; 385 assert(worker_batch->cmd_count <= g_ops_per_batch); 386 387 switch (g_workload_selection) { 388 case ACCEL_COPY: 389 rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst, 390 task->src, g_xfer_size_bytes, accel_done, task); 391 break; 392 case ACCEL_DUALCAST: 393 rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2, 394 task->src, g_xfer_size_bytes, accel_done, task); 395 break; 396 case ACCEL_COMPARE: 397 rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src, 398 g_xfer_size_bytes, accel_done, task); 399 break; 400 case ACCEL_FILL: 401 rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst, 402 *(uint8_t *)task->src, 403 g_xfer_size_bytes, accel_done, task); 404 break; 405 case ACCEL_CRC32C: 406 rc = spdk_accel_batch_prep_crc32cv(worker->ch, batch, (uint32_t *)task->dst, 407 task->iovs, task->iov_cnt, g_crc32c_seed, accel_done, task); 408 break; 409 default: 410 assert(false); 411 break; 412 } 413 414 return rc; 415 } 416 417 static void 418 _free_task_buffers(struct ap_task *task) 419 { 420 uint32_t i; 421 422 if (g_workload_selection == ACCEL_CRC32C) { 423 if (task->iovs) { 424 for (i = 0; i < task->iov_cnt; i++) { 425 if (task->iovs[i].iov_base) { 426 spdk_dma_free(task->iovs[i].iov_base); 427 } 428 } 429 free(task->iovs); 430 } 431 } else { 432 spdk_dma_free(task->src); 433 } 434 435 spdk_dma_free(task->dst); 436 if (g_workload_selection == ACCEL_DUALCAST) { 437 spdk_dma_free(task->dst2); 438 } 439 } 440 441 static void _batch_done(void *cb_arg); 442 static void 443 _build_batch(struct worker_thread *worker, struct ap_task *task) 444 { 445 struct accel_batch *worker_batch = NULL; 446 int rc; 447 448 assert(!TAILQ_EMPTY(&worker->in_prep_batches)); 449 450 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 451 452 /* If an accel batch hasn't been created yet do so now. */ 453 if (worker_batch->batch == NULL) { 454 worker_batch->batch = spdk_accel_batch_create(worker->ch); 455 if (worker_batch->batch == NULL) { 456 fprintf(stderr, "error unable to create new batch\n"); 457 return; 458 } 459 } 460 461 /* Prep the command re-using the last completed command's task */ 462 rc = _batch_prep_cmd(worker, task, worker_batch); 463 if (rc) { 464 fprintf(stderr, "error preping command for batch\n"); 465 goto error; 466 } 467 468 /* If this batch is full move it to the to_submit list so it gets 469 * submitted as batches complete. 470 */ 471 if (worker_batch->cmd_count == g_ops_per_batch) { 472 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 473 TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link); 474 } 475 476 return; 477 error: 478 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 479 480 } 481 482 static void batch_done(void *cb_arg, int status); 483 static void 484 _drain_batch(struct worker_thread *worker) 485 { 486 struct accel_batch *worker_batch, *tmp; 487 int rc; 488 489 /* submit any batches that were being built up. */ 490 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) { 491 if (worker_batch->cmd_count == 0) { 492 continue; 493 } 494 worker->current_queue_depth += worker_batch->cmd_count + 1; 495 496 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 497 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 498 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 499 if (rc == 0) { 500 worker_batch->cmd_count = 0; 501 } else { 502 fprintf(stderr, "error sending final batch\n"); 503 worker->current_queue_depth -= worker_batch->cmd_count + 1; 504 break; 505 } 506 } 507 } 508 509 static void 510 _batch_done(void *cb_arg) 511 { 512 struct accel_batch *worker_batch = (struct accel_batch *)cb_arg; 513 struct worker_thread *worker = worker_batch->worker; 514 int rc; 515 516 assert(TAILQ_EMPTY(&worker->in_use_batches) == 0); 517 518 if (worker_batch->status) { 519 SPDK_ERRLOG("error %d\n", worker_batch->status); 520 } 521 522 worker->current_queue_depth--; 523 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 524 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 525 worker_batch->batch = NULL; 526 worker_batch->cmd_count = 0; 527 528 if (!worker->is_draining) { 529 worker_batch = TAILQ_FIRST(&worker->to_submit_batches); 530 if (worker_batch != NULL) { 531 532 assert(worker_batch->cmd_count == g_ops_per_batch); 533 534 /* Add one for the batch command itself. */ 535 worker->current_queue_depth += g_ops_per_batch + 1; 536 TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link); 537 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 538 539 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 540 if (rc) { 541 fprintf(stderr, "error ending batch\n"); 542 worker->current_queue_depth -= g_ops_per_batch + 1; 543 return; 544 } 545 } 546 } else { 547 _drain_batch(worker); 548 } 549 } 550 551 static void 552 batch_done(void *cb_arg, int status) 553 { 554 struct accel_batch *worker_batch = (struct accel_batch *)cb_arg; 555 556 assert(worker_batch->worker); 557 558 worker_batch->status = status; 559 spdk_thread_send_msg(worker_batch->worker->thread, _batch_done, worker_batch); 560 } 561 562 static uint32_t 563 _update_crc32c_iov(struct iovec *iov, int iovcnt, uint32_t crc32c) 564 { 565 int i; 566 567 for (i = 0; i < iovcnt; i++) { 568 assert(iov[i].iov_base != NULL); 569 assert(iov[i].iov_len != 0); 570 crc32c = spdk_crc32c_update(iov[i].iov_base, iov[i].iov_len, crc32c); 571 572 } 573 return crc32c; 574 } 575 576 static void 577 _accel_done(void *arg1) 578 { 579 struct ap_task *task = arg1; 580 struct worker_thread *worker = task->worker; 581 uint32_t sw_crc32c; 582 583 assert(worker); 584 assert(worker->current_queue_depth > 0); 585 586 if (g_verify && task->status == 0) { 587 switch (g_workload_selection) { 588 case ACCEL_CRC32C: 589 sw_crc32c = _update_crc32c_iov(task->iovs, task->iov_cnt, ~g_crc32c_seed); 590 if (*(uint32_t *)task->dst != sw_crc32c) { 591 SPDK_NOTICELOG("CRC-32C miscompare\n"); 592 worker->xfer_failed++; 593 } 594 break; 595 case ACCEL_COPY: 596 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 597 SPDK_NOTICELOG("Data miscompare\n"); 598 worker->xfer_failed++; 599 } 600 break; 601 case ACCEL_DUALCAST: 602 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 603 SPDK_NOTICELOG("Data miscompare, first destination\n"); 604 worker->xfer_failed++; 605 } 606 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 607 SPDK_NOTICELOG("Data miscompare, second destination\n"); 608 worker->xfer_failed++; 609 } 610 break; 611 case ACCEL_FILL: 612 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 613 SPDK_NOTICELOG("Data miscompare\n"); 614 worker->xfer_failed++; 615 } 616 break; 617 case ACCEL_COMPARE: 618 break; 619 default: 620 assert(false); 621 break; 622 } 623 } 624 625 if (task->expected_status == -EILSEQ) { 626 assert(task->status != 0); 627 worker->injected_miscompares++; 628 } else if (task->status) { 629 /* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */ 630 worker->xfer_failed++; 631 } 632 633 worker->xfer_completed++; 634 worker->current_queue_depth--; 635 636 if (!worker->is_draining) { 637 if (g_ops_per_batch == 0) { 638 _submit_single(worker, task); 639 worker->current_queue_depth++; 640 } else { 641 _build_batch(worker, task); 642 } 643 } else if (g_ops_per_batch > 0) { 644 _drain_batch(worker); 645 } else { 646 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 647 } 648 } 649 650 static int 651 dump_result(void) 652 { 653 uint64_t total_completed = 0; 654 uint64_t total_failed = 0; 655 uint64_t total_miscompared = 0; 656 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 657 struct worker_thread *worker = g_workers; 658 659 printf("\nCore,Thread Transfers Bandwidth Failed Miscompares\n"); 660 printf("------------------------------------------------------------------------\n"); 661 while (worker != NULL) { 662 663 uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec; 664 uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) / 665 (g_time_in_sec * 1024 * 1024); 666 667 total_completed += worker->xfer_completed; 668 total_failed += worker->xfer_failed; 669 total_miscompared += worker->injected_miscompares; 670 671 if (xfer_per_sec) { 672 printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n", 673 worker->display.core, worker->display.thread, xfer_per_sec, 674 bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares); 675 } 676 677 worker = worker->next; 678 } 679 680 total_xfer_per_sec = total_completed / g_time_in_sec; 681 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 682 (g_time_in_sec * 1024 * 1024); 683 684 printf("=========================================================================\n"); 685 printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n", 686 total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared); 687 688 return total_failed ? 1 : 0; 689 } 690 691 static inline void 692 _free_task_buffers_in_pool(struct worker_thread *worker) 693 { 694 struct ap_task *task; 695 696 assert(worker); 697 while ((task = TAILQ_FIRST(&worker->tasks_pool))) { 698 TAILQ_REMOVE(&worker->tasks_pool, task, link); 699 _free_task_buffers(task); 700 } 701 } 702 703 static int 704 _check_draining(void *arg) 705 { 706 struct worker_thread *worker = arg; 707 708 assert(worker); 709 710 if (worker->current_queue_depth == 0) { 711 _free_task_buffers_in_pool(worker); 712 spdk_poller_unregister(&worker->is_draining_poller); 713 unregister_worker(worker); 714 } 715 716 return -1; 717 } 718 719 static int 720 _worker_stop(void *arg) 721 { 722 struct worker_thread *worker = arg; 723 724 assert(worker); 725 726 spdk_poller_unregister(&worker->stop_poller); 727 728 /* now let the worker drain and check it's outstanding IO with a poller */ 729 worker->is_draining = true; 730 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 731 732 return 0; 733 } 734 735 static void 736 _init_thread(void *arg1) 737 { 738 struct worker_thread *worker; 739 struct ap_task *task; 740 int i, rc, num_batches; 741 int max_per_batch; 742 int remaining = g_queue_depth; 743 int num_tasks = g_queue_depth; 744 struct accel_batch *tmp; 745 struct accel_batch *worker_batch = NULL; 746 struct display_info *display = arg1; 747 748 worker = calloc(1, sizeof(*worker)); 749 if (worker == NULL) { 750 fprintf(stderr, "Unable to allocate worker\n"); 751 free(display); 752 return; 753 } 754 755 worker->display.core = display->core; 756 worker->display.thread = display->thread; 757 free(display); 758 worker->core = spdk_env_get_current_core(); 759 worker->thread = spdk_get_thread(); 760 pthread_mutex_lock(&g_workers_lock); 761 g_num_workers++; 762 worker->next = g_workers; 763 g_workers = worker; 764 pthread_mutex_unlock(&g_workers_lock); 765 worker->ch = spdk_accel_engine_get_io_channel(); 766 767 TAILQ_INIT(&worker->tasks_pool); 768 769 if (g_ops_per_batch > 0) { 770 771 max_per_batch = spdk_accel_batch_get_max(worker->ch); 772 assert(max_per_batch > 0); 773 774 if (g_ops_per_batch > max_per_batch) { 775 fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch); 776 g_ops_per_batch = max_per_batch; 777 } 778 779 if (g_ops_per_batch > g_queue_depth) { 780 fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth); 781 g_ops_per_batch = g_queue_depth; 782 } 783 784 TAILQ_INIT(&worker->in_prep_batches); 785 TAILQ_INIT(&worker->to_submit_batches); 786 TAILQ_INIT(&worker->in_use_batches); 787 788 /* A worker_batch will live on one of 3 lists: 789 * IN_PREP: as individual IOs complete new ones are built on on a 790 * worker_batch on this list until it reaches g_ops_per_batch. 791 * TO_SUBMIT: as batches are built up on IO completion they are moved 792 * to this list once they are full. This list is used in 793 * batch completion to start new batches. 794 * IN_USE: the worker_batch is outstanding and will be moved to in prep 795 * list when the batch is completed. 796 * 797 * So we need enough to cover Q depth loading and then one to replace 798 * each one of those and for when everything is outstanding there needs 799 * to be one extra batch to build up while the last batch is completing 800 * IO but before it's completed the batch command. 801 */ 802 num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1; 803 worker->batch_base = calloc(num_batches, sizeof(struct accel_batch)); 804 worker_batch = worker->batch_base; 805 for (i = 0; i < num_batches; i++) { 806 worker_batch->worker = worker; 807 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 808 worker_batch++; 809 } 810 } 811 812 worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); 813 if (worker->task_base == NULL) { 814 fprintf(stderr, "Could not allocate task base.\n"); 815 goto error; 816 } 817 818 task = worker->task_base; 819 for (i = 0; i < num_tasks; i++) { 820 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 821 if (_get_task_data_bufs(task)) { 822 fprintf(stderr, "Unable to get data bufs\n"); 823 goto error; 824 } 825 task++; 826 } 827 828 /* Register a poller that will stop the worker at time elapsed */ 829 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 830 g_time_in_sec * 1000000ULL); 831 832 /* If batching is enabled load up to the full Q depth before 833 * processing any completions, then ping pong between two batches, 834 * one processing and one being built up for when the other completes. 835 */ 836 if (g_ops_per_batch > 0) { 837 do { 838 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 839 if (worker_batch == NULL) { 840 goto error; 841 } 842 843 worker_batch->batch = spdk_accel_batch_create(worker->ch); 844 if (worker_batch->batch == NULL) { 845 raise(SIGINT); 846 break; 847 } 848 849 for (i = 0; i < g_ops_per_batch; i++) { 850 task = _get_task(worker); 851 if (task == NULL) { 852 goto error; 853 } 854 855 rc = _batch_prep_cmd(worker, task, worker_batch); 856 if (rc) { 857 fprintf(stderr, "error preping command\n"); 858 goto error; 859 } 860 } 861 862 /* for the batch operation itself. */ 863 task->worker->current_queue_depth++; 864 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 865 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 866 867 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 868 if (rc) { 869 fprintf(stderr, "error ending batch\n"); 870 goto error; 871 } 872 assert(remaining >= g_ops_per_batch); 873 remaining -= g_ops_per_batch; 874 } while (remaining > 0); 875 } 876 877 /* Submit as singles when no batching is enabled or we ran out of batches. */ 878 for (i = 0; i < remaining; i++) { 879 task = _get_task(worker); 880 if (task == NULL) { 881 goto error; 882 } 883 884 _submit_single(worker, task); 885 } 886 return; 887 error: 888 if (worker_batch && worker_batch->batch) { 889 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) { 890 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 891 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 892 } 893 } 894 895 _free_task_buffers_in_pool(worker); 896 free(worker->batch_base); 897 free(worker->task_base); 898 free(worker); 899 spdk_app_stop(-1); 900 } 901 902 static void 903 accel_done(void *cb_arg, int status) 904 { 905 struct ap_task *task = (struct ap_task *)cb_arg; 906 struct worker_thread *worker = task->worker; 907 908 assert(worker); 909 910 task->status = status; 911 spdk_thread_send_msg(worker->thread, _accel_done, task); 912 } 913 914 static void 915 accel_perf_start(void *arg1) 916 { 917 struct spdk_io_channel *accel_ch; 918 struct spdk_cpuset tmp_cpumask = {}; 919 char thread_name[32]; 920 uint32_t i; 921 int j; 922 struct spdk_thread *thread; 923 struct display_info *display; 924 925 accel_ch = spdk_accel_engine_get_io_channel(); 926 g_capabilites = spdk_accel_get_capabilities(accel_ch); 927 spdk_put_io_channel(accel_ch); 928 929 if ((g_capabilites & g_workload_selection) != g_workload_selection) { 930 SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n"); 931 SPDK_WARNLOG("The software engine will be used instead.\n\n"); 932 } 933 934 g_tsc_rate = spdk_get_ticks_hz(); 935 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 936 937 printf("Running for %d seconds...\n", g_time_in_sec); 938 fflush(stdout); 939 940 /* Create worker threads for each core that was specified. */ 941 SPDK_ENV_FOREACH_CORE(i) { 942 for (j = 0; j < g_threads_per_core; j++) { 943 snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j); 944 spdk_cpuset_zero(&tmp_cpumask); 945 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 946 thread = spdk_thread_create(thread_name, &tmp_cpumask); 947 display = calloc(1, sizeof(*display)); 948 if (display == NULL) { 949 fprintf(stderr, "Unable to allocate memory\n"); 950 spdk_app_stop(-1); 951 return; 952 } 953 display->core = i; 954 display->thread = j; 955 spdk_thread_send_msg(thread, _init_thread, display); 956 } 957 } 958 } 959 960 int 961 main(int argc, char **argv) 962 { 963 struct spdk_app_opts opts = {}; 964 struct worker_thread *worker, *tmp; 965 966 pthread_mutex_init(&g_workers_lock, NULL); 967 spdk_app_opts_init(&opts, sizeof(opts)); 968 opts.reactor_mask = "0x1"; 969 if (spdk_app_parse_args(argc, argv, &opts, "C:o:q:t:yw:P:f:b:T:", NULL, parse_args, 970 usage) != SPDK_APP_PARSE_ARGS_SUCCESS) { 971 g_rc = -1; 972 goto cleanup; 973 } 974 975 if ((g_workload_selection != ACCEL_COPY) && 976 (g_workload_selection != ACCEL_FILL) && 977 (g_workload_selection != ACCEL_CRC32C) && 978 (g_workload_selection != ACCEL_COMPARE) && 979 (g_workload_selection != ACCEL_DUALCAST)) { 980 usage(); 981 g_rc = -1; 982 goto cleanup; 983 } 984 985 if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) { 986 fprintf(stdout, "batch size must be a multiple of queue depth\n"); 987 usage(); 988 g_rc = -1; 989 goto cleanup; 990 } 991 992 if (g_workload_selection == ACCEL_CRC32C && 993 g_crc32c_chained_count == 0) { 994 usage(); 995 g_rc = -1; 996 goto cleanup; 997 } 998 999 dump_user_config(&opts); 1000 g_rc = spdk_app_start(&opts, accel_perf_start, NULL); 1001 if (g_rc) { 1002 SPDK_ERRLOG("ERROR starting application\n"); 1003 } 1004 1005 pthread_mutex_destroy(&g_workers_lock); 1006 1007 worker = g_workers; 1008 while (worker) { 1009 tmp = worker->next; 1010 free(worker); 1011 worker = tmp; 1012 } 1013 cleanup: 1014 spdk_app_fini(); 1015 return g_rc; 1016 } 1017