1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 #include "spdk/thread.h" 36 #include "spdk/env.h" 37 #include "spdk/event.h" 38 #include "spdk/log.h" 39 #include "spdk/string.h" 40 #include "spdk/accel_engine.h" 41 #include "spdk/crc32.h" 42 #include "spdk/util.h" 43 44 #define DATA_PATTERN 0x5a 45 #define ALIGN_4K 0x1000 46 47 static bool g_using_sw_engine = false; 48 static uint64_t g_tsc_rate; 49 static uint64_t g_tsc_end; 50 static int g_rc; 51 static int g_xfer_size_bytes = 4096; 52 static int g_queue_depth = 32; 53 /* g_allocate_depth indicates how many tasks we allocate per worker. It will 54 * be at least as much as the queue depth. 55 */ 56 static int g_allocate_depth = 0; 57 static int g_ops_per_batch = 0; 58 static int g_threads_per_core = 1; 59 static int g_time_in_sec = 5; 60 static uint32_t g_crc32c_seed = 0; 61 static uint32_t g_crc32c_chained_count = 1; 62 static int g_fail_percent_goal = 0; 63 static uint8_t g_fill_pattern = 255; 64 static bool g_verify = false; 65 static const char *g_workload_type = NULL; 66 static enum accel_capability g_workload_selection; 67 static struct worker_thread *g_workers = NULL; 68 static int g_num_workers = 0; 69 static pthread_mutex_t g_workers_lock = PTHREAD_MUTEX_INITIALIZER; 70 71 struct worker_thread; 72 static void accel_done(void *ref, int status); 73 74 struct display_info { 75 int core; 76 int thread; 77 }; 78 79 struct ap_task { 80 void *src; 81 struct iovec *iovs; 82 uint32_t iov_cnt; 83 void *dst; 84 void *dst2; 85 uint32_t crc_dst; 86 struct worker_thread *worker; 87 int status; 88 int expected_status; /* used for the compare operation */ 89 TAILQ_ENTRY(ap_task) link; 90 }; 91 92 struct accel_batch { 93 int status; 94 int cmd_count; 95 struct spdk_accel_batch *batch; 96 struct worker_thread *worker; 97 TAILQ_ENTRY(accel_batch) link; 98 }; 99 100 struct worker_thread { 101 struct spdk_io_channel *ch; 102 uint64_t xfer_completed; 103 uint64_t xfer_failed; 104 uint64_t injected_miscompares; 105 uint64_t current_queue_depth; 106 TAILQ_HEAD(, ap_task) tasks_pool; 107 struct worker_thread *next; 108 unsigned core; 109 struct spdk_thread *thread; 110 bool is_draining; 111 struct spdk_poller *is_draining_poller; 112 struct spdk_poller *stop_poller; 113 void *task_base; 114 struct accel_batch *batch_base; 115 struct display_info display; 116 TAILQ_HEAD(, accel_batch) in_prep_batches; 117 TAILQ_HEAD(, accel_batch) in_use_batches; 118 TAILQ_HEAD(, accel_batch) to_submit_batches; 119 }; 120 121 static void 122 dump_user_config(struct spdk_app_opts *opts) 123 { 124 printf("SPDK Configuration:\n"); 125 printf("Core mask: %s\n\n", opts->reactor_mask); 126 printf("Accel Perf Configuration:\n"); 127 printf("Workload Type: %s\n", g_workload_type); 128 if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) { 129 printf("CRC-32C seed: %u\n", g_crc32c_seed); 130 printf("vector count %u\n", g_crc32c_chained_count); 131 } else if (g_workload_selection == ACCEL_FILL) { 132 printf("Fill pattern: 0x%x\n", g_fill_pattern); 133 } else if ((g_workload_selection == ACCEL_COMPARE) && g_fail_percent_goal > 0) { 134 printf("Failure inject: %u percent\n", g_fail_percent_goal); 135 } 136 if (g_workload_selection == ACCEL_COPY_CRC32C) { 137 printf("Vector size: %u bytes\n", g_xfer_size_bytes); 138 printf("Transfer size: %u bytes\n", g_xfer_size_bytes * g_crc32c_chained_count); 139 } else { 140 printf("Transfer size: %u bytes\n", g_xfer_size_bytes); 141 } 142 printf("Queue depth: %u\n", g_queue_depth); 143 printf("Allocate depth: %u\n", g_allocate_depth); 144 printf("# threads/core: %u\n", g_threads_per_core); 145 printf("Run time: %u seconds\n", g_time_in_sec); 146 if (g_ops_per_batch > 0) { 147 printf("Batching: %u operations\n", g_ops_per_batch); 148 } else { 149 printf("Batching: Disabled\n"); 150 } 151 printf("Verify: %s\n\n", g_verify ? "Yes" : "No"); 152 } 153 154 static void 155 usage(void) 156 { 157 printf("accel_perf options:\n"); 158 printf("\t[-h help message]\n"); 159 printf("\t[-q queue depth per core]\n"); 160 printf("\t[-C for crc32c workload, use this value to configure the io vector size to test (default 1)\n"); 161 printf("\t[-T number of threads per core\n"); 162 printf("\t[-n number of channels]\n"); 163 printf("\t[-o transfer size in bytes]\n"); 164 printf("\t[-t time in seconds]\n"); 165 printf("\t[-w workload type must be one of these: copy, fill, crc32c, copy_crc32c, compare, dualcast\n"); 166 printf("\t[-s for crc32c workload, use this seed value (default 0)\n"); 167 printf("\t[-P for compare workload, percentage of operations that should miscompare (percent, default 0)\n"); 168 printf("\t[-f for fill workload, use this BYTE value (default 255)\n"); 169 printf("\t[-y verify result if this switch is on]\n"); 170 printf("\t[-b batch this number of operations at a time (default 0 = disabled)]\n"); 171 printf("\t[-a tasks to allocate per core (default: same value as -q)]\n"); 172 printf("\t\tCan be used to spread operations across a wider range of memory.\n"); 173 } 174 175 static int 176 parse_args(int argc, char *argv) 177 { 178 int argval = 0; 179 180 switch (argc) { 181 case 'a': 182 case 'b': 183 case 'C': 184 case 'f': 185 case 'T': 186 case 'o': 187 case 'P': 188 case 'q': 189 case 's': 190 case 't': 191 argval = spdk_strtol(optarg, 10); 192 if (argval < 0) { 193 fprintf(stderr, "-%c option must be non-negative.\n", argc); 194 usage(); 195 return 1; 196 } 197 break; 198 default: 199 break; 200 }; 201 202 switch (argc) { 203 case 'a': 204 g_allocate_depth = argval; 205 break; 206 case 'b': 207 g_ops_per_batch = argval; 208 break; 209 case 'C': 210 g_crc32c_chained_count = argval; 211 break; 212 case 'f': 213 g_fill_pattern = (uint8_t)argval; 214 break; 215 case 'T': 216 g_threads_per_core = argval; 217 break; 218 case 'o': 219 g_xfer_size_bytes = argval; 220 break; 221 case 'P': 222 g_fail_percent_goal = argval; 223 break; 224 case 'q': 225 g_queue_depth = argval; 226 break; 227 case 's': 228 g_crc32c_seed = argval; 229 break; 230 case 't': 231 g_time_in_sec = argval; 232 break; 233 case 'y': 234 g_verify = true; 235 break; 236 case 'w': 237 g_workload_type = optarg; 238 if (!strcmp(g_workload_type, "copy")) { 239 g_workload_selection = ACCEL_COPY; 240 } else if (!strcmp(g_workload_type, "fill")) { 241 g_workload_selection = ACCEL_FILL; 242 } else if (!strcmp(g_workload_type, "crc32c")) { 243 g_workload_selection = ACCEL_CRC32C; 244 } else if (!strcmp(g_workload_type, "copy_crc32c")) { 245 g_workload_selection = ACCEL_COPY_CRC32C; 246 } else if (!strcmp(g_workload_type, "compare")) { 247 g_workload_selection = ACCEL_COMPARE; 248 } else if (!strcmp(g_workload_type, "dualcast")) { 249 g_workload_selection = ACCEL_DUALCAST; 250 } 251 break; 252 default: 253 usage(); 254 return 1; 255 } 256 257 return 0; 258 } 259 260 static int dump_result(void); 261 static void 262 unregister_worker(void *arg1) 263 { 264 struct worker_thread *worker = arg1; 265 266 free(worker->task_base); 267 free(worker->batch_base); 268 spdk_put_io_channel(worker->ch); 269 pthread_mutex_lock(&g_workers_lock); 270 assert(g_num_workers >= 1); 271 if (--g_num_workers == 0) { 272 pthread_mutex_unlock(&g_workers_lock); 273 g_rc = dump_result(); 274 spdk_app_stop(0); 275 } 276 pthread_mutex_unlock(&g_workers_lock); 277 } 278 279 static int 280 _get_task_data_bufs(struct ap_task *task) 281 { 282 uint32_t align = 0; 283 uint32_t i = 0; 284 int dst_buff_len = g_xfer_size_bytes; 285 286 /* For dualcast, the DSA HW requires 4K alignment on destination addresses but 287 * we do this for all engines to keep it simple. 288 */ 289 if (g_workload_selection == ACCEL_DUALCAST) { 290 align = ALIGN_4K; 291 } 292 293 if (g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) { 294 assert(g_crc32c_chained_count > 0); 295 task->iov_cnt = g_crc32c_chained_count; 296 task->iovs = calloc(task->iov_cnt, sizeof(struct iovec)); 297 if (!task->iovs) { 298 fprintf(stderr, "cannot allocated task->iovs fot task=%p\n", task); 299 return -ENOMEM; 300 } 301 302 if (g_workload_selection == ACCEL_COPY_CRC32C) { 303 dst_buff_len = g_xfer_size_bytes * g_crc32c_chained_count; 304 } 305 306 for (i = 0; i < task->iov_cnt; i++) { 307 task->iovs[i].iov_base = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 308 if (task->iovs[i].iov_base == NULL) { 309 return -ENOMEM; 310 } 311 memset(task->iovs[i].iov_base, DATA_PATTERN, g_xfer_size_bytes); 312 task->iovs[i].iov_len = g_xfer_size_bytes; 313 } 314 315 } else { 316 task->src = spdk_dma_zmalloc(g_xfer_size_bytes, 0, NULL); 317 if (task->src == NULL) { 318 fprintf(stderr, "Unable to alloc src buffer\n"); 319 return -ENOMEM; 320 } 321 322 /* For fill, set the entire src buffer so we can check if verify is enabled. */ 323 if (g_workload_selection == ACCEL_FILL) { 324 memset(task->src, g_fill_pattern, g_xfer_size_bytes); 325 } else { 326 memset(task->src, DATA_PATTERN, g_xfer_size_bytes); 327 } 328 } 329 330 if (g_workload_selection != ACCEL_COPY_CRC32C) { 331 task->dst = spdk_dma_zmalloc(dst_buff_len, align, NULL); 332 if (task->dst == NULL) { 333 fprintf(stderr, "Unable to alloc dst buffer\n"); 334 return -ENOMEM; 335 } 336 337 /* For compare we want the buffers to match, otherwise not. */ 338 if (g_workload_selection == ACCEL_COMPARE) { 339 memset(task->dst, DATA_PATTERN, dst_buff_len); 340 } else { 341 memset(task->dst, ~DATA_PATTERN, dst_buff_len); 342 } 343 } 344 345 if (g_workload_selection == ACCEL_DUALCAST) { 346 task->dst2 = spdk_dma_zmalloc(g_xfer_size_bytes, align, NULL); 347 if (task->dst2 == NULL) { 348 fprintf(stderr, "Unable to alloc dst buffer\n"); 349 return -ENOMEM; 350 } 351 memset(task->dst2, ~DATA_PATTERN, g_xfer_size_bytes); 352 } 353 354 return 0; 355 } 356 357 inline static struct ap_task * 358 _get_task(struct worker_thread *worker) 359 { 360 struct ap_task *task; 361 362 if (!TAILQ_EMPTY(&worker->tasks_pool)) { 363 task = TAILQ_FIRST(&worker->tasks_pool); 364 TAILQ_REMOVE(&worker->tasks_pool, task, link); 365 } else { 366 fprintf(stderr, "Unable to get ap_task\n"); 367 return NULL; 368 } 369 370 return task; 371 } 372 373 /* Submit one operation using the same ap task that just completed. */ 374 static void 375 _submit_single(struct worker_thread *worker, struct ap_task *task) 376 { 377 int random_num; 378 int rc = 0; 379 380 assert(worker); 381 382 switch (g_workload_selection) { 383 case ACCEL_COPY: 384 rc = spdk_accel_submit_copy(worker->ch, task->dst, task->src, 385 g_xfer_size_bytes, accel_done, task); 386 break; 387 case ACCEL_FILL: 388 /* For fill use the first byte of the task->dst buffer */ 389 rc = spdk_accel_submit_fill(worker->ch, task->dst, *(uint8_t *)task->src, 390 g_xfer_size_bytes, accel_done, task); 391 break; 392 case ACCEL_CRC32C: 393 rc = spdk_accel_submit_crc32cv(worker->ch, &task->crc_dst, 394 task->iovs, task->iov_cnt, g_crc32c_seed, 395 accel_done, task); 396 break; 397 case ACCEL_COPY_CRC32C: 398 rc = spdk_accel_submit_copy_crc32cv(worker->ch, task->dst, task->iovs, task->iov_cnt, 399 &task->crc_dst, g_crc32c_seed, accel_done, task); 400 break; 401 case ACCEL_COMPARE: 402 random_num = rand() % 100; 403 if (random_num < g_fail_percent_goal) { 404 task->expected_status = -EILSEQ; 405 *(uint8_t *)task->dst = ~DATA_PATTERN; 406 } else { 407 task->expected_status = 0; 408 *(uint8_t *)task->dst = DATA_PATTERN; 409 } 410 rc = spdk_accel_submit_compare(worker->ch, task->dst, task->src, 411 g_xfer_size_bytes, accel_done, task); 412 break; 413 case ACCEL_DUALCAST: 414 rc = spdk_accel_submit_dualcast(worker->ch, task->dst, task->dst2, 415 task->src, g_xfer_size_bytes, accel_done, task); 416 break; 417 default: 418 assert(false); 419 break; 420 421 } 422 423 if (rc) { 424 accel_done(task, rc); 425 } 426 } 427 428 static int 429 _batch_prep_cmd(struct worker_thread *worker, struct ap_task *task, 430 struct accel_batch *worker_batch) 431 { 432 struct spdk_accel_batch *batch = worker_batch->batch; 433 int rc = 0; 434 435 worker_batch->cmd_count++; 436 assert(worker_batch->cmd_count <= g_ops_per_batch); 437 438 switch (g_workload_selection) { 439 case ACCEL_COPY: 440 rc = spdk_accel_batch_prep_copy(worker->ch, batch, task->dst, 441 task->src, g_xfer_size_bytes, accel_done, task); 442 break; 443 case ACCEL_DUALCAST: 444 rc = spdk_accel_batch_prep_dualcast(worker->ch, batch, task->dst, task->dst2, 445 task->src, g_xfer_size_bytes, accel_done, task); 446 break; 447 case ACCEL_COMPARE: 448 rc = spdk_accel_batch_prep_compare(worker->ch, batch, task->dst, task->src, 449 g_xfer_size_bytes, accel_done, task); 450 break; 451 case ACCEL_FILL: 452 rc = spdk_accel_batch_prep_fill(worker->ch, batch, task->dst, 453 *(uint8_t *)task->src, 454 g_xfer_size_bytes, accel_done, task); 455 break; 456 case ACCEL_COPY_CRC32C: 457 rc = spdk_accel_batch_prep_copy_crc32c(worker->ch, batch, task->dst, task->src, &task->crc_dst, 458 g_crc32c_seed, g_xfer_size_bytes, accel_done, task); 459 break; 460 case ACCEL_CRC32C: 461 rc = spdk_accel_batch_prep_crc32cv(worker->ch, batch, &task->crc_dst, 462 task->iovs, task->iov_cnt, g_crc32c_seed, accel_done, task); 463 break; 464 default: 465 assert(false); 466 break; 467 } 468 469 return rc; 470 } 471 472 static void 473 _free_task_buffers(struct ap_task *task) 474 { 475 uint32_t i; 476 477 if (g_workload_selection == ACCEL_CRC32C) { 478 if (task->iovs) { 479 for (i = 0; i < task->iov_cnt; i++) { 480 if (task->iovs[i].iov_base) { 481 spdk_dma_free(task->iovs[i].iov_base); 482 } 483 } 484 free(task->iovs); 485 } 486 } else { 487 spdk_dma_free(task->src); 488 } 489 490 spdk_dma_free(task->dst); 491 if (g_workload_selection == ACCEL_DUALCAST) { 492 spdk_dma_free(task->dst2); 493 } 494 } 495 496 static void _batch_done(void *cb_arg); 497 static void 498 _build_batch(struct worker_thread *worker, struct ap_task *task) 499 { 500 struct accel_batch *worker_batch = NULL; 501 int rc; 502 503 assert(!TAILQ_EMPTY(&worker->in_prep_batches)); 504 505 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 506 507 /* If an accel batch hasn't been created yet do so now. */ 508 if (worker_batch->batch == NULL) { 509 worker_batch->batch = spdk_accel_batch_create(worker->ch); 510 if (worker_batch->batch == NULL) { 511 fprintf(stderr, "error unable to create new batch\n"); 512 return; 513 } 514 } 515 516 /* Prep the command re-using the last completed command's task */ 517 rc = _batch_prep_cmd(worker, task, worker_batch); 518 if (rc) { 519 fprintf(stderr, "error preping command for batch\n"); 520 goto error; 521 } 522 523 /* If this batch is full move it to the to_submit list so it gets 524 * submitted as batches complete. 525 */ 526 if (worker_batch->cmd_count == g_ops_per_batch) { 527 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 528 TAILQ_INSERT_TAIL(&worker->to_submit_batches, worker_batch, link); 529 } 530 531 return; 532 error: 533 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 534 535 } 536 537 static void batch_done(void *cb_arg, int status); 538 static void 539 _drain_batch(struct worker_thread *worker) 540 { 541 struct accel_batch *worker_batch, *tmp; 542 int rc; 543 544 /* submit any batches that were being built up. */ 545 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_prep_batches, link, tmp) { 546 if (worker_batch->cmd_count == 0) { 547 continue; 548 } 549 worker->current_queue_depth += worker_batch->cmd_count + 1; 550 551 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 552 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 553 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 554 if (rc == 0) { 555 worker_batch->cmd_count = 0; 556 } else { 557 fprintf(stderr, "error sending final batch\n"); 558 worker->current_queue_depth -= worker_batch->cmd_count + 1; 559 break; 560 } 561 } 562 } 563 564 static void 565 _batch_done(void *cb_arg) 566 { 567 struct accel_batch *worker_batch = (struct accel_batch *)cb_arg; 568 struct worker_thread *worker = worker_batch->worker; 569 int rc; 570 571 assert(TAILQ_EMPTY(&worker->in_use_batches) == 0); 572 573 if (worker_batch->status) { 574 SPDK_ERRLOG("error %d\n", worker_batch->status); 575 } 576 577 worker->current_queue_depth--; 578 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 579 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 580 worker_batch->batch = NULL; 581 worker_batch->cmd_count = 0; 582 583 if (!worker->is_draining) { 584 worker_batch = TAILQ_FIRST(&worker->to_submit_batches); 585 if (worker_batch != NULL) { 586 587 assert(worker_batch->cmd_count == g_ops_per_batch); 588 589 /* Add one for the batch command itself. */ 590 worker->current_queue_depth += g_ops_per_batch + 1; 591 TAILQ_REMOVE(&worker->to_submit_batches, worker_batch, link); 592 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 593 594 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 595 if (rc) { 596 fprintf(stderr, "error ending batch\n"); 597 worker->current_queue_depth -= g_ops_per_batch + 1; 598 return; 599 } 600 } 601 } else { 602 _drain_batch(worker); 603 } 604 } 605 606 static void 607 batch_done(void *cb_arg, int status) 608 { 609 struct accel_batch *worker_batch = (struct accel_batch *)cb_arg; 610 611 assert(worker_batch->worker); 612 613 worker_batch->status = status; 614 spdk_thread_send_msg(worker_batch->worker->thread, _batch_done, worker_batch); 615 } 616 617 static int 618 _vector_memcmp(void *_dst, struct iovec *src_iovs, uint32_t iovcnt) 619 { 620 uint32_t i; 621 uint32_t ttl_len = 0; 622 uint8_t *dst = (uint8_t *)_dst; 623 624 for (i = 0; i < iovcnt; i++) { 625 if (memcmp(dst, src_iovs[i].iov_base, src_iovs[i].iov_len)) { 626 return -1; 627 } 628 dst += src_iovs[i].iov_len; 629 ttl_len += src_iovs[i].iov_len; 630 } 631 632 if (ttl_len != iovcnt * g_xfer_size_bytes) { 633 return -1; 634 } 635 636 return 0; 637 } 638 639 static void 640 _accel_done(void *arg1) 641 { 642 struct ap_task *task = arg1; 643 struct worker_thread *worker = task->worker; 644 uint32_t sw_crc32c; 645 646 assert(worker); 647 assert(worker->current_queue_depth > 0); 648 649 if (g_verify && task->status == 0) { 650 switch (g_workload_selection) { 651 case ACCEL_COPY_CRC32C: 652 sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed); 653 if (task->crc_dst != sw_crc32c) { 654 SPDK_NOTICELOG("CRC-32C miscompare\n"); 655 worker->xfer_failed++; 656 } 657 if (_vector_memcmp(task->dst, task->iovs, task->iov_cnt)) { 658 SPDK_NOTICELOG("Data miscompare\n"); 659 worker->xfer_failed++; 660 } 661 break; 662 case ACCEL_CRC32C: 663 sw_crc32c = spdk_crc32c_iov_update(task->iovs, task->iov_cnt, ~g_crc32c_seed); 664 if (task->crc_dst != sw_crc32c) { 665 SPDK_NOTICELOG("CRC-32C miscompare\n"); 666 worker->xfer_failed++; 667 } 668 break; 669 case ACCEL_COPY: 670 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 671 SPDK_NOTICELOG("Data miscompare\n"); 672 worker->xfer_failed++; 673 } 674 break; 675 case ACCEL_DUALCAST: 676 if (memcmp(task->src, task->dst, g_xfer_size_bytes)) { 677 SPDK_NOTICELOG("Data miscompare, first destination\n"); 678 worker->xfer_failed++; 679 } 680 if (memcmp(task->src, task->dst2, g_xfer_size_bytes)) { 681 SPDK_NOTICELOG("Data miscompare, second destination\n"); 682 worker->xfer_failed++; 683 } 684 break; 685 case ACCEL_FILL: 686 if (memcmp(task->dst, task->src, g_xfer_size_bytes)) { 687 SPDK_NOTICELOG("Data miscompare\n"); 688 worker->xfer_failed++; 689 } 690 break; 691 case ACCEL_COMPARE: 692 break; 693 default: 694 assert(false); 695 break; 696 } 697 } 698 699 if (task->expected_status == -EILSEQ) { 700 assert(task->status != 0); 701 worker->injected_miscompares++; 702 } else if (task->status) { 703 /* Expected to pass but the accel engine reported an error (ex: COMPARE operation). */ 704 worker->xfer_failed++; 705 } 706 707 worker->xfer_completed++; 708 worker->current_queue_depth--; 709 710 if (!worker->is_draining) { 711 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 712 task = _get_task(worker); 713 if (g_ops_per_batch == 0) { 714 _submit_single(worker, task); 715 worker->current_queue_depth++; 716 } else { 717 _build_batch(worker, task); 718 } 719 } else if (g_ops_per_batch > 0) { 720 _drain_batch(worker); 721 } else { 722 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 723 } 724 } 725 726 static int 727 dump_result(void) 728 { 729 uint64_t total_completed = 0; 730 uint64_t total_failed = 0; 731 uint64_t total_miscompared = 0; 732 uint64_t total_xfer_per_sec, total_bw_in_MiBps; 733 struct worker_thread *worker = g_workers; 734 735 printf("\nCore,Thread Transfers Bandwidth Failed Miscompares\n"); 736 printf("------------------------------------------------------------------------\n"); 737 while (worker != NULL) { 738 739 uint64_t xfer_per_sec = worker->xfer_completed / g_time_in_sec; 740 uint64_t bw_in_MiBps = (worker->xfer_completed * g_xfer_size_bytes) / 741 (g_time_in_sec * 1024 * 1024); 742 743 total_completed += worker->xfer_completed; 744 total_failed += worker->xfer_failed; 745 total_miscompared += worker->injected_miscompares; 746 747 if (xfer_per_sec) { 748 printf("%u,%u%17" PRIu64 "/s%9" PRIu64 " MiB/s%7" PRIu64 " %11" PRIu64 "\n", 749 worker->display.core, worker->display.thread, xfer_per_sec, 750 bw_in_MiBps, worker->xfer_failed, worker->injected_miscompares); 751 } 752 753 worker = worker->next; 754 } 755 756 total_xfer_per_sec = total_completed / g_time_in_sec; 757 total_bw_in_MiBps = (total_completed * g_xfer_size_bytes) / 758 (g_time_in_sec * 1024 * 1024); 759 760 printf("=========================================================================\n"); 761 printf("Total:%15" PRIu64 "/s%9" PRIu64 " MiB/s%6" PRIu64 " %11" PRIu64"\n\n", 762 total_xfer_per_sec, total_bw_in_MiBps, total_failed, total_miscompared); 763 764 return total_failed ? 1 : 0; 765 } 766 767 static inline void 768 _free_task_buffers_in_pool(struct worker_thread *worker) 769 { 770 struct ap_task *task; 771 772 assert(worker); 773 while ((task = TAILQ_FIRST(&worker->tasks_pool))) { 774 TAILQ_REMOVE(&worker->tasks_pool, task, link); 775 _free_task_buffers(task); 776 } 777 } 778 779 static int 780 _check_draining(void *arg) 781 { 782 struct worker_thread *worker = arg; 783 784 assert(worker); 785 786 if (worker->current_queue_depth == 0) { 787 _free_task_buffers_in_pool(worker); 788 spdk_poller_unregister(&worker->is_draining_poller); 789 unregister_worker(worker); 790 } 791 792 return -1; 793 } 794 795 static int 796 _worker_stop(void *arg) 797 { 798 struct worker_thread *worker = arg; 799 800 assert(worker); 801 802 spdk_poller_unregister(&worker->stop_poller); 803 804 /* now let the worker drain and check it's outstanding IO with a poller */ 805 worker->is_draining = true; 806 worker->is_draining_poller = SPDK_POLLER_REGISTER(_check_draining, worker, 0); 807 808 return 0; 809 } 810 811 static void 812 _init_thread(void *arg1) 813 { 814 struct worker_thread *worker; 815 struct ap_task *task; 816 int i, rc, num_batches; 817 int max_per_batch; 818 int remaining = g_queue_depth; 819 int num_tasks = g_allocate_depth; 820 struct accel_batch *tmp; 821 struct accel_batch *worker_batch = NULL; 822 struct display_info *display = arg1; 823 uint64_t capabilities; 824 825 worker = calloc(1, sizeof(*worker)); 826 if (worker == NULL) { 827 fprintf(stderr, "Unable to allocate worker\n"); 828 free(display); 829 return; 830 } 831 832 worker->display.core = display->core; 833 worker->display.thread = display->thread; 834 free(display); 835 worker->core = spdk_env_get_current_core(); 836 worker->thread = spdk_get_thread(); 837 pthread_mutex_lock(&g_workers_lock); 838 g_num_workers++; 839 worker->next = g_workers; 840 g_workers = worker; 841 pthread_mutex_unlock(&g_workers_lock); 842 worker->ch = spdk_accel_engine_get_io_channel(); 843 844 if (g_num_workers == 1) { 845 capabilities = spdk_accel_get_capabilities(worker->ch); 846 if ((capabilities & g_workload_selection) != g_workload_selection) { 847 g_using_sw_engine = true; 848 SPDK_WARNLOG("The selected workload is not natively supported by the current engine\n"); 849 SPDK_WARNLOG("The software engine will be used instead.\n\n"); 850 } 851 } 852 853 TAILQ_INIT(&worker->tasks_pool); 854 855 if (g_ops_per_batch > 0) { 856 857 max_per_batch = spdk_accel_batch_get_max(worker->ch); 858 assert(max_per_batch > 0); 859 860 if (g_ops_per_batch > max_per_batch) { 861 fprintf(stderr, "Reducing requested batch amount to max supported of %d\n", max_per_batch); 862 g_ops_per_batch = max_per_batch; 863 } 864 865 if (g_ops_per_batch > g_queue_depth) { 866 fprintf(stderr, "Batch amount > queue depth, resetting to %d\n", g_queue_depth); 867 g_ops_per_batch = g_queue_depth; 868 } 869 870 TAILQ_INIT(&worker->in_prep_batches); 871 TAILQ_INIT(&worker->to_submit_batches); 872 TAILQ_INIT(&worker->in_use_batches); 873 874 /* A worker_batch will live on one of 3 lists: 875 * IN_PREP: as individual IOs complete new ones are built on on a 876 * worker_batch on this list until it reaches g_ops_per_batch. 877 * TO_SUBMIT: as batches are built up on IO completion they are moved 878 * to this list once they are full. This list is used in 879 * batch completion to start new batches. 880 * IN_USE: the worker_batch is outstanding and will be moved to in prep 881 * list when the batch is completed. 882 * 883 * So we need enough to cover Q depth loading and then one to replace 884 * each one of those and for when everything is outstanding there needs 885 * to be one extra batch to build up while the last batch is completing 886 * IO but before it's completed the batch command. 887 */ 888 num_batches = (g_queue_depth / g_ops_per_batch * 2) + 1; 889 worker->batch_base = calloc(num_batches, sizeof(struct accel_batch)); 890 worker_batch = worker->batch_base; 891 for (i = 0; i < num_batches; i++) { 892 worker_batch->worker = worker; 893 TAILQ_INSERT_TAIL(&worker->in_prep_batches, worker_batch, link); 894 worker_batch++; 895 } 896 } 897 898 worker->task_base = calloc(num_tasks, sizeof(struct ap_task)); 899 if (worker->task_base == NULL) { 900 fprintf(stderr, "Could not allocate task base.\n"); 901 goto error; 902 } 903 904 task = worker->task_base; 905 for (i = 0; i < num_tasks; i++) { 906 TAILQ_INSERT_TAIL(&worker->tasks_pool, task, link); 907 task->worker = worker; 908 if (_get_task_data_bufs(task)) { 909 fprintf(stderr, "Unable to get data bufs\n"); 910 goto error; 911 } 912 task++; 913 } 914 915 /* Register a poller that will stop the worker at time elapsed */ 916 worker->stop_poller = SPDK_POLLER_REGISTER(_worker_stop, worker, 917 g_time_in_sec * 1000000ULL); 918 919 /* If batching is enabled load up to the full Q depth before 920 * processing any completions, then ping pong between two batches, 921 * one processing and one being built up for when the other completes. 922 */ 923 if (g_ops_per_batch > 0) { 924 do { 925 worker_batch = TAILQ_FIRST(&worker->in_prep_batches); 926 if (worker_batch == NULL) { 927 goto error; 928 } 929 930 worker_batch->batch = spdk_accel_batch_create(worker->ch); 931 if (worker_batch->batch == NULL) { 932 raise(SIGINT); 933 break; 934 } 935 936 for (i = 0; i < g_ops_per_batch; i++) { 937 task = _get_task(worker); 938 worker->current_queue_depth++; 939 if (task == NULL) { 940 goto error; 941 } 942 943 rc = _batch_prep_cmd(worker, task, worker_batch); 944 if (rc) { 945 fprintf(stderr, "error preping command\n"); 946 goto error; 947 } 948 } 949 950 /* for the batch operation itself. */ 951 task->worker->current_queue_depth++; 952 TAILQ_REMOVE(&worker->in_prep_batches, worker_batch, link); 953 TAILQ_INSERT_TAIL(&worker->in_use_batches, worker_batch, link); 954 955 rc = spdk_accel_batch_submit(worker->ch, worker_batch->batch, batch_done, worker_batch); 956 if (rc) { 957 fprintf(stderr, "error ending batch\n"); 958 goto error; 959 } 960 assert(remaining >= g_ops_per_batch); 961 remaining -= g_ops_per_batch; 962 } while (remaining > 0); 963 } 964 965 /* Submit as singles when no batching is enabled or we ran out of batches. */ 966 for (i = 0; i < remaining; i++) { 967 task = _get_task(worker); 968 worker->current_queue_depth++; 969 if (task == NULL) { 970 goto error; 971 } 972 973 _submit_single(worker, task); 974 } 975 return; 976 error: 977 if (worker_batch && worker_batch->batch) { 978 TAILQ_FOREACH_SAFE(worker_batch, &worker->in_use_batches, link, tmp) { 979 spdk_accel_batch_cancel(worker->ch, worker_batch->batch); 980 TAILQ_REMOVE(&worker->in_use_batches, worker_batch, link); 981 } 982 } 983 984 _free_task_buffers_in_pool(worker); 985 free(worker->batch_base); 986 free(worker->task_base); 987 free(worker); 988 spdk_app_stop(-1); 989 } 990 991 static void 992 accel_done(void *cb_arg, int status) 993 { 994 struct ap_task *task = (struct ap_task *)cb_arg; 995 struct worker_thread *worker = task->worker; 996 997 assert(worker); 998 999 task->status = status; 1000 if (g_using_sw_engine == false) { 1001 _accel_done(task); 1002 } else { 1003 spdk_thread_send_msg(worker->thread, _accel_done, task); 1004 } 1005 } 1006 1007 static void 1008 accel_perf_start(void *arg1) 1009 { 1010 struct spdk_cpuset tmp_cpumask = {}; 1011 char thread_name[32]; 1012 uint32_t i; 1013 int j; 1014 struct spdk_thread *thread; 1015 struct display_info *display; 1016 1017 g_tsc_rate = spdk_get_ticks_hz(); 1018 g_tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate; 1019 1020 printf("Running for %d seconds...\n", g_time_in_sec); 1021 fflush(stdout); 1022 1023 /* Create worker threads for each core that was specified. */ 1024 SPDK_ENV_FOREACH_CORE(i) { 1025 for (j = 0; j < g_threads_per_core; j++) { 1026 snprintf(thread_name, sizeof(thread_name), "ap_worker_%u_%u", i, j); 1027 spdk_cpuset_zero(&tmp_cpumask); 1028 spdk_cpuset_set_cpu(&tmp_cpumask, i, true); 1029 thread = spdk_thread_create(thread_name, &tmp_cpumask); 1030 display = calloc(1, sizeof(*display)); 1031 if (display == NULL) { 1032 fprintf(stderr, "Unable to allocate memory\n"); 1033 spdk_app_stop(-1); 1034 return; 1035 } 1036 display->core = i; 1037 display->thread = j; 1038 spdk_thread_send_msg(thread, _init_thread, display); 1039 } 1040 } 1041 } 1042 1043 int 1044 main(int argc, char **argv) 1045 { 1046 struct spdk_app_opts opts = {}; 1047 struct worker_thread *worker, *tmp; 1048 1049 pthread_mutex_init(&g_workers_lock, NULL); 1050 spdk_app_opts_init(&opts, sizeof(opts)); 1051 opts.reactor_mask = "0x1"; 1052 if (spdk_app_parse_args(argc, argv, &opts, "a:C:o:q:t:yw:P:f:b:T:", NULL, parse_args, 1053 usage) != SPDK_APP_PARSE_ARGS_SUCCESS) { 1054 g_rc = -1; 1055 goto cleanup; 1056 } 1057 1058 if ((g_workload_selection != ACCEL_COPY) && 1059 (g_workload_selection != ACCEL_FILL) && 1060 (g_workload_selection != ACCEL_CRC32C) && 1061 (g_workload_selection != ACCEL_COPY_CRC32C) && 1062 (g_workload_selection != ACCEL_COMPARE) && 1063 (g_workload_selection != ACCEL_DUALCAST)) { 1064 usage(); 1065 g_rc = -1; 1066 goto cleanup; 1067 } 1068 1069 if (g_ops_per_batch > 0 && (g_queue_depth % g_ops_per_batch > 0)) { 1070 fprintf(stdout, "batch size must be a multiple of queue depth\n"); 1071 usage(); 1072 g_rc = -1; 1073 goto cleanup; 1074 } 1075 1076 if (g_allocate_depth > 0 && g_queue_depth > g_allocate_depth) { 1077 fprintf(stdout, "allocate depth must be at least as big as queue depth\n"); 1078 usage(); 1079 g_rc = -1; 1080 goto cleanup; 1081 } 1082 1083 if (g_allocate_depth == 0) { 1084 g_allocate_depth = g_queue_depth; 1085 } 1086 1087 if ((g_workload_selection == ACCEL_CRC32C || g_workload_selection == ACCEL_COPY_CRC32C) && 1088 g_crc32c_chained_count == 0) { 1089 usage(); 1090 g_rc = -1; 1091 goto cleanup; 1092 } 1093 1094 dump_user_config(&opts); 1095 g_rc = spdk_app_start(&opts, accel_perf_start, NULL); 1096 if (g_rc) { 1097 SPDK_ERRLOG("ERROR starting application\n"); 1098 } 1099 1100 pthread_mutex_destroy(&g_workers_lock); 1101 1102 worker = g_workers; 1103 while (worker) { 1104 tmp = worker->next; 1105 free(worker); 1106 worker = tmp; 1107 } 1108 cleanup: 1109 spdk_app_fini(); 1110 return g_rc; 1111 } 1112